Skip to content

Implement background batch writes with in-memory capacity management#1062

Closed
DZakh wants to merge 41 commits into
mainfrom
claude/reuse-memory-store-background-writes-FPPgO
Closed

Implement background batch writes with in-memory capacity management#1062
DZakh wants to merge 41 commits into
mainfrom
claude/reuse-memory-store-background-writes-FPPgO

Conversation

@DZakh

@DZakh DZakh commented Mar 30, 2026

Copy link
Copy Markdown
Member

Summary

This PR refactors the batch writing mechanism to execute asynchronously in the background while the indexer continues processing events. It introduces in-memory capacity management to prevent unbounded memory growth and improves the overall throughput of the indexing pipeline.

Key Changes

  • Background Write Queue: Introduced writePromise and pendingWrite fields to Persistence.t to queue and execute batch writes asynchronously. Writes are executed sequentially with automatic retry if a new write is queued during an in-progress write.

  • Write Data Extraction: Moved data extraction logic from writeBatch into InMemoryStore.prepareForNextBatch, which prepares write arguments and queues the background write before the next batch is processed.

  • In-Memory Capacity Management: Added totalChangeCount tracking to InMemoryStore and implemented cleanupAfterWrite to evict stale entities and indices after writes complete. When capacity exceeds 50% of the target, the indexer flushes pending writes to free memory.

  • Rollback Diff Refactoring: Changed prepareRollbackDiff to return a structured rollbackDiff type instead of an in-memory store, and introduced applyRollbackDiff to apply rollback changes to the existing in-memory store without creating a new one.

  • Index Lifecycle Tracking: Enhanced InMemoryTable.Entity.indexWithRelatedIds to track lastReferencedCheckpointId, enabling cleanup of stale indices that are no longer referenced after a write checkpoint.

  • Entity Cleanup Logic: Implemented sophisticated cleanup in InMemoryTable.Entity.cleanupAfterWrite that:

    • Removes stale indices not referenced after the written checkpoint
    • Evicts loaded entities (cache entries)
    • Transitions updated entities to loaded status if they've been written and have no active indices
    • Preserves entities with newer changes than what was written
  • Metrics Separation: Decoupled database write duration metrics from batch processing metrics, as writes now occur asynchronously in the background.

  • Error Handling: Added error checking in the main loop to detect and handle background write failures without blocking event processing.

  • Checkpoint Tracking: Added writtenCheckpointId to track the last successfully written checkpoint, enabling proper cleanup and capacity management decisions.

Notable Implementation Details

  • The executeWrite function recursively processes pending writes, ensuring sequential execution and automatic retry if writes are queued during processing.
  • flushWrites provides a way to await all pending and in-progress writes, used before rollbacks and graceful shutdown.
  • The cleanup strategy preserves valid cached entities while evicting stale ones, balancing memory usage with query performance.
  • Rollback now applies changes to the existing in-memory store rather than replacing it, maintaining consistency with ongoing event processing.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx

Summary by CodeRabbit

  • New Features

    • Managed in-memory store added with configurable target size via a new environment setting.
  • Performance Improvements

    • Background write queuing and checkpoint-aware cleanup to reduce memory pressure and improve throughput.
    • Checkpoint-aware in-memory indexing for faster cached lookups.
    • Split processing metrics for clearer loader/handler/db timing.
  • Behavior / Reliability

    • Streamlined rollback handling and stronger persistence coordination during exits and reorgs.

claude and others added 15 commits March 19, 2026 15:06
- Add lastReferencedCheckpointId and entityCount tracking to InMemoryTable.Entity
- Add cleanupAfterWrite to InMemoryTable.Entity and InMemoryStore to reset
  written entities to Loaded status and evict stale Loaded entities
- Create BackgroundWriter module with auto-chaining writes, force/await API
- Move DB write out of EventProcessing.processEventBatch into GlobalState orchestration
- Reuse in-memory store across batches instead of creating fresh each time
- Queue background writes after non-rollback batches; force synchronous writes for rollbacks
- Wait for background writes when in-memory store exceeds ENVIO_MAX_IN_MEMORY_ENTITIES
- Await background writes before rollback execution and process exit
- Reset in-memory store after rollback batches since diff store invalidates cache

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
changeCount increments on every set() call (every mutation), not just
when a new entity key is added. initValue (loading from DB) no longer
increments the counter. cleanupAfterWrite resets to count of remaining
unwritten changes.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Merge BackgroundWriter fields (writePromise, pendingWrite,
  writtenCheckpointId) into Persistence.t and move all write
  functions there. Delete BackgroundWriter module.
- Swap dispatch order: UpdateQueues before StartProcessingBatch
  to prevent double processing.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…add capacity management

- Add InMemoryStore.entitySet to centralize all entity write operations
- Remove diffInMemoryStore from rollback flow: store raw rollback data
  in rollbackState and apply to main inMemoryStore at processing time
- Add Persistence.writeAndManageCapacity: prune written entities when
  >50% capacity used, await write completion if still over threshold
- Remove unused isRollingBack, simplify InMemoryStore.make

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
This is a soft threshold, not a hard limit — "target" better conveys
that the store can temporarily exceed this value during background flushes.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
… reset

- Inline loader/handler prom metrics in EventProcessing, register db write
  metrics separately in Persistence where duration is measured
- Move inMemoryStore from GlobalState.t to Ctx.t
- Replace InMemoryStore recreation on rollback with reset + applyRollbackDiff
- Add InMemoryStore.reset to clear cached data before rollback
- Store totalChangeCount as float on InMemoryStore.t, increment on entitySet
- Remove InMemoryTable.Entity.getChangeCount, access field directly
- Add comment clarifying lastReferencedCheckpointId is for index-based gets

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Track lastReferencedCheckpointId per index entry so cleanupAfterWrite
can prune stale indices first, then evict entities with no active index.
Also throw on empty batch checkpoint IDs instead of silently returning 0n.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…ackground writes

