Implement background batch writes with in-memory capacity management #1062
DZakh wants to merge 41 commits into
Conversation
- Add lastReferencedCheckpointId and entityCount tracking to InMemoryTable.Entity
- Add cleanupAfterWrite to InMemoryTable.Entity and InMemoryStore to reset written entities to Loaded status and evict stale Loaded entities
- Create BackgroundWriter module with auto-chaining writes, force/await API
- Move DB write out of EventProcessing.processEventBatch into GlobalState orchestration
- Reuse in-memory store across batches instead of creating fresh each time
- Queue background writes after non-rollback batches; force synchronous writes for rollbacks
- Wait for background writes when in-memory store exceeds ENVIO_MAX_IN_MEMORY_ENTITIES
- Await background writes before rollback execution and process exit
- Reset in-memory store after rollback batches since diff store invalidates cache
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
changeCount increments on every set() call (every mutation), not just when a new entity key is added. initValue (loading from DB) no longer increments the counter. cleanupAfterWrite resets to count of remaining unwritten changes. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Merge BackgroundWriter fields (writePromise, pendingWrite, writtenCheckpointId) into Persistence.t and move all write functions there. Delete BackgroundWriter module.
- Swap dispatch order: UpdateQueues before StartProcessingBatch to prevent double processing.
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…add capacity management
- Add InMemoryStore.entitySet to centralize all entity write operations
- Remove diffInMemoryStore from rollback flow: store raw rollback data in rollbackState and apply to main inMemoryStore at processing time
- Add Persistence.writeAndManageCapacity: prune written entities when >50% capacity used, await write completion if still over threshold
- Remove unused isRollingBack, simplify InMemoryStore.make
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
This is a soft threshold, not a hard limit — "target" better conveys that the store can temporarily exceed this value during background flushes. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
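The capacity rule described in the two commits above (prune once more than 50% of the target is used, wait for the write only if the store is still over the target) can be sketched as a small decision function. This is a TypeScript illustration, not the PR's ReScript code: `decideCapacityAction`, `CapacityAction`, and the parameter names are hypothetical, and treating the target itself as the "still over threshold" bound is an assumption.

```typescript
// Hypothetical names; only the 50% figure and the "soft target" wording come from the commits.
type CapacityAction = "continue" | "prune" | "prune-then-await-write";

function decideCapacityAction(
  entityCount: number, // entities currently held in memory
  writtenCount: number, // of those, entities already persisted and safe to prune
  target: number, // soft target, e.g. the value of ENVIO_MAX_IN_MEMORY_ENTITIES
): CapacityAction {
  // At or below half of the target there is nothing to do.
  if (entityCount <= target * 0.5) {
    return "continue";
  }
  // Pruning only frees entities whose changes are already in the database.
  const remainingAfterPrune = entityCount - writtenCount;
  // Still over the target: wait for the background write to finish,
  // after which more entities become prunable.
  return remainingAfterPrune > target ? "prune-then-await-write" : "prune";
}
```

Because the target is soft, `entityCount` may exceed it on entry; the function only decides whether processing should pause for the in-flight write.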
… reset
- Inline loader/handler prom metrics in EventProcessing, register db write metrics separately in Persistence where duration is measured
- Move inMemoryStore from GlobalState.t to Ctx.t
- Replace InMemoryStore recreation on rollback with reset + applyRollbackDiff
- Add InMemoryStore.reset to clear cached data before rollback
- Store totalChangeCount as float on InMemoryStore.t, increment on entitySet
- Remove InMemoryTable.Entity.getChangeCount, access field directly
- Add comment clarifying lastReferencedCheckpointId is for index-based gets
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Track lastReferencedCheckpointId per index entry so cleanupAfterWrite can prune stale indices first, then evict entities with no active index. Also throw on empty batch checkpoint IDs instead of silently returning 0n. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…ackground writes
- Move dispatches (UpdateQueues, StartProcessingBatch) before any await to prevent triggering processing twice
- Move rollbackDiff types and applyRollbackDiff from Persistence to InMemoryStore where it belongs (mutates in-memory state, not persistence concern)
- Remove full InMemoryStore.reset on rollback; only apply the diff since entities not in the diff are unchanged and remain valid in cache
- Remove rollback forceWrite special case; checkpoint IDs always increment so background writeAndManageCapacity works correctly for rollbacks too
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Replace direct writePromise field access with Persistence.isWriting and Persistence.awaitCurrentWrite in the NoExit handler. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…tracking
- Move writeAndManageCapacity logic from Persistence into InMemoryStore.prepareForNextBatch so Persistence no longer depends on InMemoryStore
- Add Persistence.flushWrites to ensure all pending writes complete
- Force flush writes before rollback instead of only awaiting if currently writing
- Remove lastReferencedCheckpointId from entityWithIndices (derive from status instead)
- Rename entityDiffs to entityChanges
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Use Promise.thenResolve/done for background write error check so updateChainMetadataTable runs without blocking
- Move applyRollbackDiff from ProcessEventBatch to SetRollbackState reducer; remove rollbackDiff from RollbackReady state variant
- Remove InMemoryStore.t from Persistence.writeArgs; extract entity, raw event, and effect cache data in GlobalState before write
- Use onWriteComplete callback for post-write cleanup instead of Persistence calling InMemoryStore.cleanupAfterWrite directly
- Move rollbackDiff/rollbackEntityDiff types from InMemoryStore to Persistence to fully remove the Persistence→InMemoryStore dependency
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…ForNextBatch
Consolidates write data extraction (entities, effects, raw events), writeArgs construction, and Persistence.startWrite into prepareForNextBatch, simplifying the GlobalState ProcessEventBatch callsite.
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Swap dispatch order: StartProcessingBatch before UpdateQueues
- Add comment explaining rollback flush relies on prepareForNextBatch having already queued the write
- Remove dead clone chain (InMemoryStore.clone, EntityTables.clone, InMemoryTable.clone, InMemoryTable.Entity.clone)
- Remove unused Persistence.forceWrite
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
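The write lifecycle these commits converge on (one running write, one queued snapshot, auto-chaining, and a flush that waits for both) can be sketched as follows. This is a TypeScript illustration under stated assumptions, not the PR's ReScript module: the field names `writePromise`, `pendingWrite`, and `writtenCheckpointId` and the functions `startWrite` and `flushWrites` are taken from the commit messages, while the `BackgroundWriter` class shape, `WriteArgs`, `StorageWrite`, and the `lastError` handling are hypothetical. Keeping only the latest queued snapshot assumes each snapshot is cumulative.

```typescript
// Hypothetical shapes; a sketch of a single-slot background write queue.
type WriteArgs = { checkpointId: number };
type StorageWrite = (args: WriteArgs) => Promise<void>;

class BackgroundWriter {
  writtenCheckpointId = 0;
  private writePromise: Promise<void> | null = null;
  private pendingWrite: WriteArgs | null = null;
  private lastError: unknown = null;
  private write: StorageWrite;

  constructor(write: StorageWrite) {
    this.write = write;
  }

  get isWriting(): boolean {
    return this.writePromise !== null;
  }

  // Start a write, or queue it if one is running. Only the latest queued
  // snapshot is kept, which assumes snapshots are cumulative.
  startWrite(args: WriteArgs): void {
    if (this.writePromise !== null) {
      this.pendingWrite = args;
      return;
    }
    this.writePromise = this.run(args).then(() => {
      this.writePromise = null;
    });
  }

  private async run(first: WriteArgs): Promise<void> {
    let current: WriteArgs | null = first;
    try {
      while (current !== null) {
        await this.write(current);
        this.writtenCheckpointId = current.checkpointId;
        // Auto-chain: pick up whatever was queued while this write ran.
        current = this.pendingWrite;
        this.pendingWrite = null;
      }
    } catch (error) {
      // Recorded here and surfaced by flushWrites; a queued write is not dropped.
      this.lastError = error;
    }
  }

  // Resolves once the running write and any chained write have completed.
  async flushWrites(): Promise<void> {
    while (this.writePromise !== null) {
      await this.writePromise;
    }
    if (this.lastError !== null) {
      throw this.lastError;
    }
  }
}
```

A caller that must see a consistent database, such as rollback preparation or process exit, would await `flushWrites()` and stop if it throws.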
Note: Reviews paused
It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
Adds a live in-memory store into the runtime context and refactors in-memory table/store behavior to track change counts, apply rollback diffs, and queue background persistence writes. Threads checkpointId through load/index lookups and splits batch metrics so DB write timing is recorded separately.
Sequence Diagram(s)
sequenceDiagram
participant Client as EventBatchHandler
participant EP as EventProcessing
participant GS as GlobalState
participant IMS as InMemoryStore
participant P as Persistence
participant S as Storage
Client->>EP: processEventBatch(batch)
EP->>EP: compute loaderDuration, handlerDuration
EP->>GS: dispatch EventBatchProcessed
GS->>IMS: prepareForNextBatch(batch, config, isInReorgThreshold)
IMS->>P: startWrite(updatedEntities, rawEvents, effectCache, rollbackTarget)
P->>S: storage.writeBatch(...)
S-->>P: writtenCheckpointId
P->>IMS: cleanupAfterWrite(writtenCheckpointId)
IMS->>IMS: reset totalChangeCount, cleanup indices
P-->>GS: write complete (async)
sequenceDiagram
participant GS as GlobalState
participant P as Persistence
participant S as Storage
participant IMS as InMemoryStore
GS->>P: flushWrites()
P->>S: storage.writeBatch(...) (if queued)
S-->>P: writtenCheckpointId
P-->>GS: flushed
GS->>P: prepareRollbackDiff(fromCheckpoint)
P->>S: storage.getRollbackDiff(...)
S-->>P: rollbackDiff(entityChanges)
P-->>GS: rollbackDiff
GS->>IMS: applyRollbackDiff(rollbackDiff)
IMS->>IMS: apply per-entity changes via entitySet, set rollbackTargetCheckpointId
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: ✅ 3 passed
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/envio/src/GlobalState.res`:
- Around line 1042-1046: When Persistence.flushWrites returns Error(errHandler)
we currently dispatchAction(ErrorExit(errHandler)) but then continue into the
rollback preparation; change the control flow in the flushWrites match (the
switch handling state.ctx.persistence->Persistence.flushWrites) so that after
dispatchAction(ErrorExit(errHandler)) you immediately abort the surrounding
operation (e.g., return, early-exit, or unit-to-unit short-circuit) instead of
falling through to the rollback-preparation logic; ensure the Ok(_) branch
remains unchanged and that no rollback queries are prepared or executed when
flushWrites failed.
- Around line 878-888: In the ExitWithSuccess branch replace the current
isWriting check + discarding awaitCurrentWrite result with a call to
state.ctx.persistence->Persistence.flushWrites and act on its Result before
dispatching SuccessExit; specifically, invoke Persistence.flushWrites (which
also waits for queued writes), handle the Ok/Err result returned (e.g.
surface/log/propagate the Err instead of ignoring it), then continue to call
updateChainMetadataTable and only dispatchAction(SuccessExit) after flushWrites
returns successfully; update references: ExitWithSuccess branch,
state.ctx.persistence->Persistence.awaitCurrentWrite -> Persistence.flushWrites,
and dispatchAction(SuccessExit).
In `@packages/envio/src/InMemoryStore.res`:
- Around line 178-186: Pre-write cleanup is using
persistence.writtenCheckpointId which can clear the just-queued batch (rawEvents
/ idsToStore) before the pending write runs; change the pre-write cleanup to use
the pending write's checkpoint id (the pendingWrite/writtenCheckpointId kept by
Persistence.startWrite) so cleanupAfterWrite references the checkpoint for the
write that actually persisted, not the last written checkpoint, and ensure you
do not clear rawEvents/idsToStore for the currently queued batch; also replace
the Error(errHandler) => errHandler->ErrorHandling.log branch with logic that
preserves or retries the pending write (or at minimum logs the error together
with the pending write id) instead of silently dropping the data (update calls
around inMemoryStore->cleanupAfterWrite and persistence->Persistence.flushWrites
handling accordingly).
- Around line 163-177: writeArgs captures rollbackTargetCheckpointId from
inMemoryStore but you must clear inMemoryStore.rollbackTargetCheckpointId
immediately after attaching it so subsequent queued batches don't reuse it;
after you build writeArgs (which preserves the value) set
inMemoryStore.rollbackTargetCheckpointId to None (or equivalent) before calling
Persistence.startWrite(~writeArgs) so the rollback target is consumed and not
inherited by later writes.
In `@packages/envio/src/LoadLayer.resi`:
- Line 22: The test calls to the loadByField function in LoadLayer_test.res are
missing the newly required named parameter ~checkpointId: bigint; update each
invocation of loadByField (and any wrapper calls to LoadLayer.loadByField) in
the test file to include a bigint checkpoint id (e.g., ~checkpointId=0n or pass
an existing checkpointId variable) so the call signatures match the updated
LoadLayer.resi definition.
In `@packages/envio/src/Persistence.res`:
- Around line 291-307: The effect-cache count is being updated during batching
(in effectCacheWriteData mapping) and can double-count IDs queued across
concurrent writes and survive failed storage.writeBatch; move the mutation of
effectCacheRecord.count, the computation of shouldInitialize, and the
Prometheus.EffectCacheCount.set call out of effectCacheWriteData and into the
post-commit cleanup path (e.g., inside cleanupAfterWrite or the success branch
after storage.writeBatch) so counts are only adjusted for actually committed IDs
(use the committed idsToStore for that batch and subtract invalidationsCount for
those same items), compute shouldInitialize from the prior stored count before
increment, and ensure idsToStore is cleared only after this successful update to
keep bookkeeping consistent with the DB.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4af27ce5-d136-4732-ad2e-a067ebf18288
⛔ Files ignored due to path filters (1)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (13)
- packages/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs
- packages/envio/src/Ctx.res
- packages/envio/src/Env.res
- packages/envio/src/EventProcessing.res
- packages/envio/src/GlobalState.res
- packages/envio/src/InMemoryStore.res
- packages/envio/src/InMemoryTable.res
- packages/envio/src/LoadLayer.res
- packages/envio/src/LoadLayer.resi
- packages/envio/src/Main.res
- packages/envio/src/Persistence.res
- packages/envio/src/Prometheus.res
- packages/envio/src/UserContext.res
effectCacheWriteData->Belt.Array.mapU(({effect, items, invalidationsCount}) => {
  let effectName = effect.name
  let effectCacheRecord = switch cache->Utils.Dict.dangerouslyGetNonOption(
    effectName,
  ) {
  | Some(c) => c
  | None => {
      let c = {effectName, count: 0}
      cache->Js.Dict.set(effectName, c)
      c
    }
  }
  let shouldInitialize = effectCacheRecord.count === 0
  effectCacheRecord.count =
    effectCacheRecord.count + items->Js.Array2.length - invalidationsCount
  Prometheus.EffectCacheCount.set(~count=effectCacheRecord.count, ~effectName)
  {effect, items, shouldInitialize}
Effect-cache count bookkeeping is not tied to committed writes.
idsToStore is only cleared after cleanupAfterWrite, so a payload queued while another write is running can still contain IDs from earlier batches. Applying count = count + items.length - invalidationsCount here double-counts those rows when the later write succeeds, and the mutation also survives a failed storage.writeBatch. That makes shouldInitialize and Prometheus.EffectCacheCount drift away from the DB’s actual state.
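One way to keep the count tied to committed writes, as the comment above asks, is to split the bookkeeping into a pure planning step and a commit step that runs only after the storage write succeeds. The TypeScript sketch below illustrates that split under stated assumptions; `EffectCacheRecord` mirrors the `{effectName, count}` record in the excerpt, while `CountPlan`, `planCountUpdate`, and `commitCountUpdate` are hypothetical names, and it does not by itself solve the overlap of IDs between queued payloads.

```typescript
// Mirrors the {effectName, count} record from the excerpt; everything else is hypothetical.
interface EffectCacheRecord {
  effectName: string;
  count: number;
}

interface CountPlan {
  shouldInitialize: boolean;
  nextCount: number;
}

// Pure: computes what the record would become if this batch commits.
// It reads the prior count but never mutates the record.
function planCountUpdate(
  record: EffectCacheRecord,
  committedItems: number,
  invalidationsCount: number,
): CountPlan {
  return {
    shouldInitialize: record.count === 0,
    nextCount: record.count + committedItems - invalidationsCount,
  };
}

// Called only from the success path of the storage write, so a failed
// write leaves the in-memory count matching the database.
function commitCountUpdate(record: EffectCacheRecord, plan: CountPlan): void {
  record.count = plan.nextCount;
}
```

Under this split, a metrics update such as the `Prometheus.EffectCacheCount.set` call from the excerpt would also move into the commit step.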
The clone chain (InMemoryTable.clone, Entity.clone, EntityTables.clone, InMemoryStore.clone) was incorrectly removed as dead code — it's used by the generated TestHelpers_MockDb template. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/envio/src/InMemoryTable.res`:
- Around line 443-457: The code inside the Updated(update) branch resets history
to only [update.latestChange], dropping intermediate unwritten entries; instead,
compute a filteredHistory by taking row.history (and include update.latestChange
if not already present) and keep all entries whose checkpoint id is greater than
writtenCheckpointId, then set status: Updated({ ...update, history:
filteredHistory, containsRollbackDiffChange: recomputedFlag }) when calling
inMemTable.table.dict->Js.Dict.set; reference Updated(update), row.history,
update.latestChange, writtenCheckpointId and containsRollbackDiffChange and
ensure containsRollbackDiffChange is recomputed from the preserved entries
rather than hardcoding false.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 39f462a4-c39b-4e26-9bf6-06bc922e44f2
⛔ Files ignored due to path filters (1)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (2)
- packages/envio/src/InMemoryStore.res
- packages/envio/src/InMemoryTable.res
| Updated(update) =>
  remainingChangeCount := remainingChangeCount.contents +. 1.
  // Clear already-written history but keep the update status
  // since it has changes newer than what was written
  inMemTable.table.dict->Js.Dict.set(
    key,
    {
      ...row,
      status: Updated({
        ...update,
        history: [update.latestChange],
        containsRollbackDiffChange: false,
      }),
    },
  )
Potential loss of intermediate history entries.
When an entity has latestChange beyond writtenCheckpointId, the history is reset to only [update.latestChange]. This discards any intermediate changes between writtenCheckpointId and latestChange that haven't been written yet.
Example: if history = [change@3, change@7, change@9] and writtenCheckpointId = 5, the unwritten changes [change@7, change@9] should be preserved, but this code only keeps [change@9].
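The example above can be checked with a small filter. This is a TypeScript sketch rather than the PR's ReScript: `Change` and `keepUnwritten` are hypothetical names, and checkpoint IDs are modelled as plain numbers here although the PR uses bigint.

```typescript
// Hypothetical minimal shape of a history entry.
interface Change {
  checkpointId: number;
}

// Keep only the entries that the last completed write has not persisted yet.
function keepUnwritten(history: Change[], writtenCheckpointId: number): Change[] {
  return history.filter((change) => change.checkpointId > writtenCheckpointId);
}

const history: Change[] = [{ checkpointId: 3 }, { checkpointId: 7 }, { checkpointId: 9 }];
const remaining = keepUnwritten(history, 5);
// remaining holds the entries at checkpoints 7 and 9; the entry at 3 was already written.
```

Collapsing to the latest change alone would keep only the entry at checkpoint 9 and lose the one at 7.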
Consider filtering history to retain all entries with checkpoint > writtenCheckpointId:
🛠️ Proposed fix
| Updated(update) =>
remainingChangeCount := remainingChangeCount.contents +. 1.
- // Clear already-written history but keep the update status
- // since it has changes newer than what was written
+ // Filter history to keep only unwritten changes
+ let filteredHistory = update.history->Array.keep(change =>
+ change->Change.getCheckpointId > writtenCheckpointId
+ )
inMemTable.table.dict->Js.Dict.set(
key,
{
...row,
status: Updated({
...update,
- history: [update.latestChange],
+ history: filteredHistory->Array.length > 0
+ ? filteredHistory
+ : [update.latestChange],
containsRollbackDiffChange: false,
}),
},
)
MockDb was removed on main, so the clone chain (InMemoryStore.clone, EntityTables.clone, InMemoryTable.clone, Entity.clone) is now truly dead code and can be removed. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Actionable comments posted: 1
♻️ Duplicate comments (3)
packages/envio/src/InMemoryStore.res (2)
172-175: ⚠️ Potential issue | 🟠 Major
Verify `rollbackTargetCheckpointId` is consumed on the successful write path.
Line 196 stores the rollback target on `inMemoryStore`, and Line 172 forwards whatever is currently there into every `writeArgs`. I don't see a reset in this file. Because `Persistence.startWrite` only keeps the latest `pendingWrite` snapshot (packages/envio/src/Persistence.res:406-411), that target likely has to survive until one rollback-bearing write actually commits, but it also needs to be removed immediately after that, otherwise later batches will keep replaying it.
#!/bin/bash
set -euo pipefail
printf '\n== rollbackTargetCheckpointId usages ==\n'
rg -n -C2 --type-add 'res:*.res' --type res '\brollbackTargetCheckpointId\b'
printf '\n== persistence write lifecycle ==\n'
rg -n -C4 --type-add 'res:*.res' --type res '\b(startWrite|executeWrite|flushWrites|onWriteComplete)\b'
Expected result: a reset or pending-write rewrite tied to the write-success path. If the search only finds the assignment in `applyRollbackDiff` and the forwarding in `prepareForNextBatch`, the rollback target will leak into later writes.
Also applies to: 196-196
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/envio/src/InMemoryStore.res` around lines 172 - 175, The rollbackTargetCheckpointId set by applyRollbackDiff is being forwarded via prepareForNextBatch into every writeArgs and never cleared; update the successful-write path so the target is consumed and not leaked into subsequent batches: modify the onWriteComplete handler (or InMemoryStore.cleanupAfterWrite) to detect when writtenCheckpointId matches inMemoryStore.rollbackTargetCheckpointId and then clear/reset inMemoryStore.rollbackTargetCheckpointId (or rewrite the pendingWrite to remove the target) so Persistence.startWrite / executeWrite no longer forwards it; reference rollbackTargetCheckpointId, onWriteComplete, InMemoryStore.cleanupAfterWrite, Persistence.startWrite, applyRollbackDiff, and prepareForNextBatch to locate the relevant spots.
106-123: ⚠️ Potential issue | 🔴 Critical
Don't wipe shared raw-event/effect buffers in `cleanupAfterWrite`.
`rawEvents`, `idsToStore`, and `invalidationsCount` are global buffers for all not-yet-durable batches. Clearing them here is unsafe because `prepareForNextBatch` snapshots them into `writeArgs` (packages/envio/src/Persistence.res:147-160), while `Persistence.startWrite` only retains a single `pendingWrite` (packages/envio/src/Persistence.res:406-411). If batch B is pending, this cleanup runs, and batch C overwrites `pendingWrite`, batch B's raw-event/effect writes are gone even though entity updates can still be rebuilt from the tables.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/envio/src/InMemoryStore.res` around lines 106 - 123, cleanupAfterWrite currently wipes global buffers (inMemoryStore.rawEvents.dict, each table.idsToStore and table.invalidationsCount) which are shared across batches; instead, stop clearing these global buffers here and only clear per-write snapshots that belong to the completed write (i.e., the data captured in writeArgs/pendingWrite). Concretely, remove the code that deletes keys from inMemoryStore.rawEvents.dict and the code that zeroes table.idsToStore/table.invalidationsCount in cleanupAfterWrite, and ensure any necessary clearing is done when the owning snapshot is created or explicitly released (see prepareForNextBatch which snapshots rawEvents/effects into writeArgs and Persistence.startWrite which manages pendingWrite) so that other pending batches’ buffers are not lost.
packages/envio/src/InMemoryTable.res (1)
438-450: ⚠️ Potential issue | 🟠 Major
Preserve all unwritten history entries through cleanup.
This branch still collapses every surviving update to `[update.latestChange]`. That drops intermediate unwritten entries, and it also invents a history row when `shouldSaveHistory=false` left `update.history` empty. The next write can miss history records after cleanup.
💡 Suggested fix
- | Updated(update) =>
-   remainingChangeCount := remainingChangeCount.contents +. 1.
+ | Updated(update) =>
+   let remainingHistory =
+     update.history->Array.keep(change =>
+       change->Change.getCheckpointId > writtenCheckpointId
+     )
+   remainingChangeCount := remainingChangeCount.contents +. (
+     remainingHistory->Array.length === 0
+       ? 1.
+       : (remainingHistory->Array.length :> float)
+   )
    // Clear already-written history but keep the update status
    // since it has changes newer than what was written
    inMemTable.table.dict->Js.Dict.set(
      key,
      {
        ...row,
        status: Updated({
          ...update,
-         history: [update.latestChange],
-         containsRollbackDiffChange: false,
+         history: remainingHistory,
+         containsRollbackDiffChange: update.containsRollbackDiffChange,
        }),
      },
    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/envio/src/InMemoryTable.res` around lines 438 - 450, The code in the Updated(update) branch collapses surviving update history to [update.latestChange], dropping unwritten intermediate entries and fabricating a history entry when update.history is empty; change the Js.Dict.set call so the preserved status keeps the original update.history (e.g., history: update.history) instead of replacing it with [update.latestChange], and keep containsRollbackDiffChange reset as intended; update the Updated({...}) construction referenced in the inMemTable.table.dict->Js.Dict.set call and ensure remainingChangeCount handling remains unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/envio/src/InMemoryTable.res`:
- Around line 423-427: The current switch on row.status evicts Loaded rows
unconditionally which can remove indices that remain retained and cause
hasIndex/getUnsafeOnIndex inconsistencies; update the Loaded branch in the
switch that uses row.status so it only calls
inMemTable->deleteEntityFromIndices(~entityId=key,
~entityIndices=row.entityIndices) and
inMemTable.table.dict->Utils.Dict.deleteInPlace(key) when the row has no
retained indices left (e.g., check row.entityIndices or a
retained-count/hasRetainedIndex predicate); ensure the condition aligns with how
LoadLayer.call uses hasIndex and getUnsafeOnIndex so you only delete Loaded
entries after all retained indices are pruned.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 462dc386-20ee-4c0f-a203-503f7d9de958
📒 Files selected for processing (3)
- packages/envio/src/InMemoryStore.res
- packages/envio/src/InMemoryTable.res
- packages/envio/src/Main.res
✅ Files skipped from review due to trivial changes (1)
- packages/envio/src/Main.res
switch row.status {
| Loaded =>
  // Loaded entities are just cache — evict from memory
  inMemTable->deleteEntityFromIndices(~entityId=key, ~entityIndices=row.entityIndices)
  inMemTable.table.dict->Utils.Dict.deleteInPlace(key)
Only evict Loaded rows after they lose all retained indices.
Phase 1 already prunes stale indices. Deleting every Loaded row here also strips entity IDs from indices that survived, so hasIndex can stay true while getUnsafeOnIndex returns an empty result. packages/envio/src/LoadLayer.res:341-445 wires those two functions into LoadManager.call, so this can turn a retained cache entry into a false hit.
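The false hit described above can be reproduced with a toy model of a table and its indices. This TypeScript sketch is an assumption-laden illustration, not the PR's ReScript: `Table`, `addLoaded`, `getOnIndex`, and `evictLoaded` are hypothetical, and only `hasIndex` and the Loaded status echo names from the comment.

```typescript
// Toy model: rows keyed by entity ID, indices keyed by an index key.
type Status = "Loaded" | "Updated";

interface Row {
  status: Status;
  entityIndices: Set<string>; // index keys this row is listed under
}

class Table {
  rows = new Map<string, Row>();
  indices = new Map<string, Set<string>>();

  addLoaded(id: string, indexKeys: string[]): void {
    this.rows.set(id, { status: "Loaded", entityIndices: new Set(indexKeys) });
    indexKeys.forEach((key) => {
      const ids = this.indices.get(key) ?? new Set<string>();
      ids.add(id);
      this.indices.set(key, ids);
    });
  }

  hasIndex(key: string): boolean {
    return this.indices.has(key);
  }

  getOnIndex(key: string): string[] {
    return Array.from(this.indices.get(key) ?? []);
  }

  // onlyUnindexed=false models the unconditional eviction under review;
  // onlyUnindexed=true models the suggested guard.
  evictLoaded(onlyUnindexed: boolean): void {
    this.rows.forEach((row, id) => {
      if (row.status !== "Loaded") return;
      if (onlyUnindexed && row.entityIndices.size > 0) return;
      // The index key itself survives; only the entity ID is stripped from it.
      row.entityIndices.forEach((key) => this.indices.get(key)?.delete(id));
      this.rows.delete(id);
    });
  }
}
```

With unconditional eviction, an index that was retained still reports `hasIndex(key) === true` while `getOnIndex(key)` comes back empty, which a loader would read as "cached, no matching entities".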
💡 Suggested fix
- | Loaded =>
- // Loaded entities are just cache — evict from memory
- inMemTable->deleteEntityFromIndices(~entityId=key, ~entityIndices=row.entityIndices)
- inMemTable.table.dict->Utils.Dict.deleteInPlace(key)
+ | Loaded =>
+ if row.entityIndices->Utils.Set.size === 0 {
+ // Loaded entity is no longer reachable from any retained index
+ inMemTable.table.dict->Utils.Dict.deleteInPlace(key)
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/envio/src/InMemoryTable.res` around lines 423 - 427, The current
switch on row.status evicts Loaded rows unconditionally which can remove indices
that remain retained and cause hasIndex/getUnsafeOnIndex inconsistencies; update
the Loaded branch in the switch that uses row.status so it only calls
inMemTable->deleteEntityFromIndices(~entityId=key,
~entityIndices=row.entityIndices) and
inMemTable.table.dict->Utils.Dict.deleteInPlace(key) when the row has no
retained indices left (e.g., check row.entityIndices or a
retained-count/hasRetainedIndex predicate); ensure the condition aligns with how
LoadLayer.call uses hasIndex and getUnsafeOnIndex so you only delete Loaded
entries after all retained indices are pruned.
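The eviction rule suggested in this comment can be sketched in isolation. The following is a minimal TypeScript illustration rather than the ReScript code under review; `Row`, `entityIndices` as a plain string array, and `evictUnreferencedLoaded` are hypothetical stand-ins, and it assumes stale indices have already been pruned from each row.

```typescript
// Hypothetical, simplified stand-in for a row of the in-memory table.
interface Row {
  status: "Loaded" | "Updated";
  // Names of the retained indices that still reference this entity.
  entityIndices: string[];
}

// Evicts Loaded rows only when no retained index references them any more,
// so a surviving index never points at a missing row.
// Returns the ids that were evicted.
function evictUnreferencedLoaded(table: Record<string, Row>): string[] {
  const evicted: string[] = [];
  for (const id of Object.keys(table)) {
    const row = table[id];
    if (row !== undefined && row.status === "Loaded" && row.entityIndices.length === 0) {
      delete table[id];
      evicted.push(id);
    }
  }
  return evicted;
}
```

Rows that are still reachable from an index stay in the table, which keeps a "has index" check and an index lookup in agreement with each other.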
…ield - Add inMemoryStore field to Ctx.t construction in Mock.res - Add ~checkpointId=0n to LoadLayer.loadByField calls in LoadLayer_test.res https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
With background writes, getBatchWritePromise must also await Persistence.flushWrites to ensure DB state is consistent before test assertions query the database. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
♻️ Duplicate comments (2)
packages/envio/src/GlobalState.res (2)
1039-1045: ⚠️ Potential issue | 🔴 Critical
Rollback continues after flush failure - control flow must abort.
After dispatchAction(ErrorExit(errHandler)) on the Error branch, execution falls through to the rollback preparation logic (lines 1047+). This means rollback queries will run against a potentially inconsistent DB state, and SetRollbackState will mutate the in-memory store despite the pending exit.
🔧 Proposed fix - add early return after ErrorExit
  switch await state.ctx.persistence->Persistence.flushWrites {
- | Error(errHandler) => dispatchAction(ErrorExit(errHandler))
+ | Error(errHandler) =>
+   dispatchAction(ErrorExit(errHandler))
+   // Abort rollback - don't proceed with rollback queries on inconsistent state
  | Ok(_) => ()
  }
+ // Only continue if flushWrites succeeded
+ if state.ctx.persistence->Persistence.hasError {
+   return
+ }
A cleaner approach would be restructuring with an early return or wrapping the rest in the Ok(_) branch:
  switch await state.ctx.persistence->Persistence.flushWrites {
  | Error(errHandler) => dispatchAction(ErrorExit(errHandler))
- | Ok(_) => ()
+ | Ok(_) =>
+   let startTime = Hrtime.makeTimer()
+   // ... rest of rollback logic indented here
  }
-
- let startTime = Hrtime.makeTimer()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/envio/src/GlobalState.res` around lines 1039 - 1045, The Error branch after awaiting state.ctx.persistence->Persistence.flushWrites dispatches ErrorExit(errHandler) but then falls through and continues rollback logic; change control flow so execution aborts immediately after dispatchAction(ErrorExit(errHandler)): either add an early return/exit right after that dispatch or restructure the switch so the rollback preparation and SetRollbackState code live only inside the Ok(_) branch (referencing Persistence.flushWrites, dispatchAction, ErrorExit and SetRollbackState to locate the affected block).
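The control flow being asked for can be shown with a small TypeScript sketch. It is not the ReScript from GlobalState.res: `FlushResult`, `RollbackDeps`, and `rollbackAfterFlush` are hypothetical names, and actions are reduced to plain strings. The point is only that nothing rollback-related runs once the flush has failed.

```typescript
// Hypothetical result type standing in for the Ok/Error result of flushWrites.
type FlushResult = { kind: "ok" } | { kind: "error"; error: string };

interface RollbackDeps {
  flushWrites: () => Promise<FlushResult>;
  dispatch: (action: string) => void;
  runRollback: () => void;
}

async function rollbackAfterFlush(deps: RollbackDeps): Promise<void> {
  const result = await deps.flushWrites();
  if (result.kind === "error") {
    deps.dispatch("ErrorExit: " + result.error);
    // Abort here: the rollback must not run against a possibly inconsistent DB.
    return;
  }
  // Reached only when every pending write has been persisted.
  deps.runRollback();
  deps.dispatch("SetRollbackState");
}
```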
878-888: ⚠️ Potential issue | 🔴 Critical
ExitWithSuccess should use flushWrites and handle its result.
The current implementation discards the write result (let _ = await) and uses awaitCurrentWrite which may miss pending writes that haven't started yet. If the background write fails, the process exits with success despite data not being persisted.
🔧 Proposed fix
  | ExitWithSuccess =>
-   // Await any background writes before exiting
-   if state.ctx.persistence->Persistence.isWriting {
-     let _ = await state.ctx.persistence->Persistence.awaitCurrentWrite
-   }
+   // Flush all pending writes and check result before exiting
+   switch await state.ctx.persistence->Persistence.flushWrites {
+   | Error(errHandler) =>
+     dispatchAction(ErrorExit(errHandler))
+     return
+   | Ok(_) => ()
+   }
    updateChainMetadataTable(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/envio/src/GlobalState.res` around lines 878 - 888, Replace the current use of Persistence.awaitCurrentWrite and discarding its result with a call to Persistence.flushWrites on state.ctx.persistence from within the ExitWithSuccess branch; await the flushWrites result, remove the "let _ = await" pattern and check the returned result so that on success you proceed to call updateChainMetadataTable and dispatchAction(SuccessExit), and on failure you handle the error (e.g., log and dispatchAction(FailureExit) or equivalent); update references: ExitWithSuccess, state.ctx.persistence, Persistence.awaitCurrentWrite -> Persistence.flushWrites, updateChainMetadataTable, dispatchAction(SuccessExit).
🧹 Nitpick comments (1)
packages/envio/src/GlobalState.res (1)
1090-1093: Consider defensive handling for DB value parsing.
Float.fromString(diff["events_processed_diff"])->Option.getExn will throw if the DB returns an unparseable value. While unlikely in normal operation, corrupted rollback progress data would crash the indexer during recovery.
💡 Suggested defensive approach
  let eventsProcessedDiff =
-   Float.fromString(diff["events_processed_diff"])->Option.getExn
+   Float.fromString(diff["events_processed_diff"])->Option.getWithDefault(0.)
Or log a warning and use a default to allow rollback to proceed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/envio/src/GlobalState.res` around lines 1090 - 1093, The code currently uses Float.fromString(diff["events_processed_diff"])->Option.getExn which will crash on unparseable DB values; change this to safely handle the Option by pattern-matching or using Option.getWithDefault (or similar) to supply a fallback (e.g., 0.0) and log a warning when parsing fails, then use the safe value to update rollbackedProcessedEvents.contents and return it; specifically update the parsing site around eventsProcessedDiff and replace the direct getExn with a safe-branch that logs the issue and uses a default to allow rollback to proceed.
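As a rough illustration of the "log a warning and use a default" variant, here is a TypeScript sketch. `parseFloatOrDefault` and the `warn` callback are hypothetical and are not part of the indexer's API. Note that JavaScript's `parseFloat` is lenient about trailing characters, which may differ from the ReScript helper used in the PR.

```typescript
// Parses a numeric string from a DB row, falling back to a default instead of throwing.
// `warn` is a hypothetical logging hook supplied by the caller.
function parseFloatOrDefault(
  raw: string | undefined,
  fallback: number,
  warn: (message: string) => void,
): number {
  const parsed = raw === undefined ? NaN : parseFloat(raw);
  if (isNaN(parsed)) {
    warn("Could not parse value " + String(raw) + "; using fallback " + String(fallback));
    return fallback;
  }
  return parsed;
}
```

A fallback of 0 lets the rollback proceed, at the cost of a progress counter that may be slightly off, so the warning is what keeps the corruption visible.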
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 07eafdd5-289a-4084-a611-e5a9ed975dea
📒 Files selected for processing (2)
- packages/envio/src/GlobalState.res
- scenarios/test_codegen/test/helpers/Mock.res
- Move rollback logic into Ok branch of flushWrites so it doesn't execute on write failure - Use flushWrites instead of awaitCurrentWrite for ExitWithSuccess and handle errors properly https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Persistence.executeWrite now throws on failure instead of returning Error. awaitCurrentWrite and flushWrites propagate exceptions naturally. Callers use try/catch or let exceptions bubble to the top-level handler. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…eckpointId prepareForNextBatch now awaits flushWrites so the DB write completes before EventBatchProcessed fires. This ensures processedBatches only increments after data is persisted, matching the synchronous write semantics tests rely on. Also reset rollbackTargetCheckpointId in cleanupAfterWrite since the in-memory store is reused across batches. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Revert prepareForNextBatch to background writes with capacity management. In getBatchWritePromise, await the current writePromise directly (single await, minimal event loop interference) to ensure DB state is consistent for test assertions. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
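The capacity management mentioned here is described elsewhere in this PR as pruning written entities once more than 50% of capacity is used, then awaiting the write if the store is still over the threshold. A minimal TypeScript sketch of that decision follows. `CapacityStore`, `manageCapacity`, and the outcome labels are hypothetical, and it assumes the same 50% threshold applies both before and after pruning.

```typescript
// Hypothetical view of the in-memory store used for capacity decisions.
interface CapacityStore {
  changeCount(): number; // entities currently held in memory
  pruneWritten(): void;  // drop entities that are already persisted
}

type CapacityOutcome = "under-threshold" | "pruned" | "awaited-write";

async function manageCapacity(
  store: CapacityStore,
  maxEntities: number,
  awaitWrite: () => Promise<void>,
): Promise<CapacityOutcome> {
  const threshold = maxEntities / 2; // assumption: 50% of the configured maximum
  if (store.changeCount() <= threshold) {
    return "under-threshold";
  }
  store.pruneWritten();
  if (store.changeCount() <= threshold) {
    return "pruned";
  }
  // Still too full: wait for the background write, then prune what it persisted.
  await awaitWrite();
  store.pruneWritten();
  return "awaited-write";
}
```

Processing only blocks in the last case, so the background write stays off the hot path until memory pressure actually requires it.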
Flush background writes in query/queryHistory/queryRaw/queryCheckpoints/queryEffectCache instead of getBatchWritePromise. This ensures DB consistency for assertions without interfering with event loop timing during batch processing (which caused the DC test timeout). https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Only await flushWrites in query methods when a write is actually in progress. When no write is pending, skip the await entirely to avoid yielding to the event loop (which interferes with DC test timing). https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
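The reason skipping the await matters is that `await` always yields to the event loop, even when there is nothing to wait for. A small TypeScript sketch of the conditional flush is below. `WriterState` and `queryAfterWrites` are hypothetical names, not the test helper's real API.

```typescript
// Hypothetical stand-in for the persistence state: writePromise is set only
// while a background write is in flight.
interface WriterState {
  writePromise: Promise<void> | undefined;
}

async function queryAfterWrites<T>(state: WriterState, runQuery: () => T): Promise<T> {
  if (state.writePromise !== undefined) {
    // A write is in progress: wait so the query observes the persisted data.
    await state.writePromise;
  }
  // With no write in flight, this line runs synchronously inside the call,
  // without an extra trip through the microtask queue.
  return runQuery();
}
```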
With background writes, empty batch processing timing changes slightly. Wait for pending getItemsOrThrow calls to appear (indicating queries have been dispatched) instead of using fixed Utils.delay(0) calls. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
… batches Replace while loop (which could hang if no queries dispatched) with getBatchWritePromise() which waits for processedBatches to increment from the empty batch processing after rollback. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Resolve both empty and real items before waiting for batch processing. The intermediate DC count assertion (checking rollback hasn't happened yet) is no longer reliable with background writes since the empty batch write may complete asynchronously before the check. The final assertion still validates the correct DC state. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Wait for 2 pending getItemsOrThrow calls (both partitions queried) before resolving real items. Use Utils.delay(1) for macrotask yields to let all background write microtasks complete between checks. Remove intermediate DC count assertion that was incompatible with background writes (rollback SQL executes during the first batch write, which may complete asynchronously before the check). https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
The system may not dispatch queries for both partitions simultaneously. Wait for at least 1 pending getItemsOrThrow call before resolving. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
After rollback, 2 queries are pending (normal + DC partition). Resolve first with empty items, resolve second with real items. Both are resolved synchronously before any processing starts, ensuring items are available before queries get consumed by background write microtasks. Wait for 2 batch writes (empty batch + real items batch). https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Revert Mock.res and Rollback_test.res to main, then re-apply only the minimal changes needed for our API: - Add inMemoryStore field to Ctx.t in Mock.res - Add ~checkpointId=0n to LoadLayer.loadByField calls in LoadLayer_test.res https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Resolve merge conflicts keeping our PR logic (background writes, prepareForNextBatch, exception-based errors, rollback changes) while accepting main's API modernization (Dict/Array/Promise APIs). Fix deprecation warnings: Js.Array2.removeCountInPlace → Array.splice, Promise.done → Promise.ignore. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
- Accept main's reduced polling and isRealtime in fetchNext - Accept main's config compat check on resume - Accept main's .gen.ts removal - Keep our branch's background write logic, split Prometheus metrics, structured rollbackDiff, and decoupled Persistence https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…plicate metrics - getBatchWritePromise now polls for BOTH processedBatches increment AND writtenCheckpointId advancement, ensuring DB is up to date before test assertions - Remove duplicate envio_storage_write_seconds/total counters from ProcessingBatch (conflicted with main's StorageWrite module that has a 'storage' label). Use StorageWrite.increment instead. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Loaded entities may still be referenced by pending LoadManager calls. Evicting them causes 'Cannot read properties of undefined' when the LoadManager resolves and calls getUnsafeInMemory on a missing key. Keep Loaded entities in memory — they're just cache and don't count toward totalChangeCount. Only evict Updated entities that have been written to DB. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
…tion Loaded entities were being evicted by cleanupAfterWrite while still referenced by pending LoadManager calls, causing undefined access. Re-add lastReferencedCheckpointId to entityWithIndices. It's stamped when entities are loaded (initValue) or mutated (set). cleanupAfterWrite only evicts entities whose lastReferencedCheckpointId <= writtenCheckpointId, ensuring entities referenced by the current batch are preserved. Add ~checkpointId param to loadById and thread it through to initValue. https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
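The checkpoint-stamping rule from this commit can be sketched as follows. This is a TypeScript illustration with hypothetical names (`CachedEntity`, `touch`, `cleanupByCheckpoint`), and it uses plain numbers for checkpoint ids where the codebase appears to use bigint values such as `0n`.

```typescript
// Hypothetical cache entry stamped with the checkpoint that last touched it.
interface CachedEntity {
  lastReferencedCheckpointId: number;
}

// Stamp an entity whenever it is loaded or mutated during a batch.
function touch(table: Record<string, CachedEntity>, id: string, checkpointId: number): void {
  table[id] = { lastReferencedCheckpointId: checkpointId };
}

// After a write completes, evict only entities that no later checkpoint has referenced.
// Entities touched by the batch currently being processed carry a newer stamp and survive.
function cleanupByCheckpoint(table: Record<string, CachedEntity>, writtenCheckpointId: number): string[] {
  const evicted: string[] = [];
  for (const id of Object.keys(table)) {
    const entity = table[id];
    if (entity !== undefined && entity.lastReferencedCheckpointId <= writtenCheckpointId) {
      delete table[id];
      evicted.push(id);
    }
  }
  return evicted;
}
```

For example, after a write up to checkpoint 5, an entity stamped at checkpoint 6 stays cached, so a pending load that resolves later still finds it.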
- InMemoryTable: accept main's simplified updateIndices (getUnsafe instead of switch), use our record destructuring for indexWithRelatedIds - UserContext: accept main's ClickHouse read-only guards, add ~checkpointId=params.checkpointId to all loadById calls https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
Summary
This PR refactors the batch writing mechanism to execute asynchronously in the background while the indexer continues processing events. It introduces in-memory capacity management to prevent unbounded memory growth and improves the overall throughput of the indexing pipeline.
Key Changes
- Background Write Queue: Introduced writePromise and pendingWrite fields to Persistence.t to queue and execute batch writes asynchronously. Writes are executed sequentially with automatic retry if a new write is queued during an in-progress write.
- Write Data Extraction: Moved data extraction logic from writeBatch into InMemoryStore.prepareForNextBatch, which prepares write arguments and queues the background write before the next batch is processed.
- In-Memory Capacity Management: Added totalChangeCount tracking to InMemoryStore and implemented cleanupAfterWrite to evict stale entities and indices after writes complete. When capacity exceeds 50% of the target, the indexer flushes pending writes to free memory.
- Rollback Diff Refactoring: Changed prepareRollbackDiff to return a structured rollbackDiff type instead of an in-memory store, and introduced applyRollbackDiff to apply rollback changes to the existing in-memory store without creating a new one.
- Index Lifecycle Tracking: Enhanced InMemoryTable.Entity.indexWithRelatedIds to track lastReferencedCheckpointId, enabling cleanup of stale indices that are no longer referenced after a write checkpoint.
- Entity Cleanup Logic: Implemented sophisticated cleanup in InMemoryTable.Entity.cleanupAfterWrite that:
- Metrics Separation: Decoupled database write duration metrics from batch processing metrics, as writes now occur asynchronously in the background.
- Error Handling: Added error checking in the main loop to detect and handle background write failures without blocking event processing.
- Checkpoint Tracking: Added writtenCheckpointId to track the last successfully written checkpoint, enabling proper cleanup and capacity management decisions.
Notable Implementation Details
- executeWrite function recursively processes pending writes, ensuring sequential execution and automatic retry if writes are queued during processing.
- flushWrites provides a way to await all pending and in-progress writes, used before rollbacks and graceful shutdown.
https://claude.ai/code/session_01TeJKpVDQq7KxXbAAKBsQGx
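To make the queue-and-flush behavior concrete, here is a minimal TypeScript sketch of a sequential background writer. It is not the Persistence module from this PR: the `BackgroundWriter` class is hypothetical, it keeps a FIFO array where the PR uses a single pendingWrite field, and it drains with a loop where the PR describes recursion. It does show the same guarantees: one write at a time, later writes chained automatically, and a flush that waits for everything and surfaces failures.

```typescript
class BackgroundWriter<Batch> {
  private pending: Batch[] = [];
  private writePromise: Promise<void> | undefined = undefined;
  private failure: { error: unknown } | undefined = undefined;
  private write: (batch: Batch) => Promise<void>;

  constructor(write: (batch: Batch) => Promise<void>) {
    this.write = write;
  }

  // Queue a batch and return immediately; a drain starts only if none is running.
  queue(batch: Batch): void {
    if (this.failure !== undefined) {
      throw this.failure.error; // do not write past a failed batch
    }
    this.pending.push(batch);
    if (this.writePromise === undefined) {
      this.writePromise = this.drain();
    }
  }

  private async drain(): Promise<void> {
    // Yield once so queue() has stored writePromise before cleanup can run.
    await Promise.resolve();
    try {
      while (this.pending.length > 0) {
        const next = this.pending.shift() as Batch;
        await this.write(next); // strictly one write at a time, in queue order
      }
    } catch (error) {
      this.failure = { error: error };
    } finally {
      this.writePromise = undefined;
    }
  }

  // Wait for in-progress and queued writes; rethrow a background failure if any.
  async flushWrites(): Promise<void> {
    while (this.writePromise !== undefined) {
      await this.writePromise;
    }
    if (this.failure !== undefined) {
      throw this.failure.error;
    }
  }
}
```

A caller would `queue` after each batch and call `flushWrites` before a rollback or shutdown, which mirrors how this PR describes those two paths.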
Summary by CodeRabbit
New Features
Performance Improvements
Behavior / Reliability