From e2608e3f82a3813c7d9fa1e886c3e5efa491cbc9 Mon Sep 17 00:00:00 2001
From: Rohit Ghumare
Date: Wed, 10 Jun 2026 19:44:17 +0100
Subject: [PATCH] Serve graph reads from side-indexes instead of full enumeration (#828)

state::list over the graph scopes blocks the worker event loop past
~25K nodes, so every read path that enumerated mem:graph:nodes and
mem:graph:edges (searchByEntities, expandFromChunks, temporalQuery,
graph-query query/startNodeId) was a worker-killer at scale.

New side-indexes under src/state/graph-indexes.ts:

- mem:graph:name-shards: hash(nodeId) % 64 -> {id, name}[], the name
  catalog as 64 bounded gets with exact substring-match semantics
- mem:graph:adjacency: nodeId -> incident edgeId[], bounding traversal
  by degree x depth (graph-query BFS capped at 5000 visited nodes)
- mem:graph:obs-nodes: obsId -> nodeId[] for chunk expansion
- mem:graph:index-meta: readiness marker; absent means readers fall
  back to the previous enumeration, never silently empty

Writes are hints only; readers verify every hit against the live record
(stale flag plus snapshot resetAt), so cascades and wipes need no index
cleanup. Hints are written by graph-extract, temporal extract, mesh LWW
merges, import, and snapshot restore. The marker is set by boot backfill
(gated on snapshot totalNodes <= 25K), by graph-snapshot-rebuild, and by
graph-reset, which also clears the name shards so post-reset retrieval
stops surfacing pre-reset rows.

Property-value matching in graph-query is served from the snapshot
topNodes with an explicit coverage warning when the snapshot does not
cover the whole corpus. Start-node iteration in retrieval scoring is now
deterministic (sorted by node id) so index and enumeration paths score
identically.

Parity tests compare both paths on identical graphs for all four read
paths, plus fallback, post-rebuild maintenance, and post-reset cases.
--- src/functions/export-import.ts | 3 + src/functions/graph-retrieval.ts | 201 ++++++++++++++--- src/functions/graph.ts | 172 +++++++++++++- src/functions/mesh.ts | 30 ++- src/functions/snapshot.ts | 2 + src/functions/temporal-graph.ts | 8 + src/index.ts | 35 +++ src/state/graph-indexes.ts | 292 ++++++++++++++++++++++++ src/state/schema.ts | 17 ++ test/graph-index-parity.test.ts | 371 +++++++++++++++++++++++++++++++ 10 files changed, 1088 insertions(+), 43 deletions(-) create mode 100644 src/state/graph-indexes.ts create mode 100644 test/graph-index-parity.test.ts diff --git a/src/functions/export-import.ts b/src/functions/export-import.ts index 327117b26..1c6dacad7 100644 --- a/src/functions/export-import.ts +++ b/src/functions/export-import.ts @@ -26,6 +26,7 @@ import type { } from "../types.js"; import { normalizeAccessLog } from "./access-tracker.js"; import { KV } from "../state/schema.js"; +import { indexGraphEdge, indexGraphNode } from "../state/graph-indexes.js"; import { StateKV } from "../state/kv.js"; import { VERSION } from "../version.js"; import { recordAudit } from "./audit.js"; @@ -404,6 +405,7 @@ export function registerExportImportFunction(sdk: ISdk, kv: StateKV): void { if (existing) { stats.skipped++; continue; } } await kv.set(KV.graphNodes, node.id, node); + await indexGraphNode(kv, node); } } if (importData.graphEdges) { @@ -413,6 +415,7 @@ export function registerExportImportFunction(sdk: ISdk, kv: StateKV): void { if (existing) { stats.skipped++; continue; } } await kv.set(KV.graphEdges, edge.id, edge); + await indexGraphEdge(kv, edge); } } if (importData.semanticMemories) { diff --git a/src/functions/graph-retrieval.ts b/src/functions/graph-retrieval.ts index 10fd51a76..c14ac6cd1 100644 --- a/src/functions/graph-retrieval.ts +++ b/src/functions/graph-retrieval.ts @@ -4,6 +4,12 @@ import type { } from "../types.js"; import { KV } from "../state/schema.js"; import type { StateKV } from "../state/kv.js"; +import { + GraphIndexReader, + 
graphIndexesReady, + loadNameCatalog, + loadNodeIdsForObservations, +} from "../state/graph-indexes.js"; export interface GraphRetrievalResult { obsId: string; @@ -13,6 +19,10 @@ export interface GraphRetrievalResult { pathLength: number; } +type NeighborProvider = ( + nodeId: string, +) => Promise>; + function buildGraphContext( path: Array<{ node: GraphNode; edge?: GraphEdge }>, ): string { @@ -38,6 +48,31 @@ function buildGraphContext( return parts.join(" "); } +function neighborsFromArrays( + allNodes: GraphNode[], + allEdges: GraphEdge[], +): NeighborProvider { + const nodeIndex = new Map(); + for (const n of allNodes) nodeIndex.set(n.id, n); + + const adjacency = new Map< + string, + Array<{ node: GraphNode; edge: GraphEdge }> + >(); + const append = (from: string, to: string, edge: GraphEdge): void => { + const node = nodeIndex.get(to); + if (!node) return; + if (!adjacency.has(from)) adjacency.set(from, []); + adjacency.get(from)!.push({ node, edge }); + }; + for (const edge of allEdges) { + append(edge.sourceNodeId, edge.targetNodeId, edge); + append(edge.targetNodeId, edge.sourceNodeId, edge); + } + + return async (nodeId) => adjacency.get(nodeId) ?? 
[]; +} + export class GraphRetrieval { constructor(private kv: StateKV) {} @@ -46,6 +81,28 @@ export class GraphRetrieval { maxDepth = 2, maxResults = 20, ): Promise { + if (await graphIndexesReady(this.kv)) { + const reader = await GraphIndexReader.open(this.kv); + const catalog = await loadNameCatalog(this.kv); + const lowered = entityNames.map((e) => e.toLowerCase()); + const matchingNodes: GraphNode[] = []; + for (const entry of catalog) { + const nameLower = entry.name.toLowerCase(); + const matched = lowered.some( + (e) => nameLower.includes(e) || e.includes(nameLower), + ); + if (!matched) continue; + const node = await reader.getNode(entry.id); + if (node) matchingNodes.push(node); + } + return this.scoreEntityMatches( + matchingNodes, + (id) => reader.getNeighbors(id), + maxDepth, + maxResults, + ); + } + const allNodes = (await this.kv.list(KV.graphNodes)).filter((n) => !n.stale); const allEdges = (await this.kv.list(KV.graphEdges)).filter((e) => !e.stale); @@ -58,16 +115,36 @@ export class GraphRetrieval { ); }); + return this.scoreEntityMatches( + matchingNodes, + neighborsFromArrays(allNodes, allEdges), + maxDepth, + maxResults, + ); + } + + private async scoreEntityMatches( + matchingNodes: GraphNode[], + getNeighbors: NeighborProvider, + maxDepth: number, + maxResults: number, + ): Promise { if (matchingNodes.length === 0) return []; + // Which start node first claims an observation decides its score, + // so iterate in a deterministic order regardless of whether the + // matches came from enumeration or the sharded name catalog. 
+ const orderedMatches = [...matchingNodes].sort((a, b) => + a.id.localeCompare(b.id), + ); + const results: GraphRetrievalResult[] = []; const visitedObs = new Set(); - for (const startNode of matchingNodes) { - const paths = this.dijkstraTraversal( + for (const startNode of orderedMatches) { + const paths = await this.dijkstraTraversal( startNode, - allNodes, - allEdges, + getNeighbors, maxDepth, ); @@ -119,6 +196,28 @@ export class GraphRetrieval { maxDepth = 1, maxResults = 10, ): Promise { + if (await graphIndexesReady(this.kv)) { + const reader = await GraphIndexReader.open(this.kv); + const candidateIds = await loadNodeIdsForObservations(this.kv, obsIds); + const linkedNodes: GraphNode[] = []; + for (const nodeId of candidateIds) { + const node = await reader.getNode(nodeId); + if ( + node && + (node.sourceObservationIds ?? []).some((id) => obsIds.includes(id)) + ) { + linkedNodes.push(node); + } + } + return this.scoreExpansion( + linkedNodes, + (id) => reader.getNeighbors(id), + obsIds, + maxDepth, + maxResults, + ); + } + const allNodes = (await this.kv.list(KV.graphNodes)).filter((n) => !n.stale); const allEdges = (await this.kv.list(KV.graphEdges)).filter((e) => !e.stale); @@ -126,11 +225,31 @@ export class GraphRetrieval { n.sourceObservationIds.some((id) => obsIds.includes(id)), ); + return this.scoreExpansion( + linkedNodes, + neighborsFromArrays(allNodes, allEdges), + obsIds, + maxDepth, + maxResults, + ); + } + + private async scoreExpansion( + linkedNodes: GraphNode[], + getNeighbors: NeighborProvider, + obsIds: string[], + maxDepth: number, + maxResults: number, + ): Promise { + const orderedLinked = [...linkedNodes].sort((a, b) => + a.id.localeCompare(b.id), + ); + const results: GraphRetrievalResult[] = []; const visitedObs = new Set(obsIds); - for (const node of linkedNodes) { - const paths = this.dijkstraTraversal(node, allNodes, allEdges, maxDepth); + for (const node of orderedLinked) { + const paths = await this.dijkstraTraversal(node, 
getNeighbors, maxDepth); for (const path of paths) { const lastNode = path[path.length - 1].node; for (const obsId of lastNode.sourceObservationIds) { @@ -163,6 +282,25 @@ export class GraphRetrieval { currentState: GraphEdge[]; history: GraphEdge[]; }> { + if (await graphIndexesReady(this.kv)) { + const reader = await GraphIndexReader.open(this.kv); + const catalog = await loadNameCatalog(this.kv); + const lower = entityName.toLowerCase(); + let entity: GraphNode | null = null; + for (const entry of catalog) { + if (entry.name.toLowerCase() !== lower) continue; + const node = await reader.getNode(entry.id); + if (node) { + entity = node; + break; + } + } + if (!entity) return { entity: null, currentState: [], history: [] }; + + const relatedEdges = await reader.getIncidentEdges(entity.id); + return this.partitionTemporalEdges(entity, relatedEdges, asOf); + } + const allNodes = (await this.kv.list(KV.graphNodes)).filter((n) => !n.stale); const allEdges = (await this.kv.list(KV.graphEdges)).filter((e) => !e.stale); @@ -175,6 +313,18 @@ export class GraphRetrieval { (e) => e.sourceNodeId === entity.id || e.targetNodeId === entity.id, ); + return this.partitionTemporalEdges(entity, relatedEdges, asOf); + } + + private partitionTemporalEdges( + entity: GraphNode, + relatedEdges: GraphEdge[], + asOf?: string, + ): { + entity: GraphNode | null; + currentState: GraphEdge[]; + history: GraphEdge[]; + } { if (!asOf) { const latestEdges = this.getLatestEdges(relatedEdges); const historicalEdges = relatedEdges.filter( @@ -231,32 +381,16 @@ export class GraphRetrieval { // which fell back to edge-count order and ignored the 0.1-1.0 weight // attached to every graph edge. Dijkstra over `cost = 1/weight` // (cheaper edges = stronger relationships) returns the - // highest-weighted path to each reachable node within maxDepth. Also - // tightens the perf profile: - // - Adjacency built once in O(V+E) (previous BFS re-filtered - // allEdges per visited node, O(V·E) overall). 
- // - Min-heap dequeue is O(log V) per pop (previous queue.shift() - // was O(n) — the dominant cost on graphs above ~200 nodes per - // the contributor's benchmark in #328). - private dijkstraTraversal( + // highest-weighted path to each reachable node within maxDepth. + // Neighbor expansion is delegated to the provider so the same + // traversal serves both the enumeration fallback (prebuilt adjacency + // over kv.list arrays) and the side-index path (targeted adjacency + // gets bounded by degree x maxDepth). + private async dijkstraTraversal( startNode: GraphNode, - allNodes: GraphNode[], - allEdges: GraphEdge[], + getNeighbors: NeighborProvider, maxDepth: number, - ): Array> { - const nodeIndex = new Map(); - for (const n of allNodes) nodeIndex.set(n.id, n); - - const adjacency = new Map>(); - for (const edge of allEdges) { - const a = edge.sourceNodeId; - const b = edge.targetNodeId; - if (!adjacency.has(a)) adjacency.set(a, []); - if (!adjacency.has(b)) adjacency.set(b, []); - adjacency.get(a)!.push({ neighborId: b, edge }); - adjacency.get(b)!.push({ neighborId: a, edge }); - } - + ): Promise>> { const dist = new Map(); const pathTo = new Map>(); dist.set(startNode.id, 0); @@ -273,10 +407,9 @@ export class GraphRetrieval { if (cost > (dist.get(nodeId) ?? Infinity)) continue; if (depth >= maxDepth) continue; - const neighbors = adjacency.get(nodeId) ?? []; - for (const { neighborId, edge } of neighbors) { - const nextNode = nodeIndex.get(neighborId); - if (!nextNode) continue; + const neighbors = await getNeighbors(nodeId); + for (const { node: nextNode, edge } of neighbors) { + const neighborId = nextNode.id; // Clamp weight to avoid division-by-zero on malformed edges; // 0.01 is below the documented 0.1 floor. 
const edgeCost = 1 / Math.max(edge.weight, 0.01); diff --git a/src/functions/graph.ts b/src/functions/graph.ts index bb0d72722..4a34fd9af 100644 --- a/src/functions/graph.ts +++ b/src/functions/graph.ts @@ -9,6 +9,18 @@ import type { } from "../types.js"; import { KV, generateId } from "../state/schema.js"; import type { StateKV } from "../state/kv.js"; +import { + GRAPH_INDEX_NODE_CEILING, + GraphIndexReader, + backfillGraphIndexes, + clearNameShards, + graphIndexesReady, + indexGraphEdge, + indexGraphNode, + linkObservationsToNode, + loadNameCatalog, + markGraphIndexesReady, +} from "../state/graph-indexes.js"; import { GRAPH_EXTRACTION_SYSTEM, buildGraphExtractionPrompt, @@ -179,7 +191,122 @@ function paginateFromSnapshot( // extract-driven snapshot is the right approach for those corpora. // Operators above the threshold should use mem::graph-reset and let // future extracts rebuild incrementally. -const REBUILD_SAFE_NODE_CEILING = 25000; +const REBUILD_SAFE_NODE_CEILING = GRAPH_INDEX_NODE_CEILING; + +// Bounds the index-served BFS in mem::graph-query so a dense corpus +// can't expand into an unbounded number of targeted gets. Hitting the +// cap returns a truncated page with an explanatory warning. +const TRAVERSAL_VISIT_CAP = 5000; + +async function queryViaIndexes( + kv: StateKV, + query: string, + limit: number, + offset: number, +): Promise { + const reader = await GraphIndexReader.open(kv); + const lower = query.toLowerCase(); + const catalog = await loadNameCatalog(kv); + const matched = new Map(); + for (const entry of catalog) { + if (!entry.name.toLowerCase().includes(lower)) continue; + const node = await reader.getNode(entry.id); + if (node) matched.set(node.id, node); + } + + const snap = await readSnapshot(kv); + let partialPropertyCoverage = false; + for (const node of snap?.topNodes ?? 
[]) { + if (node.stale || matched.has(node.id)) continue; + const propMatch = Object.values(node.properties).some( + (v) => typeof v === "string" && v.toLowerCase().includes(lower), + ); + if (propMatch) matched.set(node.id, node); + } + if (!snap || snap.stats.totalNodes > snap.topNodes.length) { + partialPropertyCoverage = true; + } + + const nodes = [...matched.values()]; + const edgeIds = new Set(); + const edges: GraphEdge[] = []; + for (const node of nodes) { + for (const edge of await reader.getIncidentEdges(node.id)) { + if (edgeIds.has(edge.id)) continue; + edgeIds.add(edge.id); + edges.push(edge); + } + } + + const result = paginate(nodes, edges, 0, limit, offset); + if (partialPropertyCoverage) { + return { + ...result, + warning: + "Property-value matches are served from the top-degree snapshot; " + + "nodes outside it are matched by name only.", + }; + } + return result; +} + +async function traverseViaIndexes( + kv: StateKV, + startNodeId: string, + nodeType: string | undefined, + maxDepth: number, + limit: number, + offset: number, +): Promise { + const reader = await GraphIndexReader.open(kv); + const visited = new Set(); + const visitedEdges = new Set(); + const resultNodes: GraphNode[] = []; + const resultEdges: GraphEdge[] = []; + const queue: Array<{ nodeId: string; depth: number }> = [ + { nodeId: startNodeId, depth: 0 }, + ]; + let capped = false; + + while (queue.length > 0) { + const { nodeId, depth } = queue.shift()!; + if (visited.has(nodeId) || depth > maxDepth) continue; + if (visited.size >= TRAVERSAL_VISIT_CAP) { + capped = true; + break; + } + visited.add(nodeId); + + const node = await reader.getNode(nodeId); + if (node && (!nodeType || node.type === nodeType)) { + resultNodes.push(node); + } + + for (const edge of await reader.getIncidentEdges(nodeId)) { + if (!visitedEdges.has(edge.id)) { + visitedEdges.add(edge.id); + resultEdges.push(edge); + } + const nextId = + edge.sourceNodeId === nodeId ? 
edge.targetNodeId : edge.sourceNodeId; + if (!visited.has(nextId)) { + queue.push({ nodeId: nextId, depth: depth + 1 }); + } + } + } + + const result = paginate(resultNodes, resultEdges, maxDepth, limit, offset); + if (capped) { + return { + ...result, + truncated: true, + warning: + `Traversal stopped after visiting ${TRAVERSAL_VISIT_CAP} nodes. ` + + `Lower maxDepth or start from a lower-degree node for a complete walk.`, + }; + } + return result; +} function nameIndexKey(type: string, name: string): string { return `${type}|${name}`; @@ -519,6 +646,7 @@ export function registerGraphFunction( if (existing) { const merged = mergeNode(existing, node, obsIds, capturedAt); await kv.set(KV.graphNodes, existing.id, merged); + await linkObservationsToNode(kv, existing.id, obsIds); // Update topNodes entry if present so a stale clone isn't // returned from the snapshot fast path. const topIdx = snap.topNodes.findIndex( @@ -529,6 +657,7 @@ export function registerGraphFunction( await kv.set(KV.graphNodes, node.id, node); await kv.set(KV.graphNameIndex, indexKey, node.id); await kv.set(KV.graphNodeDegree, node.id, 0); + await indexGraphNode(kv, node); snap.stats.totalNodes += 1; snap.stats.nodesByType[node.type] = (snap.stats.nodesByType[node.type] ?? 0) + 1; @@ -575,6 +704,7 @@ export function registerGraphFunction( } else { await kv.set(KV.graphEdges, edge.id, edge); await kv.set(KV.graphEdgeKey, eKey, edge.id); + await indexGraphEdge(kv, edge); snap.stats.totalEdges += 1; snap.stats.edgesByType[edge.type] = (snap.stats.edgesByType[edge.type] ?? 0) + 1; @@ -671,10 +801,31 @@ export function registerGraphFunction( }; } - // Query / startNodeId paths still need broader access. Race the - // live enumeration against a wall-clock budget so a long - // kv.list doesn't block the worker indefinitely. On timeout the - // caller gets a snapshot-backed approximation instead of a 500. 
+ // Query / startNodeId paths serve from the read side-indexes + // when they have been built (boot backfill, snapshot-rebuild, or + // graph-reset). Name matches come from the sharded name catalog + // (64 bounded gets) and traversal expands via per-node adjacency + // lists, so cost scales with matches x degree instead of corpus + // size. + if (await graphIndexesReady(kv)) { + if (data.query) { + return queryViaIndexes(kv, data.query, limit, offset); + } + return traverseViaIndexes( + kv, + data.startNodeId!, + data.nodeType, + maxDepth, + limit, + offset, + ); + } + + // Without the side-indexes the query / startNodeId paths still + // need broader access. Race the live enumeration against a + // wall-clock budget so a long kv.list doesn't block the worker + // indefinitely. On timeout the caller gets a snapshot-backed + // approximation instead of a 500. let allNodes: GraphNode[]; let allEdges: GraphEdge[]; try { @@ -928,6 +1079,8 @@ export function registerGraphFunction( ); } + await backfillGraphIndexes(kv, liveNodes, liveEdges); + const snap = buildSnapshotFromArrays(nodes, edges); await kv.set(KV.graphSnapshot, SNAPSHOT_KEY, snap); const tookMs = Date.now() - started; @@ -987,6 +1140,15 @@ export function registerGraphFunction( resetAt: new Date().toISOString(), }; await kv.set(KV.graphSnapshot, SNAPSHOT_KEY, resetSnapshot); + // The name shards are the only side-index with a bounded, known + // key set, so they can be wiped outright. Adjacency / obs-node + // hints for pre-reset rows stay on disk; index readers verify + // every hit against `resetAt`, so those orphans are never served. + // Marking the indexes ready flips retrieval onto the index path, + // which (unlike the enumeration fallback) applies the resetAt + // filter and therefore stops surfacing pre-reset rows. 
+ await clearNameShards(kv); + await markGraphIndexesReady(kv); const counts: Record = { [KV.graphSnapshot]: 1, }; diff --git a/src/functions/mesh.ts b/src/functions/mesh.ts index ad52e5ea7..4e435e5d6 100644 --- a/src/functions/mesh.ts +++ b/src/functions/mesh.ts @@ -2,6 +2,7 @@ import type { ISdk } from "iii-sdk"; import type { StateKV } from "../state/kv.js"; import { KV, generateId } from "../state/schema.js"; import { withKeyedLock } from "../state/keyed-mutex.js"; +import { indexGraphEdge, indexGraphNode } from "../state/graph-indexes.js"; import { recordAudit } from "./audit.js"; import type { MeshPeer, @@ -80,6 +81,7 @@ async function lwwMergeList( items: T[] | undefined, lockPrefix: string, tsField: "updatedAt" | "createdAt", + onWrite?: (item: T) => Promise, ): Promise { if (!items || !Array.isArray(items)) return 0; let count = 0; @@ -100,7 +102,10 @@ async function lwwMergeList( } return false; }); - if (wrote) count++; + if (wrote) { + count++; + if (onWrite) await onWrite(item); + } } return count; } @@ -131,7 +136,10 @@ async function lwwMergeGraphNodes( } return false; }); - if (wrote) count++; + if (wrote) { + count++; + await indexGraphNode(kv, item); + } } return count; } @@ -361,7 +369,14 @@ export function registerMeshFunction( } } accepted += await lwwMergeGraphNodes(kv, data.graphNodes); - accepted += await lwwMergeList(kv, KV.graphEdges, data.graphEdges, "mem:gedge", "createdAt"); + accepted += await lwwMergeList( + kv, + KV.graphEdges, + data.graphEdges, + "mem:gedge", + "createdAt", + (edge) => indexGraphEdge(kv, edge), + ); await recordAudit(kv, "mesh_sync", "mem::mesh-receive", [], { action: "mesh.receive", accepted, @@ -488,7 +503,14 @@ async function applySyncData( applied += await lwwMergeGraphNodes(kv, data.graphNodes); } if (scopes.includes("graph:edges")) { - applied += await lwwMergeList(kv, KV.graphEdges, data.graphEdges, "mem:gedge", "createdAt"); + applied += await lwwMergeList( + kv, + KV.graphEdges, + data.graphEdges, + 
"mem:gedge", + "createdAt", + (edge) => indexGraphEdge(kv, edge), + ); } return applied; diff --git a/src/functions/snapshot.ts b/src/functions/snapshot.ts index 40430ce50..adf8b78dd 100644 --- a/src/functions/snapshot.ts +++ b/src/functions/snapshot.ts @@ -12,6 +12,7 @@ import type { } from "../types.js"; import { KV, generateId } from "../state/schema.js"; import type { StateKV } from "../state/kv.js"; +import { indexGraphNode } from "../state/graph-indexes.js"; import { recordAudit } from "./audit.js"; import { VERSION } from "../version.js"; import { logger } from "../logger.js"; @@ -194,6 +195,7 @@ export function registerSnapshotFunction( if (state.graphNodes) { for (const node of state.graphNodes) { await kv.set(KV.graphNodes, node.id, node); + await indexGraphNode(kv, node as unknown as GraphNode); } } if (state.observations) { diff --git a/src/functions/temporal-graph.ts b/src/functions/temporal-graph.ts index 1e8d1df10..92d8e2b3a 100644 --- a/src/functions/temporal-graph.ts +++ b/src/functions/temporal-graph.ts @@ -9,6 +9,11 @@ import type { } from "../types.js"; import { KV, generateId } from "../state/schema.js"; import type { StateKV } from "../state/kv.js"; +import { + indexGraphEdge, + indexGraphNode, + linkObservationsToNode, +} from "../state/graph-indexes.js"; import { logger } from "../logger.js"; const TEMPORAL_EXTRACTION_SYSTEM = `You are a temporal knowledge extraction engine. Given observations, extract entities AND their temporal relationships with full context metadata. 
@@ -216,10 +221,12 @@ export function registerTemporalGraphFunctions( }; if (merged.aliases.length === 0) delete (merged as any).aliases; await kv.set(KV.graphNodes, existing.id, merged); + await linkObservationsToNode(kv, existing.id, obsIds); node.id = existing.id; idRemap.set(oldId, existing.id); } else { await kv.set(KV.graphNodes, node.id, node); + await indexGraphNode(kv, node); existingNodes.push(node); } } @@ -254,6 +261,7 @@ export function registerTemporalGraphFunctions( } await kv.set(KV.graphEdges, edge.id, edge); + await indexGraphEdge(kv, edge); existingEdges.push(edge); } diff --git a/src/index.ts b/src/index.ts index 4233e8a67..ea85a0d58 100644 --- a/src/index.ts +++ b/src/index.ts @@ -21,6 +21,11 @@ import { } from "./providers/index.js"; import { StateKV } from "./state/kv.js"; import { KV } from "./state/schema.js"; +import { + GRAPH_INDEX_NODE_CEILING, + backfillGraphIndexes, + graphIndexesReady, +} from "./state/graph-indexes.js"; import { VectorIndex } from "./state/vector-index.js"; import { HybridSearch } from "./state/hybrid-search.js"; import { IndexPersistence } from "./state/index-persistence.js"; @@ -510,6 +515,36 @@ async function main() { } } + // Backfill the graph read side-indexes for corpora that predate + // them. Mirrors the BM25 memories backfill above: one-time, gated on + // the snapshot's recorded node count so we never enumerate a corpus + // large enough to starve the worker heartbeat. While the readiness + // marker is absent, graph retrieval falls back to full enumeration, + // so skipping here is safe (just slower). + try { + if (!(await graphIndexesReady(kv))) { + const graphSnap = await kv.get( + KV.graphSnapshot, + "current", + ); + const totalNodes = graphSnap?.stats?.totalNodes ?? 
0; + if (graphSnap && totalNodes > 0 && totalNodes <= GRAPH_INDEX_NODE_CEILING) { + const [graphNodes, graphEdges] = await Promise.all([ + kv.list(KV.graphNodes), + kv.list(KV.graphEdges), + ]); + await backfillGraphIndexes( + kv, + graphNodes.filter((n) => !n.stale), + graphEdges.filter((e) => !e.stale), + ); + bootLog(`Backfilled graph read indexes (${totalNodes} nodes)`); + } + } + } catch (err) { + console.warn(`[agentmemory] Failed to backfill graph indexes:`, err); + } + // Ready / Endpoints lines are emitted via `bootLog` so they're // buffered in quiet mode and printed verbatim under --verbose. The // CLI surfaces a compact summary when it sees the worker reach diff --git a/src/state/graph-indexes.ts b/src/state/graph-indexes.ts new file mode 100644 index 000000000..57321fa94 --- /dev/null +++ b/src/state/graph-indexes.ts @@ -0,0 +1,292 @@ +import type { GraphNode, GraphEdge } from "../types.js"; +import { KV } from "./schema.js"; +import type { StateKV } from "./kv.js"; +import { withKeyedLock } from "./keyed-mutex.js"; + +export const NAME_SHARD_COUNT = 64; +export const GRAPH_INDEX_NODE_CEILING = 25000; +const META_KEY = "current"; +const SNAPSHOT_KEY = "current"; + +export interface NameCatalogEntry { + id: string; + name: string; +} + +export function nameShardKey(nodeId: string): string { + let hash = 5381; + for (let i = 0; i < nodeId.length; i++) { + hash = ((hash * 33) ^ nodeId.charCodeAt(i)) >>> 0; + } + return String(hash % NAME_SHARD_COUNT); +} + +export async function graphIndexesReady(kv: StateKV): Promise { + try { + const meta = await kv.get<{ version?: number }>(KV.graphIndexMeta, META_KEY); + return meta?.version === 1; + } catch { + return false; + } +} + +export async function markGraphIndexesReady(kv: StateKV): Promise { + await kv.set(KV.graphIndexMeta, META_KEY, { + version: 1, + builtAt: new Date().toISOString(), + }); +} + +export async function clearNameShards(kv: StateKV): Promise { + for (let shard = 0; shard < NAME_SHARD_COUNT; 
shard++) { + await kv.delete(KV.graphNameShards, String(shard)).catch(() => {}); + } +} + +export async function indexGraphNode( + kv: StateKV, + node: GraphNode, +): Promise { + if (!node?.id || typeof node.name !== "string") return; + const shard = nameShardKey(node.id); + await withKeyedLock(`gidx:shard:${shard}`, async () => { + const entries = + (await kv.get(KV.graphNameShards, shard)) ?? []; + if (!entries.some((e) => e.id === node.id)) { + entries.push({ id: node.id, name: node.name }); + await kv.set(KV.graphNameShards, shard, entries); + } + }); + await linkObservationsToNode(kv, node.id, node.sourceObservationIds); +} + +export async function linkObservationsToNode( + kv: StateKV, + nodeId: string, + obsIds: string[] | undefined, +): Promise { + for (const obsId of obsIds ?? []) { + await withKeyedLock(`gidx:obs:${obsId}`, async () => { + const nodeIds = (await kv.get(KV.graphObsNodes, obsId)) ?? []; + if (!nodeIds.includes(nodeId)) { + nodeIds.push(nodeId); + await kv.set(KV.graphObsNodes, obsId, nodeIds); + } + }); + } +} + +export async function indexGraphEdge( + kv: StateKV, + edge: GraphEdge, +): Promise { + if (!edge?.id || !edge.sourceNodeId || !edge.targetNodeId) return; + const endpoints = + edge.sourceNodeId === edge.targetNodeId + ? [edge.sourceNodeId] + : [edge.sourceNodeId, edge.targetNodeId]; + for (const nodeId of endpoints) { + await withKeyedLock(`gidx:adj:${nodeId}`, async () => { + const edgeIds = (await kv.get(KV.graphAdjacency, nodeId)) ?? 
/**
 * Loads the whole node-name catalog from the name-shard side-index.
 *
 * Issues one `kv.get` per shard key (`"0"` .. `String(NAME_SHARD_COUNT - 1)`)
 * concurrently and concatenates the shards in shard order. A shard whose read
 * rejects, or whose stored value is not an array, contributes nothing.
 *
 * @param kv - State store holding the `KV.graphNameShards` scope.
 * @returns Every `{ id, name }` entry found. Nothing is checked against the
 *   node records here; callers decide whether an entry is still live.
 */
export async function loadNameCatalog(
  kv: StateKV,
): Promise<NameCatalogEntry[]> {
  const shardKeys = Array.from({ length: NAME_SHARD_COUNT }, (_, shard) =>
    String(shard),
  );
  const shards = await Promise.all(
    shardKeys.map((key) =>
      kv.get<NameCatalogEntry[]>(KV.graphNameShards, key).catch(() => null),
    ),
  );

  const catalog: NameCatalogEntry[] = [];
  for (const entries of shards) {
    if (!Array.isArray(entries)) continue;
    // Plain loop instead of push(...entries): a spread passes every element
    // as a call argument, which has an engine-dependent upper bound.
    for (const entry of entries) catalog.push(entry);
  }
  return catalog;
}

/**
 * Reads the edge ids recorded as incident to `nodeId` in the adjacency index.
 *
 * @param kv - State store holding the `KV.graphAdjacency` scope.
 * @param nodeId - Node whose adjacency entry is read.
 * @returns The stored edge-id array, or `[]` when the read rejects or the
 *   stored value is missing / not an array.
 */
export async function loadAdjacentEdgeIds(
  kv: StateKV,
  nodeId: string,
): Promise<string[]> {
  const stored = await kv
    .get<string[]>(KV.graphAdjacency, nodeId)
    .catch(() => null);
  if (!Array.isArray(stored)) return [];
  return stored;
}

/**
 * Collects the node ids linked to any of the given observations.
 *
 * Lookups run in bounded parallel batches over the de-duplicated observation
 * ids. Results are folded in input order, so the returned ids keep the order
 * of first appearance that a one-by-one scan of `obsIds` would produce.
 *
 * @param kv - State store holding the `KV.graphObsNodes` scope.
 * @param obsIds - Observation ids to expand; duplicates are read only once.
 * @returns Unique node ids. Observations whose read rejects or whose stored
 *   value is not an array are ignored.
 */
export async function loadNodeIdsForObservations(
  kv: StateKV,
  obsIds: string[],
): Promise<string[]> {
  const LOOKUP_BATCH_SIZE = 100;
  const uniqueObsIds = [...new Set(obsIds)];
  const nodeIds = new Set<string>();

  for (let i = 0; i < uniqueObsIds.length; i += LOOKUP_BATCH_SIZE) {
    const batch = await Promise.all(
      uniqueObsIds
        .slice(i, i + LOOKUP_BATCH_SIZE)
        .map((obsId) =>
          kv.get<string[]>(KV.graphObsNodes, obsId).catch(() => null),
        ),
    );
    for (const linked of batch) {
      if (!Array.isArray(linked)) continue;
      for (const id of linked) nodeIds.add(id);
    }
  }
  return [...nodeIds];
}

/**
 * Reads the `resetAt` field of the graph snapshot record.
 *
 * @param kv - State store holding the `KV.graphSnapshot` scope.
 * @returns The snapshot's `resetAt`, or `undefined` when the snapshot is
 *   absent, has no `resetAt`, or the read throws.
 */
export async function readGraphResetAt(
  kv: StateKV,
): Promise<string | undefined> {
  try {
    const snapshot = await kv.get<{ resetAt?: string }>(
      KV.graphSnapshot,
      SNAPSHOT_KEY,
    );
    return snapshot?.resetAt;
  } catch {
    return undefined;
  }
}

/**
 * Decides whether a graph record may be served to readers.
 *
 * A record is rejected when it is missing, flagged `stale`, or has a string
 * `createdAt` that sorts strictly before `resetAt`. Records without a string
 * `createdAt` are never rejected by the reset check.
 *
 * @param record - Node or edge record as read from the store (may be nullish).
 * @param resetAt - Reset watermark; falsy disables the reset check.
 * @returns `true` when the record passes both checks.
 */
export function isLiveGraphRecord(
  record: { stale?: boolean; createdAt?: string } | null | undefined,
  resetAt: string | undefined,
): boolean {
  if (!record || record.stale) return false;
  if (!resetAt || typeof record.createdAt !== "string") return true;
  // Plain string comparison. NOTE(review): presumably both values are ISO-8601
  // timestamps in the same format, where this matches time order - confirm.
  return !(record.createdAt < resetAt);
}

/**
 * Per-request reader over the graph side-indexes.
 *
 * Every node/edge is fetched by id and filtered through `isLiveGraphRecord`
 * using the `resetAt` captured when the reader was opened. Lookups are
 * memoised for the lifetime of the instance; a `null` cache entry means
 * "already looked up, not servable" (missing, not live, or the read rejected),
 * so such ids are not fetched again by the same reader.
 */
export class GraphIndexReader {
  private readonly nodeCache = new Map<string, GraphNode | null>();
  private readonly edgeCache = new Map<string, GraphEdge | null>();

  private constructor(
    private readonly kv: StateKV,
    private readonly resetAt: string | undefined,
  ) {}

  /** Creates a reader, reading the snapshot `resetAt` once up front. */
  static async open(kv: StateKV): Promise<GraphIndexReader> {
    const resetAt = await readGraphResetAt(kv);
    return new GraphIndexReader(kv, resetAt);
  }

  /** Returns the live node for `nodeId`, or `null` when it is not servable. */
  async getNode(nodeId: string): Promise<GraphNode | null> {
    const cached = this.nodeCache.get(nodeId);
    // `undefined` means "never looked up"; a cached `null` is a valid answer.
    if (cached !== undefined) return cached;

    const raw = await this.kv
      .get<GraphNode>(KV.graphNodes, nodeId)
      .catch(() => null);
    const node = isLiveGraphRecord(raw, this.resetAt) ? raw : null;
    this.nodeCache.set(nodeId, node);
    return node;
  }

  /** Returns the live edge for `edgeId`, or `null` when it is not servable. */
  async getEdge(edgeId: string): Promise<GraphEdge | null> {
    const cached = this.edgeCache.get(edgeId);
    if (cached !== undefined) return cached;

    const raw = await this.kv
      .get<GraphEdge>(KV.graphEdges, edgeId)
      .catch(() => null);
    const edge = isLiveGraphRecord(raw, this.resetAt) ? raw : null;
    this.edgeCache.set(edgeId, edge);
    return edge;
  }

  /**
   * Resolves the adjacency entry of `nodeId` to live edge records, in the
   * order the edge ids are stored. Ids that do not resolve to a live edge are
   * dropped.
   */
  async getIncidentEdges(nodeId: string): Promise<GraphEdge[]> {
    const edgeIds = await loadAdjacentEdgeIds(this.kv, nodeId);
    const edges: GraphEdge[] = [];
    for (const edgeId of edgeIds) {
      const edge = await this.getEdge(edgeId);
      if (edge) edges.push(edge);
    }
    return edges;
  }

  /**
   * Returns one `{ node, edge }` pair per live incident edge whose opposite
   * endpoint is also live. The opposite endpoint is the target when `nodeId`
   * is the edge's source, otherwise the source; for a self-loop that is the
   * node itself.
   */
  async getNeighbors(
    nodeId: string,
  ): Promise<Array<{ node: GraphNode; edge: GraphEdge }>> {
    const incident = await this.getIncidentEdges(nodeId);
    const neighbors: Array<{ node: GraphNode; edge: GraphEdge }> = [];
    for (const edge of incident) {
      const neighborId =
        edge.sourceNodeId === nodeId ? edge.targetNodeId : edge.sourceNodeId;
      const node = await this.getNode(neighborId);
      if (node) neighbors.push({ node, edge });
    }
    return neighbors;
  }
}

/** Writes `key -> ids` entries into `scope`, at most 100 concurrent sets at a time. */
async function writeIndexBatches(
  kv: StateKV,
  scope: string,
  entries: Array<[string, string[]]>,
): Promise<void> {
  const BATCH_SIZE = 100;
  for (let i = 0; i < entries.length; i += BATCH_SIZE) {
    await Promise.all(
      entries
        .slice(i, i + BATCH_SIZE)
        .map(([key, ids]) => kv.set(scope, key, ids)),
    );
  }
}

/**
 * Rebuilds all three side-indexes from full node and edge lists.
 *
 * - Name shards: every shard key `"0"` .. `String(NAME_SHARD_COUNT - 1)` is
 *   overwritten, with `[]` for shards that received no node.
 * - Adjacency: each edge id is recorded under its source and target node
 *   (once for a self-loop).
 * - Observation links: each node id is recorded under every id in its
 *   `sourceObservationIds`.
 *
 * Nodes without an `id` or a string `name`, and edges missing `id`,
 * `sourceNodeId` or `targetNodeId`, are skipped. Adjacency and observation
 * entries for keys that do not occur in the input are left untouched.
 * `markGraphIndexesReady` is called last, after every index write has resolved.
 *
 * @param kv - State store to write the index scopes into.
 * @param nodes - All graph nodes to index.
 * @param edges - All graph edges to index.
 */
export async function backfillGraphIndexes(
  kv: StateKV,
  nodes: GraphNode[],
  edges: GraphEdge[],
): Promise<void> {
  // Sets give O(1) de-duplication and keep insertion order, so the stored
  // arrays are the same as an "append unless already present" list.
  const link = (
    index: Map<string, Set<string>>,
    key: string,
    value: string,
  ): void => {
    const existing = index.get(key);
    if (existing) {
      existing.add(value);
    } else {
      index.set(key, new Set([value]));
    }
  };
  const toEntries = (
    index: Map<string, Set<string>>,
  ): Array<[string, string[]]> =>
    Array.from(index, ([key, values]): [string, string[]] => [
      key,
      [...values],
    ]);

  const shards = new Map<string, NameCatalogEntry[]>();
  const obsNodes = new Map<string, Set<string>>();
  for (const node of nodes) {
    if (!node?.id || typeof node.name !== "string") continue;
    // Assumes nameShardKey returns the decimal shard number as a string; the
    // shard write loop below looks shards up by String(shard).
    const shardKey = nameShardKey(node.id);
    const bucket = shards.get(shardKey);
    if (bucket) {
      bucket.push({ id: node.id, name: node.name });
    } else {
      shards.set(shardKey, [{ id: node.id, name: node.name }]);
    }
    for (const obsId of node.sourceObservationIds ?? []) {
      link(obsNodes, obsId, node.id);
    }
  }

  const adjacency = new Map<string, Set<string>>();
  for (const edge of edges) {
    if (!edge?.id || !edge.sourceNodeId || !edge.targetNodeId) continue;
    link(adjacency, edge.sourceNodeId, edge.id);
    link(adjacency, edge.targetNodeId, edge.id);
  }

  // Every shard is written, including empty ones, so entries left over from an
  // earlier build are replaced.
  for (let shard = 0; shard < NAME_SHARD_COUNT; shard++) {
    const key = String(shard);
    await kv.set(KV.graphNameShards, key, shards.get(key) ?? []);
  }

  await writeIndexBatches(kv, KV.graphAdjacency, toEntries(adjacency));
  await writeIndexBatches(kv, KV.graphObsNodes, toEntries(obsNodes));

  await markGraphIndexesReady(kv);
}
import { describe, it, expect, vi } from "vitest";

vi.mock("../src/logger.js", () => ({
  logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() },
}));

import { GraphRetrieval } from "../src/functions/graph-retrieval.js";
import { registerGraphFunction } from "../src/functions/graph.js";
import {
  backfillGraphIndexes,
  graphIndexesReady,
  indexGraphEdge,
  indexGraphNode,
  loadNameCatalog,
} from "../src/state/graph-indexes.js";
import type {
  GraphNode,
  GraphEdge,
  GraphQueryResult,
} from "../src/types.js";

/**
 * In-memory key/value store keyed by scope, then by key.
 *
 * Seeds the `mem:graph:nodes` and `mem:graph:edges` scopes from the given
 * records and counts every `list` call so tests can assert whether a read
 * path enumerated a scope.
 */
function mockKV(nodes: GraphNode[] = [], edges: GraphEdge[] = []) {
  const store = new Map<string, Map<string, unknown>>();
  const nodesMap = new Map<string, unknown>();
  for (const n of nodes) nodesMap.set(n.id, n);
  store.set("mem:graph:nodes", nodesMap);

  const edgesMap = new Map<string, unknown>();
  for (const e of edges) edgesMap.set(e.id, e);
  store.set("mem:graph:edges", edgesMap);

  let listCalls = 0;

  return {
    get: async <T>(scope: string, key: string): Promise<T | null> => {
      return (store.get(scope)?.get(key) as T) ?? null;
    },
    set: async <T>(scope: string, key: string, data: T): Promise<T> => {
      if (!store.has(scope)) store.set(scope, new Map());
      store.get(scope)!.set(key, data);
      return data;
    },
    delete: async (scope: string, key: string): Promise<void> => {
      store.get(scope)?.delete(key);
    },
    list: async <T>(scope: string): Promise<T[]> => {
      listCalls += 1;
      const entries = store.get(scope);
      return entries ? (Array.from(entries.values()) as T[]) : [];
    },
    listCallCount: () => listCalls,
  };
}

/** Handler signature stored by the fake SDK. */
type RegisteredHandler = (payload: unknown) => unknown;

/**
 * Minimal SDK double: records registered functions by id and dispatches
 * `trigger` calls to them. Both the `(id, data)` and the
 * `{ function_id, payload }` call shapes are accepted.
 */
function mockSdk() {
  const functions = new Map<string, RegisteredHandler>();
  return {
    registerFunction: (
      idOrOpts: string | { id: string },
      handler: RegisteredHandler,
    ) => {
      const id = typeof idOrOpts === "string" ? idOrOpts : idOrOpts.id;
      functions.set(id, handler);
    },
    registerTrigger: () => {},
    trigger: async (
      idOrInput: string | { function_id: string; payload: unknown },
      data?: unknown,
    ) => {
      const id =
        typeof idOrInput === "string" ? idOrInput : idOrInput.function_id;
      const payload = typeof idOrInput === "string" ? data : idOrInput.payload;
      const fn = functions.get(id);
      if (!fn) throw new Error(`No function: ${id}`);
      return fn(payload);
    },
  };
}

// Provider double; individual tests override `compress` per call.
const mockProvider = {
  name: "test",
  compress: vi.fn().mockResolvedValue(""),
  summarize: vi.fn(),
};

/** Builds a graph node with a fixed `createdAt` of 2026-01-01. */
function makeNode(
  id: string,
  name: string,
  type: GraphNode["type"] = "concept",
  obsIds: string[] = ["obs_1"],
  properties: GraphNode["properties"] = {},
): GraphNode {
  return {
    id,
    type,
    name,
    properties,
    sourceObservationIds: obsIds,
    createdAt: "2026-01-01T00:00:00.000Z",
  };
}

/** Builds a latest-version edge sourced from `obs_1`, committed 2026-01-01. */
function makeEdge(
  id: string,
  sourceNodeId: string,
  targetNodeId: string,
  type: GraphEdge["type"] = "related_to",
  weight = 0.8,
): GraphEdge {
  return {
    id,
    type,
    sourceNodeId,
    targetNodeId,
    weight,
    sourceObservationIds: ["obs_1"],
    createdAt: "2026-01-01T00:00:00.000Z",
    tcommit: "2026-01-01T00:00:00.000Z",
    isLatest: true,
  };
}

/** Five nodes (one isolated) and four edges shared by most parity tests. */
function fixtureGraph(): { nodes: GraphNode[]; edges: GraphEdge[] } {
  const nodes = [
    makeNode("n1", "React", "library", ["obs_react"]),
    makeNode("n2", "Hook", "concept", ["obs_hook"]),
    makeNode("n3", "State", "concept", ["obs_state"]),
    makeNode("n4", "auth-middleware", "function", ["obs_auth"]),
    makeNode("n5", "Lonely", "concept", ["obs_lonely"]),
  ];
  const edges = [
    makeEdge("e1", "n1", "n2", "uses", 0.9),
    makeEdge("e2", "n2", "n3", "related_to", 0.8),
    makeEdge("e3", "n1", "n3", "related_to", 0.15),
    makeEdge("e4", "n4", "n1", "uses", 0.7),
  ];
  return { nodes, edges };
}

/** Store seeded with the graph AND with side-indexes built by the backfill. */
async function indexedKV(nodes: GraphNode[], edges: GraphEdge[]) {
  const kv = mockKV(nodes, edges);
  await backfillGraphIndexes(kv as never, nodes, edges);
  return kv;
}

/** Returns a copy ordered by `obsId` so result order does not affect equality. */
function sortResults<T extends { obsId: string }>(results: T[]): T[] {
  return [...results].sort((a, b) => a.obsId.localeCompare(b.obsId));
}

describe("graph index parity", () => {
  it("searchByEntities returns identical results via indexes and enumeration", async () => {
    const { nodes, edges } = fixtureGraph();
    const plain = new GraphRetrieval(mockKV(nodes, edges) as never);
    const indexed = new GraphRetrieval((await indexedKV(nodes, edges)) as never);

    for (const query of [["React"], ["auth"], ["react", "state"], ["nope"]]) {
      const viaList = sortResults(await plain.searchByEntities(query, 3, 20));
      const viaIndex = sortResults(await indexed.searchByEntities(query, 3, 20));
      expect(viaIndex).toEqual(viaList);
    }
  });

  it("searchByEntities parity holds for incrementally indexed writes", async () => {
    const { nodes, edges } = fixtureGraph();
    const plain = new GraphRetrieval(mockKV(nodes, edges) as never);

    // Build the indexes one write at a time, then set the readiness marker by
    // hand instead of going through the backfill.
    const kv = mockKV(nodes, edges);
    for (const node of nodes) await indexGraphNode(kv as never, node);
    for (const edge of edges) await indexGraphEdge(kv as never, edge);
    await kv.set("mem:graph:index-meta", "current", { version: 1 });
    const indexed = new GraphRetrieval(kv as never);

    const viaList = sortResults(await plain.searchByEntities(["React"], 2));
    const viaIndex = sortResults(await indexed.searchByEntities(["React"], 2));
    expect(viaIndex).toEqual(viaList);
  });

  it("expandFromChunks returns identical results via indexes and enumeration", async () => {
    const { nodes, edges } = fixtureGraph();
    const plain = new GraphRetrieval(mockKV(nodes, edges) as never);
    const indexed = new GraphRetrieval((await indexedKV(nodes, edges)) as never);

    for (const obsIds of [["obs_react"], ["obs_hook", "obs_auth"], ["obs_x"]]) {
      const viaList = sortResults(await plain.expandFromChunks(obsIds, 2, 20));
      const viaIndex = sortResults(await indexed.expandFromChunks(obsIds, 2, 20));
      expect(viaIndex).toEqual(viaList);
    }
  });

  it("temporalQuery returns identical results via indexes and enumeration", async () => {
    const nodes = [makeNode("n1", "Alice", "person", ["obs_1"])];
    // Two self-loop edges on the same node; the second has a later commit time
    // and an explicit validity date.
    const edges = [
      makeEdge("e1", "n1", "n1", "located_in" as never, 0.9),
      {
        ...makeEdge("e2", "n1", "n1", "located_in" as never, 0.9),
        tcommit: "2026-02-01T00:00:00.000Z",
        tvalid: "2026-02-01",
        isLatest: true,
      },
    ];
    const plain = new GraphRetrieval(mockKV(nodes, edges) as never);
    const indexed = new GraphRetrieval((await indexedKV(nodes, edges)) as never);

    for (const asOf of [undefined, "2026-01-15T00:00:00.000Z", "2026-03-01T00:00:00.000Z"]) {
      const viaList = await plain.temporalQuery("Alice", asOf);
      const viaIndex = await indexed.temporalQuery("Alice", asOf);
      expect(viaIndex.entity?.id).toBe(viaList.entity?.id);
      expect(sortResults(mapEdges(viaIndex.currentState))).toEqual(
        sortResults(mapEdges(viaList.currentState)),
      );
      expect(sortResults(mapEdges(viaIndex.history))).toEqual(
        sortResults(mapEdges(viaList.history)),
      );
    }

    const missingViaIndex = await indexed.temporalQuery("Nobody");
    expect(missingViaIndex.entity).toBeNull();

    // Projects edges onto `{ obsId }` so sortResults can order them by edge id.
    function mapEdges(list: GraphEdge[]) {
      return list.map((e) => ({ obsId: e.id }));
    }
  });

  it("graph-query startNodeId traversal returns identical pages via indexes and enumeration", async () => {
    const { nodes, edges } = fixtureGraph();

    const plainKv = mockKV(nodes, edges);
    const plainSdk = mockSdk();
    registerGraphFunction(plainSdk as never, plainKv as never, mockProvider as never);

    const idxKv = await indexedKV(nodes, edges);
    const idxSdk = mockSdk();
    registerGraphFunction(idxSdk as never, idxKv as never, mockProvider as never);

    const viaList = (await plainSdk.trigger("mem::graph-query", {
      startNodeId: "n1",
      maxDepth: 2,
    })) as GraphQueryResult;
    const viaIndex = (await idxSdk.trigger("mem::graph-query", {
      startNodeId: "n1",
      maxDepth: 2,
    })) as GraphQueryResult;

    expect(viaIndex.nodes.map((n) => n.id).sort()).toEqual(
      viaList.nodes.map((n) => n.id).sort(),
    );
    expect(viaIndex.edges.map((e) => e.id).sort()).toEqual(
      viaList.edges.map((e) => e.id).sort(),
    );
    expect(viaIndex.totalNodes).toBe(viaList.totalNodes);
    expect(viaIndex.totalEdges).toBe(viaList.totalEdges);
    expect(viaIndex.truncated).toBe(viaList.truncated);
  });

  it("graph-query query branch returns identical matches when the snapshot covers the corpus", async () => {
    const nodes = [
      makeNode("n1", "payments-service", "project", ["obs_1"], { lang: "rust" }),
      makeNode("n2", "billing", "concept", ["obs_2"], { note: "uses payments" }),
      makeNode("n3", "frontend", "project", ["obs_3"]),
    ];
    const edges = [makeEdge("e1", "n1", "n2", "related_to", 0.9)];

    const plainKv = mockKV(nodes, edges);
    const plainSdk = mockSdk();
    registerGraphFunction(plainSdk as never, plainKv as never, mockProvider as never);

    // The indexed side gets its readiness marker from a forced snapshot
    // rebuild, not from a direct backfill call.
    const idxKv = mockKV(nodes, edges);
    const idxSdk = mockSdk();
    registerGraphFunction(idxSdk as never, idxKv as never, mockProvider as never);
    await idxSdk.trigger("mem::graph-snapshot-rebuild", { force: true });
    expect(await graphIndexesReady(idxKv as never)).toBe(true);

    const viaList = (await plainSdk.trigger("mem::graph-query", {
      query: "payments",
    })) as GraphQueryResult;
    const viaIndex = (await idxSdk.trigger("mem::graph-query", {
      query: "payments",
    })) as GraphQueryResult;

    expect(viaIndex.nodes.map((n) => n.id).sort()).toEqual(
      viaList.nodes.map((n) => n.id).sort(),
    );
    expect(viaIndex.nodes.map((n) => n.id).sort()).toEqual(["n1", "n2"]);
    expect(viaIndex.edges.map((e) => e.id).sort()).toEqual(
      viaList.edges.map((e) => e.id).sort(),
    );
    expect(viaIndex.totalNodes).toBe(viaList.totalNodes);
    expect(viaIndex.totalEdges).toBe(viaList.totalEdges);
    expect(viaIndex.warning).toBeUndefined();
  });

  it("falls back to enumeration when the readiness marker is absent", async () => {
    const { nodes, edges } = fixtureGraph();
    const kv = mockKV(nodes, edges);
    const retrieval = new GraphRetrieval(kv as never);

    const results = await retrieval.searchByEntities(["React"]);
    expect(results.length).toBeGreaterThan(0);
    expect(kv.listCallCount()).toBeGreaterThan(0);
  });

  it("never enumerates when the readiness marker is present", async () => {
    const { nodes, edges } = fixtureGraph();
    const kv = await indexedKV(nodes, edges);
    const before = kv.listCallCount();
    const retrieval = new GraphRetrieval(kv as never);

    await retrieval.searchByEntities(["React"]);
    await retrieval.expandFromChunks(["obs_react"]);
    await retrieval.temporalQuery("React");
    expect(kv.listCallCount()).toBe(before);
  });

  it("graph-extract maintains the side-indexes after a rebuild", async () => {
    // NOTE(review): the provider payload below is blank in the reviewed text,
    // yet the assertions expect entities named "main" and "src/index.ts"
    // linked to obs_new. The extraction markup appears to have been stripped
    // from this view - restore it from the original before relying on this
    // test.
    mockProvider.compress.mockResolvedValueOnce(`






`);

    const kv = mockKV();
    const sdk = mockSdk();
    registerGraphFunction(sdk as never, kv as never, mockProvider as never);
    await sdk.trigger("mem::graph-snapshot-rebuild", { force: true });

    await sdk.trigger("mem::graph-extract", {
      observations: [
        {
          id: "obs_new",
          sessionId: "ses_1",
          timestamp: "2026-02-01T10:00:00Z",
          type: "file_edit",
          title: "Edit index file",
          facts: [],
          narrative: "Updated index.ts",
          concepts: [],
          files: ["src/index.ts"],
          importance: 7,
        },
      ],
    });

    const catalog = await loadNameCatalog(kv as never);
    expect(catalog.map((c) => c.name).sort()).toEqual(["main", "src/index.ts"]);

    const before = kv.listCallCount();
    const retrieval = new GraphRetrieval(kv as never);
    const results = await retrieval.searchByEntities(["index"]);
    expect(results.some((r) => r.obsId === "obs_new")).toBe(true);
    expect(kv.listCallCount()).toBe(before);
  });

  it("graph-reset stops indexed retrieval from serving pre-reset rows", async () => {
    const { nodes, edges } = fixtureGraph();
    const kv = await indexedKV(nodes, edges);
    const sdk = mockSdk();
    registerGraphFunction(sdk as never, kv as never, mockProvider as never);

    const retrieval = new GraphRetrieval(kv as never);
    expect((await retrieval.searchByEntities(["React"])).length).toBeGreaterThan(0);

    await sdk.trigger("mem::graph-reset", {});

    expect(await retrieval.searchByEntities(["React"])).toEqual([]);
    expect(await retrieval.expandFromChunks(["obs_react"])).toEqual([]);
    expect((await retrieval.temporalQuery("React")).entity).toBeNull();
    // No scope was enumerated at any point: not by the backfill, the reset,
    // or the reads before and after it.
    expect(kv.listCallCount()).toBe(0);
  });
});