- Move dispatches (UpdateQueues, StartProcessingBatch) before any await to
  prevent triggering processing twice
- Move rollbackDiff types and applyRollbackDiff from Persistence to InMemoryStore
  where it belongs (mutates in-memory state, not persistence concern)
- Remove full InMemoryStore.reset on rollback — only apply the diff since
  entities not in the diff are unchanged and remain valid in cache
- Remove rollback forceWrite special case — checkpoint IDs always increment
  so background writeAndManageCapacity works correctly for rollbacks too

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Replace direct writePromise field access with Persistence.isWriting
and Persistence.awaitCurrentWrite in the NoExit handler.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…tracking

- Move writeAndManageCapacity logic from Persistence into InMemoryStore.prepareForNextBatch
  so Persistence no longer depends on InMemoryStore
- Add Persistence.flushWrites to ensure all pending writes complete
- Force flush writes before rollback instead of only awaiting if currently writing
- Remove lastReferencedCheckpointId from entityWithIndices (derive from status instead)
- Rename entityDiffs to entityChanges

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Use Promise.thenResolve/done for background write error check so
  updateChainMetadataTable runs without blocking
- Move applyRollbackDiff from ProcessEventBatch to SetRollbackState
  reducer; remove rollbackDiff from RollbackReady state variant
- Remove InMemoryStore.t from Persistence.writeArgs; extract entity,
  raw event, and effect cache data in GlobalState before write
- Use onWriteComplete callback for post-write cleanup instead of
  Persistence calling InMemoryStore.cleanupAfterWrite directly
- Move rollbackDiff/rollbackEntityDiff types from InMemoryStore to
  Persistence to fully remove the Persistence→InMemoryStore dependency

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…ForNextBatch

Consolidates write data extraction (entities, effects, raw events),
writeArgs construction, and Persistence.startWrite into prepareForNextBatch,
simplifying the GlobalState ProcessEventBatch callsite.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Swap dispatch order: StartProcessingBatch before UpdateQueues
- Add comment explaining rollback flush relies on prepareForNextBatch
  having already queued the write
- Remove dead clone chain (InMemoryStore.clone, EntityTables.clone,
  InMemoryTable.clone, InMemoryTable.Entity.clone)
- Remove unused Persistence.forceWrite

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
@coderabbitai

coderabbitai Bot commented Mar 30, 2026

Copy link
Copy Markdown
Contributor

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a live in-memory store into the runtime context and refactors in-memory table/store behavior to track change counts, apply rollback diffs, and queue background persistence writes. Threads checkpointId through load/index lookups and splits batch metrics so DB write timing is recorded separately.

Changes

Cohort / File(s) Summary
Context & Config
packages/envio/src/Ctx.res, packages/envio/src/Main.res, packages/envio/src/Env.res
Added inMemoryStore: InMemoryStore.t to Ctx.t, initialize via InMemoryStore.make(...) in start; added targetInMemoryStoreSize env config.
In-memory store & tables
packages/envio/src/InMemoryStore.res, packages/envio/src/InMemoryTable.res
Reworked InMemoryStore to add rollbackTargetCheckpointId (mutable) and totalChangeCount; removed clone API; added entitySet, prepareForNextBatch, cleanupAfterWrite, applyRollbackDiff; InMemoryTable index entries now track lastReferencedCheckpointId and per-entity changeCount; added cleanup logic.
Persistence & write queueing
packages/envio/src/Persistence.res
Introduced write queue state (pendingWrite, writePromise, writtenCheckpointId) and helpers (startWrite, awaitCurrentWrite, flushWrites, executeWrite); changed writeBatch to take precomputed updatedEntities/rawEvents/effectCacheWriteData/rollbackTarget and return structured rollback diffs.
Global state & rollback flow
packages/envio/src/GlobalState.res
Removed cloning-based rollback snapshots; derive/apply rollbackDiff to live ctx.inMemoryStore; ensure pending writes are flushed before rollback; await inMemoryStore.prepareForNextBatch after processing before dispatching batch-processed actions.
Event processing & metrics
packages/envio/src/EventProcessing.res, packages/envio/src/Prometheus.res
Removed previous DB write timing from batch processing; compute loader and handler durations only; Prometheus API split into setLoaderAndHandlerDurations and setDbWriteDuration.
Load layer & call sites
packages/envio/src/LoadLayer.res, packages/envio/src/LoadLayer.resi, scenarios/test_codegen/test/LoadLayer_test.res
loadByField now accepts ~checkpointId: bigint; call sites and tests updated to pass checkpointId to unsafe in-memory index lookups.
User context & tests/mocks
packages/envio/src/UserContext.res, scenarios/test_codegen/test/helpers/Mock.res
Load calls now forward checkpointId; entity writes/deletes use InMemoryStore.entitySet(...); test/mock ctx now includes inMemoryStore; mock waiters flush persistence after processed batches.

Sequence Diagram(s)

sequenceDiagram
    participant Client as EventBatchHandler
    participant EP as EventProcessing
    participant GS as GlobalState
    participant IMS as InMemoryStore
    participant P as Persistence
    participant S as Storage

    Client->>EP: processEventBatch(batch)
    EP->>EP: compute loaderDuration, handlerDuration
    EP->>GS: dispatch EventBatchProcessed
    GS->>IMS: prepareForNextBatch(batch, config, isInReorgThreshold)
    IMS->>P: startWrite(updatedEntities, rawEvents, effectCache, rollbackTarget)
    P->>S: storage.writeBatch(...)
    S-->>P: writtenCheckpointId
    P->>IMS: cleanupAfterWrite(writtenCheckpointId)
    IMS->>IMS: reset totalChangeCount, cleanup indices
    P-->>GS: write complete (async)
Loading
sequenceDiagram
    participant GS as GlobalState
    participant P as Persistence
    participant S as Storage
    participant IMS as InMemoryStore

    GS->>P: flushWrites()
    P->>S: storage.writeBatch(...) (if queued)
    S-->>P: writtenCheckpointId
    P-->>GS: flushed
    GS->>P: prepareRollbackDiff(fromCheckpoint)
    P->>S: storage.getRollbackDiff(...)
    S-->>P: rollbackDiff(entityChanges)
    P-->>GS: rollbackDiff
    GS->>IMS: applyRollbackDiff(rollbackDiff)
    IMS->>IMS: apply per-entity changes via entitySet, set rollbackTargetCheckpointId
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • JonoPrest

"🐰
I hopped through tables, changes tracked each beat,
Queued writes hummed softly, asynchronous and neat.
Rollbacks unfolded, diffs applied with care,
Checkpoints and indices dancing in the air.
Carrots for reviewers, code warmed by rabbit flair."

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and concisely summarizes the main change: implementing background batch writes with in-memory capacity management, which is the core focus of this comprehensive refactoring.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/envio/src/GlobalState.res`:
- Around line 1042-1046: When Persistence.flushWrites returns Error(errHandler)
we currently dispatchAction(ErrorExit(errHandler)) but then continue into the
rollback preparation; change the control flow in the flushWrites match (the
switch handling state.ctx.persistence->Persistence.flushWrites) so that after
dispatchAction(ErrorExit(errHandler)) you immediately abort the surrounding
operation (e.g., return, early-exit, or unit-to-unit short-circuit) instead of
falling through to the rollback-preparation logic; ensure the Ok(_) branch
remains unchanged and that no rollback queries are prepared or executed when
flushWrites failed.
- Around line 878-888: In the ExitWithSuccess branch replace the current
isWriting check + discarding awaitCurrentWrite result with a call to
state.ctx.persistence->Persistence.flushWrites and act on its Result before
dispatching SuccessExit; specifically, invoke Persistence.flushWrites (which
also waits for queued writes), handle the Ok/Err result returned (e.g.
surface/log/propagate the Err instead of ignoring it), then continue to call
updateChainMetadataTable and only dispatchAction(SuccessExit) after flushWrites
returns successfully; update references: ExitWithSuccess branch,
state.ctx.persistence->Persistence.awaitCurrentWrite -> Persistence.flushWrites,
and dispatchAction(SuccessExit).

In `@packages/envio/src/InMemoryStore.res`:
- Around line 178-186: Pre-write cleanup is using
persistence.writtenCheckpointId which can clear the just-queued batch (rawEvents
/ idsToStore) before the pending write runs; change the pre-write cleanup to use
the pending write's checkpoint id (the pendingWrite/writtenCheckpointId kept by
Persistence.startWrite) so cleanupAfterWrite references the checkpoint for the
write that actually persisted, not the last written checkpoint, and ensure you
do not clear rawEvents/idsToStore for the currently queued batch; also replace
the Error(errHandler) => errHandler->ErrorHandling.log branch with logic that
preserves or retries the pending write (or at minimum logs the error together
with the pending write id) instead of silently dropping the data (update calls
around inMemoryStore->cleanupAfterWrite and persistence->Persistence.flushWrites
handling accordingly).
- Around line 163-177: writeArgs captures rollbackTargetCheckpointId from
inMemoryStore but you must clear inMemoryStore.rollbackTargetCheckpointId
immediately after attaching it so subsequent queued batches don't reuse it;
after you build writeArgs (which preserves the value) set
inMemoryStore.rollbackTargetCheckpointId to None (or equivalent) before calling
Persistence.startWrite(~writeArgs) so the rollback target is consumed and not
inherited by later writes.

In `@packages/envio/src/LoadLayer.resi`:
- Line 22: The test calls to the loadByField function in LoadLayer_test.res are
missing the newly required named parameter ~checkpointId: bigint; update each
invocation of loadByField (and any wrapper calls to LoadLayer.loadByField) in
the test file to include a bigint checkpoint id (e.g., ~checkpointId=0n or pass
an existing checkpointId variable) so the call signatures match the updated
LoadLayer.resi definition.

In `@packages/envio/src/Persistence.res`:
- Around line 291-307: The effect-cache count is being updated during batching
(in effectCacheWriteData mapping) and can double-count IDs queued across
concurrent writes and survive failed storage.writeBatch; move the mutation of
effectCacheRecord.count, the computation of shouldInitialize, and the
Prometheus.EffectCacheCount.set call out of effectCacheWriteData and into the
post-commit cleanup path (e.g., inside cleanupAfterWrite or the success branch
after storage.writeBatch) so counts are only adjusted for actually committed IDs
(use the committed idsToStore for that batch and subtract invalidationsCount for
those same items), compute shouldInitialize from the prior stored count before
increment, and ensure idsToStore is cleared only after this successful update to
keep bookkeeping consistent with the DB.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4af27ce5-d136-4732-ad2e-a067ebf18288

📥 Commits

Reviewing files that changed from the base of the PR and between 13eb564 and a1ff163.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (13)
  • packages/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs
  • packages/envio/src/Ctx.res
  • packages/envio/src/Env.res
  • packages/envio/src/EventProcessing.res
  • packages/envio/src/GlobalState.res
  • packages/envio/src/InMemoryStore.res
  • packages/envio/src/InMemoryTable.res
  • packages/envio/src/LoadLayer.res
  • packages/envio/src/LoadLayer.resi
  • packages/envio/src/Main.res
  • packages/envio/src/Persistence.res
  • packages/envio/src/Prometheus.res
  • packages/envio/src/UserContext.res

Comment thread packages/envio/src/GlobalState.res Outdated
Comment thread packages/envio/src/GlobalState.res Outdated
Comment thread packages/envio/src/InMemoryStore.res
Comment thread packages/envio/src/InMemoryStore.res
Comment thread packages/envio/src/LoadLayer.resi
Comment thread packages/envio/src/Persistence.res Outdated
Comment on lines +291 to +307
effectCacheWriteData->Belt.Array.mapU(({effect, items, invalidationsCount}) => {
let effectName = effect.name
let effectCacheRecord = switch cache->Utils.Dict.dangerouslyGetNonOption(
effectName,
) {
| Some(c) => c
| None => {
let c = {effectName, count: 0}
cache->Js.Dict.set(effectName, c)
c
}
}
let shouldInitialize = effectCacheRecord.count === 0
effectCacheRecord.count =
effectCacheRecord.count + items->Js.Array2.length - invalidationsCount
Prometheus.EffectCacheCount.set(~count=effectCacheRecord.count, ~effectName)
{effect, items, shouldInitialize}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Effect-cache count bookkeeping is not tied to committed writes.

idsToStore is only cleared after cleanupAfterWrite, so a payload queued while another write is running can still contain IDs from earlier batches. Applying count = count + items.length - invalidationsCount here double-counts those rows when the later write succeeds, and the mutation also survives a failed storage.writeBatch. That makes shouldInitialize and Prometheus.EffectCacheCount drift away from the DB’s actual state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/Persistence.res` around lines 291 - 307, The effect-cache
count is being updated during batching (in effectCacheWriteData mapping) and can
double-count IDs queued across concurrent writes and survive failed
storage.writeBatch; move the mutation of effectCacheRecord.count, the
computation of shouldInitialize, and the Prometheus.EffectCacheCount.set call
out of effectCacheWriteData and into the post-commit cleanup path (e.g., inside
cleanupAfterWrite or the success branch after storage.writeBatch) so counts are
only adjusted for actually committed IDs (use the committed idsToStore for that
batch and subtract invalidationsCount for those same items), compute
shouldInitialize from the prior stored count before increment, and ensure
idsToStore is cleared only after this successful update to keep bookkeeping
consistent with the DB.

The clone chain (InMemoryTable.clone, Entity.clone, EntityTables.clone,
InMemoryStore.clone) was incorrectly removed as dead code — it's used
by the generated TestHelpers_MockDb template.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/envio/src/InMemoryTable.res`:
- Around line 443-457: The code inside the Updated(update) branch resets history
to only [update.latestChange], dropping intermediate unwritten entries; instead,
compute a filteredHistory by taking row.history (and include update.latestChange
if not already present) and keep all entries whose checkpoint id is greater than
writtenCheckpointId, then set status: Updated({ ...update, history:
filteredHistory, containsRollbackDiffChange: recomputedFlag }) when calling
inMemTable.table.dict->Js.Dict.set; reference Updated(update), row.history,
update.latestChange, writtenCheckpointId and containsRollbackDiffChange and
ensure containsRollbackDiffChange is recomputed from the preserved entries
rather than hardcoding false.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 39f462a4-c39b-4e26-9bf6-06bc922e44f2

📥 Commits

Reviewing files that changed from the base of the PR and between a1ff163 and a09de78.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (2)
  • packages/envio/src/InMemoryStore.res
  • packages/envio/src/InMemoryTable.res

Comment thread packages/envio/src/InMemoryTable.res Outdated
Comment on lines +443 to +457
| Updated(update) =>
remainingChangeCount := remainingChangeCount.contents +. 1.
// Clear already-written history but keep the update status
// since it has changes newer than what was written
inMemTable.table.dict->Js.Dict.set(
key,
{
...row,
status: Updated({
...update,
history: [update.latestChange],
containsRollbackDiffChange: false,
}),
},
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential loss of intermediate history entries.

When an entity has latestChange beyond writtenCheckpointId, the history is reset to only [update.latestChange]. This discards any intermediate changes between writtenCheckpointId and latestChange that haven't been written yet.

Example: if history = [change@3, change@7, change@9] and writtenCheckpointId = 5, the unwritten changes [change@7, change@9] should be preserved, but this code only keeps [change@9].

Consider filtering history to retain all entries with checkpoint > writtenCheckpointId:

🛠️ Proposed fix
         | Updated(update) =>
           remainingChangeCount := remainingChangeCount.contents +. 1.
-          // Clear already-written history but keep the update status
-          // since it has changes newer than what was written
+          // Filter history to keep only unwritten changes
+          let filteredHistory = update.history->Array.keep(change =>
+            change->Change.getCheckpointId > writtenCheckpointId
+          )
           inMemTable.table.dict->Js.Dict.set(
             key,
             {
               ...row,
               status: Updated({
                 ...update,
-                history: [update.latestChange],
+                history: filteredHistory->Array.length > 0
+                  ? filteredHistory
+                  : [update.latestChange],
                 containsRollbackDiffChange: false,
               }),
             },
           )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
| Updated(update) =>
remainingChangeCount := remainingChangeCount.contents +. 1.
// Clear already-written history but keep the update status
// since it has changes newer than what was written
inMemTable.table.dict->Js.Dict.set(
key,
{
...row,
status: Updated({
...update,
history: [update.latestChange],
containsRollbackDiffChange: false,
}),
},
)
| Updated(update) =>
remainingChangeCount := remainingChangeCount.contents +. 1.
// Filter history to keep only unwritten changes
let filteredHistory = update.history->Array.keep(change =>
change->Change.getCheckpointId > writtenCheckpointId
)
inMemTable.table.dict->Js.Dict.set(
key,
{
...row,
status: Updated({
...update,
history: filteredHistory->Array.length > 0
? filteredHistory
: [update.latestChange],
containsRollbackDiffChange: false,
}),
},
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/InMemoryTable.res` around lines 443 - 457, The code inside
the Updated(update) branch resets history to only [update.latestChange],
dropping intermediate unwritten entries; instead, compute a filteredHistory by
taking row.history (and include update.latestChange if not already present) and
keep all entries whose checkpoint id is greater than writtenCheckpointId, then
set status: Updated({ ...update, history: filteredHistory,
containsRollbackDiffChange: recomputedFlag }) when calling
inMemTable.table.dict->Js.Dict.set; reference Updated(update), row.history,
update.latestChange, writtenCheckpointId and containsRollbackDiffChange and
ensure containsRollbackDiffChange is recomputed from the preserved entries
rather than hardcoding false.

MockDb was removed on main, so the clone chain
(InMemoryStore.clone, EntityTables.clone, InMemoryTable.clone,
Entity.clone) is now truly dead code and can be removed.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
packages/envio/src/InMemoryStore.res (2)

172-175: ⚠️ Potential issue | 🟠 Major

Verify rollbackTargetCheckpointId is consumed on the successful write path.

Line 196 stores the rollback target on inMemoryStore, and Line 172 forwards whatever is currently there into every writeArgs. I don't see a reset in this file. Because Persistence.startWrite only keeps the latest pendingWrite snapshot (packages/envio/src/Persistence.res:406-411), that target likely has to survive until one rollback-bearing write actually commits — but it also needs to be removed immediately after that, otherwise later batches will keep replaying it.

#!/bin/bash
set -euo pipefail

printf '\n== rollbackTargetCheckpointId usages ==\n'
rg -n -C2 --type-add 'res:*.res' --type res '\brollbackTargetCheckpointId\b'

printf '\n== persistence write lifecycle ==\n'
rg -n -C4 --type-add 'res:*.res' --type res '\b(startWrite|executeWrite|flushWrites|onWriteComplete)\b'

Expected result: a reset or pending-write rewrite tied to the write-success path. If the search only finds the assignment in applyRollbackDiff and the forwarding in prepareForNextBatch, the rollback target will leak into later writes.

Also applies to: 196-196

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/InMemoryStore.res` around lines 172 - 175, The
rollbackTargetCheckpointId set by applyRollbackDiff is being forwarded via
prepareForNextBatch into every writeArgs and never cleared; update the
successful-write path so the target is consumed and not leaked into subsequent
batches: modify the onWriteComplete handler (or InMemoryStore.cleanupAfterWrite)
to detect when writtenCheckpointId matches
inMemoryStore.rollbackTargetCheckpointId and then clear/reset
inMemoryStore.rollbackTargetCheckpointId (or rewrite the pendingWrite to remove
the target) so Persistence.startWrite / executeWrite no longer forwards it;
reference rollbackTargetCheckpointId, onWriteComplete,
InMemoryStore.cleanupAfterWrite, Persistence.startWrite, applyRollbackDiff, and
prepareForNextBatch to locate the relevant spots.

106-123: ⚠️ Potential issue | 🔴 Critical

Don't wipe shared raw-event/effect buffers in cleanupAfterWrite.

rawEvents, idsToStore, and invalidationsCount are global buffers for all not-yet-durable batches. Clearing them here is unsafe because prepareForNextBatch snapshots them into writeArgs (packages/envio/src/Persistence.res:147-160), while Persistence.startWrite only retains a single pendingWrite (packages/envio/src/Persistence.res:406-411). If batch B is pending, this cleanup runs, and batch C overwrites pendingWrite, batch B's raw-event/effect writes are gone even though entity updates can still be rebuilt from the tables.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/InMemoryStore.res` around lines 106 - 123,
cleanupAfterWrite currently wipes global buffers (inMemoryStore.rawEvents.dict,
each table.idsToStore and table.invalidationsCount) which are shared across
batches; instead, stop clearing these global buffers here and only clear
per-write snapshots that belong to the completed write (i.e., the data captured
in writeArgs/pendingWrite). Concretely, remove the code that deletes keys from
inMemoryStore.rawEvents.dict and the code that zeroes
table.idsToStore/table.invalidationsCount in cleanupAfterWrite, and ensure any
necessary clearing is done when the owning snapshot is created or explicitly
released (see prepareForNextBatch which snapshots rawEvents/effects into
writeArgs and Persistence.startWrite which manages pendingWrite) so that other
pending batches’ buffers are not lost.
packages/envio/src/InMemoryTable.res (1)

438-450: ⚠️ Potential issue | 🟠 Major

Preserve all unwritten history entries through cleanup.

This branch still collapses every surviving update to [update.latestChange]. That drops intermediate unwritten entries, and it also invents a history row when shouldSaveHistory=false left update.history empty. The next write can miss history records after cleanup.

💡 Suggested fix
-        | Updated(update) =>
-          remainingChangeCount := remainingChangeCount.contents +. 1.
+        | Updated(update) =>
+          let remainingHistory =
+            update.history->Array.keep(change =>
+              change->Change.getCheckpointId > writtenCheckpointId
+            )
+          remainingChangeCount := remainingChangeCount.contents +. (
+            remainingHistory->Array.length === 0
+              ? 1.
+              : (remainingHistory->Array.length :> float)
+          )
           // Clear already-written history but keep the update status
           // since it has changes newer than what was written
           inMemTable.table.dict->Js.Dict.set(
             key,
             {
               ...row,
               status: Updated({
                 ...update,
-                history: [update.latestChange],
-                containsRollbackDiffChange: false,
+                history: remainingHistory,
+                containsRollbackDiffChange: update.containsRollbackDiffChange,
               }),
             },
           )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/InMemoryTable.res` around lines 438 - 450, The code in the
Updated(update) branch collapses surviving update history to
[update.latestChange], dropping unwritten intermediate entries and fabricating a
history entry when update.history is empty; change the Js.Dict.set call so the
preserved status keeps the original update.history (e.g., history:
update.history) instead of replacing it with [update.latestChange], and keep
containsRollbackDiffChange reset as intended; update the Updated({...})
construction referenced in the inMemTable.table.dict->Js.Dict.set call and
ensure remainingChangeCount handling remains unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/envio/src/InMemoryTable.res`:
- Around line 423-427: The current switch on row.status evicts Loaded rows
unconditionally which can remove indices that remain retained and cause
hasIndex/getUnsafeOnIndex inconsistencies; update the Loaded branch in the
switch that uses row.status so it only calls
inMemTable->deleteEntityFromIndices(~entityId=key,
~entityIndices=row.entityIndices) and
inMemTable.table.dict->Utils.Dict.deleteInPlace(key) when the row has no
retained indices left (e.g., check row.entityIndices or a
retained-count/hasRetainedIndex predicate); ensure the condition aligns with how
LoadLayer.call uses hasIndex and getUnsafeOnIndex so you only delete Loaded
entries after all retained indices are pruned.

---

Duplicate comments:
In `@packages/envio/src/InMemoryStore.res`:
- Around line 172-175: The rollbackTargetCheckpointId set by applyRollbackDiff
is being forwarded via prepareForNextBatch into every writeArgs and never
cleared; update the successful-write path so the target is consumed and not
leaked into subsequent batches: modify the onWriteComplete handler (or
InMemoryStore.cleanupAfterWrite) to detect when writtenCheckpointId matches
inMemoryStore.rollbackTargetCheckpointId and then clear/reset
inMemoryStore.rollbackTargetCheckpointId (or rewrite the pendingWrite to remove
the target) so Persistence.startWrite / executeWrite no longer forwards it;
reference rollbackTargetCheckpointId, onWriteComplete,
InMemoryStore.cleanupAfterWrite, Persistence.startWrite, applyRollbackDiff, and
prepareForNextBatch to locate the relevant spots.
- Around line 106-123: cleanupAfterWrite currently wipes global buffers
(inMemoryStore.rawEvents.dict, each table.idsToStore and
table.invalidationsCount) which are shared across batches; instead, stop
clearing these global buffers here and only clear per-write snapshots that
belong to the completed write (i.e., the data captured in
writeArgs/pendingWrite). Concretely, remove the code that deletes keys from
inMemoryStore.rawEvents.dict and the code that zeroes
table.idsToStore/table.invalidationsCount in cleanupAfterWrite, and ensure any
necessary clearing is done when the owning snapshot is created or explicitly
released (see prepareForNextBatch which snapshots rawEvents/effects into
writeArgs and Persistence.startWrite which manages pendingWrite) so that other
pending batches’ buffers are not lost.

In `@packages/envio/src/InMemoryTable.res`:
- Around line 438-450: The code in the Updated(update) branch collapses
surviving update history to [update.latestChange], dropping unwritten
intermediate entries and fabricating a history entry when update.history is
empty; change the Js.Dict.set call so the preserved status keeps the original
update.history (e.g., history: update.history) instead of replacing it with
[update.latestChange], and keep containsRollbackDiffChange reset as intended;
update the Updated({...}) construction referenced in the
inMemTable.table.dict->Js.Dict.set call and ensure remainingChangeCount handling
remains unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 462dc386-20ee-4c0f-a203-503f7d9de958

📥 Commits

Reviewing files that changed from the base of the PR and between a09de78 and ba1ddbf.

📒 Files selected for processing (3)
  • packages/envio/src/InMemoryStore.res
  • packages/envio/src/InMemoryTable.res
  • packages/envio/src/Main.res
✅ Files skipped from review due to trivial changes (1)
  • packages/envio/src/Main.res

Comment thread packages/envio/src/InMemoryTable.res Outdated
Comment on lines +423 to +427
switch row.status {
| Loaded =>
// Loaded entities are just cache — evict from memory
inMemTable->deleteEntityFromIndices(~entityId=key, ~entityIndices=row.entityIndices)
inMemTable.table.dict->Utils.Dict.deleteInPlace(key)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Only evict Loaded rows after they lose all retained indices.

Phase 1 already prunes stale indices. Deleting every Loaded row here also strips entity IDs from indices that survived, so hasIndex can stay true while getUnsafeOnIndex returns an empty result. packages/envio/src/LoadLayer.res:341-445 wires those two functions into LoadManager.call, so this can turn a retained cache entry into a false hit.

💡 Suggested fix
-        | Loaded =>
-          // Loaded entities are just cache — evict from memory
-          inMemTable->deleteEntityFromIndices(~entityId=key, ~entityIndices=row.entityIndices)
-          inMemTable.table.dict->Utils.Dict.deleteInPlace(key)
+        | Loaded =>
+          if row.entityIndices->Utils.Set.size === 0 {
+            // Loaded entity is no longer reachable from any retained index
+            inMemTable.table.dict->Utils.Dict.deleteInPlace(key)
+          }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
switch row.status {
| Loaded =>
// Loaded entities are just cache — evict from memory
inMemTable->deleteEntityFromIndices(~entityId=key, ~entityIndices=row.entityIndices)
inMemTable.table.dict->Utils.Dict.deleteInPlace(key)
switch row.status {
| Loaded =>
if row.entityIndices->Utils.Set.size === 0 {
// Loaded entity is no longer reachable from any retained index
inMemTable.table.dict->Utils.Dict.deleteInPlace(key)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/InMemoryTable.res` around lines 423 - 427, The current
switch on row.status evicts Loaded rows unconditionally which can remove indices
that remain retained and cause hasIndex/getUnsafeOnIndex inconsistencies; update
the Loaded branch in the switch that uses row.status so it only calls
inMemTable->deleteEntityFromIndices(~entityId=key,
~entityIndices=row.entityIndices) and
inMemTable.table.dict->Utils.Dict.deleteInPlace(key) when the row has no
retained indices left (e.g., check row.entityIndices or a
retained-count/hasRetainedIndex predicate); ensure the condition aligns with how
LoadLayer.call uses hasIndex and getUnsafeOnIndex so you only delete Loaded
entries after all retained indices are pruned.

claude and others added 3 commits March 31, 2026 10:39
…ield

- Add inMemoryStore field to Ctx.t construction in Mock.res
- Add ~checkpointId=0n to LoadLayer.loadByField calls in LoadLayer_test.res

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
With background writes, getBatchWritePromise must also await
Persistence.flushWrites to ensure DB state is consistent before
test assertions query the database.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (2)
packages/envio/src/GlobalState.res (2)

1039-1045: ⚠️ Potential issue | 🔴 Critical

Rollback continues after flush failure - control flow must abort.

After dispatchAction(ErrorExit(errHandler)) on the Error branch, execution falls through to the rollback preparation logic (lines 1047+). This means rollback queries will run against a potentially inconsistent DB state, and SetRollbackState will mutate the in-memory store despite the pending exit.

🔧 Proposed fix - add early return after ErrorExit
       switch await state.ctx.persistence->Persistence.flushWrites {
-      | Error(errHandler) => dispatchAction(ErrorExit(errHandler))
+      | Error(errHandler) =>
+        dispatchAction(ErrorExit(errHandler))
+        // Abort rollback - don't proceed with rollback queries on inconsistent state
       | Ok(_) => ()
       }
+      // Only continue if flushWrites succeeded
+      if state.ctx.persistence->Persistence.hasError {
+        return
+      }

A cleaner approach would be restructuring with an early return or wrapping the rest in the Ok(_) branch:

       switch await state.ctx.persistence->Persistence.flushWrites {
       | Error(errHandler) =>
         dispatchAction(ErrorExit(errHandler))
-      | Ok(_) => ()
+      | Ok(_) =>
+        let startTime = Hrtime.makeTimer()
+        // ... rest of rollback logic indented here
       }
-
-      let startTime = Hrtime.makeTimer()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/GlobalState.res` around lines 1039 - 1045, The Error
branch after awaiting state.ctx.persistence->Persistence.flushWrites dispatches
ErrorExit(errHandler) but then falls through and continues rollback logic;
change control flow so execution aborts immediately after
dispatchAction(ErrorExit(errHandler))—either add an early return/exit right
after that dispatch or restructure the switch so the rollback preparation and
SetRollbackState code live only inside the Ok(_) branch (referencing
Persistence.flushWrites, dispatchAction, ErrorExit and SetRollbackState to
locate the affected block).

878-888: ⚠️ Potential issue | 🔴 Critical

ExitWithSuccess should use flushWrites and handle its result.

The current implementation discards the write result (let _ = await) and uses awaitCurrentWrite which may miss pending writes that haven't started yet. If the background write fails, the process exits with success despite data not being persisted.

🔧 Proposed fix
     | ExitWithSuccess =>
-      // Await any background writes before exiting
-      if state.ctx.persistence->Persistence.isWriting {
-        let _ = await state.ctx.persistence->Persistence.awaitCurrentWrite
-      }
+      // Flush all pending writes and check result before exiting
+      switch await state.ctx.persistence->Persistence.flushWrites {
+      | Error(errHandler) =>
+        dispatchAction(ErrorExit(errHandler))
+        return
+      | Ok(_) => ()
+      }
       updateChainMetadataTable(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/GlobalState.res` around lines 878 - 888, Replace the
current use of Persistence.awaitCurrentWrite and discarding its result with a
call to Persistence.flushWrites on state.ctx.persistence from within the
ExitWithSuccess branch; await the flushWrites result, remove the "let _ = await"
pattern and check the returned result so that on success you proceed to call
updateChainMetadataTable and dispatchAction(SuccessExit), and on failure you
handle the error (e.g., log and dispatchAction(FailureExit) or equivalent);
update references: ExitWithSuccess, state.ctx.persistence,
Persistence.awaitCurrentWrite -> Persistence.flushWrites,
updateChainMetadataTable, dispatchAction(SuccessExit).
🧹 Nitpick comments (1)
packages/envio/src/GlobalState.res (1)

1090-1093: Consider defensive handling for DB value parsing.

Float.fromString(diff["events_processed_diff"])->Option.getExn will throw if the DB returns an unparseable value. While unlikely in normal operation, corrupted rollback progress data would crash the indexer during recovery.

💡 Suggested defensive approach
               let eventsProcessedDiff =
-                Float.fromString(diff["events_processed_diff"])->Option.getExn
+                Float.fromString(diff["events_processed_diff"])->Option.getWithDefault(0.)

Or log a warning and use a default to allow rollback to proceed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/envio/src/GlobalState.res` around lines 1090 - 1093, The code
currently uses Float.fromString(diff["events_processed_diff"])->Option.getExn
which will crash on unparseable DB values; change this to safely handle the
Option by pattern-matching or using Option.getWithDefault (or similar) to supply
a fallback (e.g., 0.0) and log a warning when parsing fails, then use the safe
value to update rollbackedProcessedEvents.contents and return it; specifically
update the parsing site around eventsProcessedDiff and replace the direct getExn
with a safe-branch that logs the issue and uses a default to allow rollback to
proceed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@packages/envio/src/GlobalState.res`:
- Around line 1039-1045: The Error branch after awaiting
state.ctx.persistence->Persistence.flushWrites dispatches ErrorExit(errHandler)
but then falls through and continues rollback logic; change control flow so
execution aborts immediately after dispatchAction(ErrorExit(errHandler))—either
add an early return/exit right after that dispatch or restructure the switch so
the rollback preparation and SetRollbackState code live only inside the Ok(_)
branch (referencing Persistence.flushWrites, dispatchAction, ErrorExit and
SetRollbackState to locate the affected block).
- Around line 878-888: Replace the current use of Persistence.awaitCurrentWrite
and discarding its result with a call to Persistence.flushWrites on
state.ctx.persistence from within the ExitWithSuccess branch; await the
flushWrites result, remove the "let _ = await" pattern and check the returned
result so that on success you proceed to call updateChainMetadataTable and
dispatchAction(SuccessExit), and on failure you handle the error (e.g., log and
dispatchAction(FailureExit) or equivalent); update references: ExitWithSuccess,
state.ctx.persistence, Persistence.awaitCurrentWrite -> Persistence.flushWrites,
updateChainMetadataTable, dispatchAction(SuccessExit).

---

Nitpick comments:
In `@packages/envio/src/GlobalState.res`:
- Around line 1090-1093: The code currently uses
Float.fromString(diff["events_processed_diff"])->Option.getExn which will crash
on unparseable DB values; change this to safely handle the Option by
pattern-matching or using Option.getWithDefault (or similar) to supply a
fallback (e.g., 0.0) and log a warning when parsing fails, then use the safe
value to update rollbackedProcessedEvents.contents and return it; specifically
update the parsing site around eventsProcessedDiff and replace the direct getExn
with a safe-branch that logs the issue and uses a default to allow rollback to
proceed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 07eafdd5-289a-4084-a611-e5a9ed975dea

📥 Commits

Reviewing files that changed from the base of the PR and between 317592d and 1f9fa6b.

📒 Files selected for processing (2)
  • packages/envio/src/GlobalState.res
  • scenarios/test_codegen/test/helpers/Mock.res

claude added 4 commits March 31, 2026 11:21
- Move rollback logic into Ok branch of flushWrites so it doesn't
  execute on write failure
- Use flushWrites instead of awaitCurrentWrite for ExitWithSuccess
  and handle errors properly

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Persistence.executeWrite now throws on failure instead of returning
Error. awaitCurrentWrite and flushWrites propagate exceptions naturally.
Callers use try/catch or let exceptions bubble to the top-level handler.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…eckpointId

prepareForNextBatch now awaits flushWrites so the DB write completes
before EventBatchProcessed fires. This ensures processedBatches only
increments after data is persisted, matching the synchronous write
semantics tests rely on.

Also reset rollbackTargetCheckpointId in cleanupAfterWrite since the
in-memory store is reused across batches.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
claude added 11 commits March 31, 2026 12:23
Revert prepareForNextBatch to background writes with capacity management.
In getBatchWritePromise, await the current writePromise directly (single
await, minimal event loop interference) to ensure DB state is consistent
for test assertions.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Flush background writes in query/queryHistory/queryRaw/queryCheckpoints/
queryEffectCache instead of getBatchWritePromise. This ensures DB
consistency for assertions without interfering with event loop timing
during batch processing (which caused the DC test timeout).

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Only await flushWrites in query methods when a write is actually in
progress. When no write is pending, skip the await entirely to avoid
yielding to the event loop (which interferes with DC test timing).

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
With background writes, empty batch processing timing changes slightly.
Wait for pending getItemsOrThrow calls to appear (indicating queries
have been dispatched) instead of using fixed Utils.delay(0) calls.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
… batches

Replace while loop (which could hang if no queries dispatched) with
getBatchWritePromise() which waits for processedBatches to increment
from the empty batch processing after rollback.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Resolve both empty and real items before waiting for batch processing.
The intermediate DC count assertion (checking rollback hasn't happened
yet) is no longer reliable with background writes since the empty batch
write may complete asynchronously before the check. The final assertion
still validates the correct DC state.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Wait for 2 pending getItemsOrThrow calls (both partitions queried)
before resolving real items. Use Utils.delay(1) for macrotask yields
to let all background write microtasks complete between checks.

Remove intermediate DC count assertion that was incompatible with
background writes (rollback SQL executes during the first batch write,
which may complete asynchronously before the check).

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
The system may not dispatch queries for both partitions simultaneously.
Wait for at least 1 pending getItemsOrThrow call before resolving.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
After rollback, 2 queries are pending (normal + DC partition).
Resolve first with empty items, resolve second with real items.
Both are resolved synchronously before any processing starts,
ensuring items are available before queries get consumed by
background write microtasks.

Wait for 2 batch writes (empty batch + real items batch).

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Revert Mock.res and Rollback_test.res to main, then re-apply only
the minimal changes needed for our API:
- Add inMemoryStore field to Ctx.t in Mock.res
- Add ~checkpointId=0n to LoadLayer.loadByField calls in LoadLayer_test.res

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Resolve merge conflicts keeping our PR logic (background writes,
prepareForNextBatch, exception-based errors, rollback changes) while
accepting main's API modernization (Dict/Array/Promise APIs).

Fix deprecation warnings: Js.Array2.removeCountInPlace → Array.splice,
Promise.done → Promise.ignore.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Accept main's reduced polling and isRealtime in fetchNext
- Accept main's config compat check on resume
- Accept main's .gen.ts removal
- Keep our branch's background write logic, split Prometheus metrics,
  structured rollbackDiff, and decoupled Persistence

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…plicate metrics

- getBatchWritePromise now polls for BOTH processedBatches increment
  AND writtenCheckpointId advancement, ensuring DB is up to date
  before test assertions
- Remove duplicate envio_storage_write_seconds/total counters from
  ProcessingBatch (conflicted with main's StorageWrite module that
  has a 'storage' label). Use StorageWrite.increment instead.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Loaded entities may still be referenced by pending LoadManager calls.
Evicting them causes 'Cannot read properties of undefined' when the
LoadManager resolves and calls getUnsafeInMemory on a missing key.

Keep Loaded entities in memory — they're just cache and don't count
toward totalChangeCount. Only evict Updated entities that have been
written to DB.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…tion

Loaded entities were being evicted by cleanupAfterWrite while still
referenced by pending LoadManager calls, causing undefined access.

Re-add lastReferencedCheckpointId to entityWithIndices. It's stamped
when entities are loaded (initValue) or mutated (set). cleanupAfterWrite
only evicts entities whose lastReferencedCheckpointId <= writtenCheckpointId,
ensuring entities referenced by the current batch are preserved.

Add ~checkpointId param to loadById and thread it through to initValue.

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- InMemoryTable: accept main's simplified updateIndices (getUnsafe
  instead of switch), use our record destructuring for indexWithRelatedIds
- UserContext: accept main's ClickHouse read-only guards, add
  ~checkpointId=params.checkpointId to all loadById calls

https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
@DZakh DZakh closed this Jun 8, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants