Pipeline batch SQL commits to run in parallel with next batch's CPU phase #1207
moose-code wants to merge 1 commit into
processEventBatch previously awaited the SQL transaction before the next ProcessEventBatch task could begin. For DB-bound backfills this meant the indexer paid every batch's commit latency serially, even though the next batch's decode + handler work could have run in parallel.

Split processEventBatch into two phases:
- processEventBatchCpuOrThrow: preload + handlers, no SQL.
- commitBatchOrThrow: SQL commit only, runs in background.

GlobalState gains a `pipeline.pendingCommit` holder (stable record, not a `mutable` field on the spread-copied state). After a batch's CPU finishes, its commit is fired without awaiting and tracked there; the next batch's CPU phase reads it as `transit` so loaders see staged writes. Before starting its own commit the next batch awaits the prior one, which keeps SQL writes strictly ordered.

LoadLayer.transit bundles the staged InMemoryStore and the commit promise. loadById consults the staged store directly (no await). loadByField drains the commit before its SELECT because the in-memory secondary index doesn't carry across batches.

Rollback and SuccessExit drain pendingCommit before proceeding so post-commit reads see the right Postgres state. ErrorExit lets the indexer fail fast: the in-flight write is discarded; the next run resumes from the last committed checkpoint.

MockIndexer.getBatchWritePromise now awaits pendingCommit too so test queries that hit Postgres immediately after see the committed state, not pre-commit.

Status: WriteRead loader tests pass. Some E2E and rollback tests still fail; needs investigation before landing.

https://claude.ai/code/session_01ENbBusRSerrzNiexfj5kWG
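The ordering described above can be sketched in TypeScript. This is a minimal illustration under stated assumptions, not the PR's ReScript code: `processBatch`, `drain`, `State`, `Pipeline`, and `CommitFn` are hypothetical names, and the "store" is reduced to a plain `Map`. It shows the CPU phase reading the prior batch's staged writes, the commit being fired without awaiting, and the holder surviving a spread-copy of the state because only the reference is copied.

```typescript
// Hypothetical names throughout; a sketch, not the PR's ReScript implementation.
type Store = Map<string, string>;

interface PendingCommit {
  staged: Store;                // writes from the batch whose SQL is in flight
  commitPromise: Promise<void>; // resolves when that SQL transaction lands
}

// Stable holder: spread-copying the state copies this reference, so a mutation
// made through one copy of the state is visible through every other copy.
interface Pipeline { pendingCommit: PendingCommit | undefined }
interface State { processedBatches: number; pipeline: Pipeline }

type CommitFn = (staged: Store) => Promise<void>;

// Runs one batch: CPU phase first, then queues its commit behind the prior one.
async function processBatch(
  state: State,
  runHandlers: (transit: PendingCommit | undefined) => Store,
  commit: CommitFn,
): Promise<State> {
  const prior = state.pipeline.pendingCommit;
  // CPU phase: handlers may read the prior batch's staged writes via `prior`.
  const staged = runHandlers(prior);
  // Keep SQL writes strictly ordered: wait for the prior commit first.
  if (prior !== undefined) await prior.commitPromise;
  // Fire this batch's commit without awaiting it and track it in the holder.
  state.pipeline.pendingCommit = { staged, commitPromise: commit(staged) };
  return { ...state, processedBatches: state.processedBatches + 1 };
}

// Used before a rollback or a successful exit so the database has every write.
async function drain(state: State): Promise<void> {
  const pending = state.pipeline.pendingCommit;
  if (pending === undefined) return;
  await pending.commitPromise;
  // Clear the holder only if no newer commit replaced it in the meantime.
  if (state.pipeline.pendingCommit === pending) {
    state.pipeline.pendingCommit = undefined;
  }
}
```

In this sketch a failed commit would surface as a rejection when the next batch or `drain` awaits `commitPromise`; the PR instead resolves the promise with a result value, which the sketch does not model.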
📝 Walkthrough
This PR decouples event batch processing into independent CPU and SQL phases while enabling handlers to read staged data from the previous batch's in-flight commit. A new
Changes
Event batch CPU/SQL pipelining with prior-batch write fallback
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 5 passed
🧹 Nitpick comments (1)
scenarios/test_codegen/test/helpers/MockIndexer.res (1)
386-390: 💤 Low value
Consider checking the commit result in tests.

The pending commit's result is discarded (`let _ = await commitPromise`). If the commit fails with `Error(errHandler)`, the test might not catch it here; it would fail elsewhere in the indexer. For better test debuggability, consider matching on the result and explicitly failing if an error occurs:

```diff
 switch (gsManager->GlobalStateManager.getState).pipeline.pendingCommit {
 | Some({commitPromise}) =>
-  let _ = await commitPromise
+  switch await commitPromise {
+  | Ok() => ()
+  | Error(errHandler) =>
+    JsError.throwWithMessage(
+      `Pending commit failed during test: ${errHandler.exn->Utils.prettifyExn}`
+    )
+  }
 | None => ()
 }
```

This would surface commit failures immediately rather than relying on downstream test assertions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@scenarios/test_codegen/test/helpers/MockIndexer.res` around lines 386 - 390, The test currently discards the pending commit result (let _ = await commitPromise) so failures are hidden; change the await to capture the result from gsManager->GlobalStateManager.getState.pipeline.pendingCommit.commitPromise, pattern-match the awaited value (e.g., Ok vs Error or Result variant used by your runtime) and explicitly fail the test or raise an assertion if the commit returned Error(errHandler) so commit failures surface immediately in the MockIndexer test.
📒 Files selected for processing (7)
- packages/envio/src/EventProcessing.res
- packages/envio/src/GlobalState.res
- packages/envio/src/LoadLayer.res
- packages/envio/src/LoadLayer.resi
- packages/envio/src/UserContext.res
- scenarios/test_codegen/test/EventOrigin_test.res
- scenarios/test_codegen/test/helpers/MockIndexer.res
Summary
Implements batch processing pipelining to overlap the previous batch's SQL commit with the next batch's CPU phase (preload + handlers). This allows the indexer to hide database write latency and improve throughput.
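To make the latency claim concrete, here is a toy timing model in TypeScript. It is only a sketch under simplifying assumptions: every batch has the same CPU and SQL duration, a batch fires its commit once its own CPU phase is done and the prior commit has landed, and the next CPU phase starts as soon as that commit is fired. The function names and any durations passed in are illustrative, not measurements from this PR.

```typescript
// Toy timing model with caller-supplied durations; not measurements from the PR.

// Serial behaviour: each batch pays its CPU time and then its commit time.
function serialTotal(batches: number, cpuMs: number, sqlMs: number): number {
  return batches * (cpuMs + sqlMs);
}

// Pipelined behaviour: a commit overlaps with the next batch's CPU phase.
function pipelinedTotal(batches: number, cpuMs: number, sqlMs: number): number {
  let cpuStart = 0;       // when the current batch's CPU phase begins
  let priorCommitEnd = 0; // when the previous batch's commit lands
  for (let i = 0; i < batches; i++) {
    const cpuEnd = cpuStart + cpuMs;
    // The commit is fired once CPU is done and the prior commit has landed.
    const fire = Math.max(cpuEnd, priorCommitEnd);
    priorCommitEnd = fire + sqlMs;
    // The next batch's CPU phase starts as soon as this commit is fired.
    cpuStart = fire;
  }
  return priorCommitEnd; // time at which the last commit lands
}
```

For three batches with 30 ms of CPU and 20 ms of SQL each, the model gives 150 ms serially and 110 ms pipelined; only the final commit's latency is not hidden behind CPU work.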
Key Changes
- GlobalState: Added `pipelineState` record with mutable `pendingCommit` field to track in-flight commits across state updates. The `pendingCommit` holds both the staged `InMemoryStore` (for loaders to see pending writes) and a promise that resolves when the SQL transaction lands.
- EventProcessing: Split batch processing into two phases:
  - `processEventBatchCpuOrThrow`: Runs preload + handlers, returns elapsed durations without committing
  - `commitBatchOrThrow`: Performs the deferred SQL commit, designed to run in the background
- LoadLayer: Added `transit` type to expose the previous batch's staged writes and commit promise. Updated `loadById` to check the transit in-memory table as a fallback so it sees entities from the prior batch before their SQL lands. Updated `loadByField` to await the transit commit promise before querying the database, since secondary indexes don't carry across batches.
- UserContext: Threaded `transit` through handler context so loaders can access staged writes from the previous batch.
- Batch processing flow: After handlers complete, fire the SQL commit as a background promise and immediately dispatch `EventBatchProcessed`. The next batch's CPU phase runs in parallel. Before the next batch's commit, await the prior commit to maintain strict SQL write ordering. Before rollback, drain any pending commit to ensure rollback queries see consistent state.
- Exit handling: Before `SuccessExit`, drain the final pending commit so declared endblock state actually lands in Postgres.
- Test helpers: Updated `MockIndexer` to drain pending commits when waiting for batch processing, ensuring tests see committed state.

Notable Implementation Details
- `pipelineState` lives outside the spread-copied state record so mutations survive `{...state, ...}` patterns.
- `loadById` checks transit as a fallback; `loadByField` must await the commit first because in-memory secondary indexes don't carry across batches.

https://claude.ai/code/session_01ENbBusRSerrzNiexfj5kWG
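The difference between the two loaders can be illustrated with a small TypeScript sketch. Everything here is hypothetical: `Entity`, `Transit`, `Db`, and `loadByOwner` (standing in for `loadByField`) are illustrative names, not the LoadLayer's actual ReScript API, and the current batch's own in-memory store is left out for brevity.

```typescript
// Hypothetical types and names; a sketch, not the PR's ReScript LoadLayer.
interface Entity { id: string; owner: string }

interface Transit {
  staged: Map<string, Entity>;  // prior batch's writes, keyed by id
  commitPromise: Promise<void>; // resolves once those writes are in the database
}

interface Db {
  selectById(id: string): Promise<Entity | undefined>;
  selectByOwner(owner: string): Promise<Entity[]>;
}

// Primary-key lookup: the staged store is keyed by id, so it can answer
// directly without waiting for the in-flight commit.
async function loadById(
  db: Db,
  transit: Transit | undefined,
  id: string,
): Promise<Entity | undefined> {
  const staged = transit?.staged.get(id);
  if (staged !== undefined) return staged;
  return db.selectById(id);
}

// Field lookup: no secondary index is carried across batches in memory, so
// wait for the prior commit to land and let the database answer.
async function loadByOwner(
  db: Db,
  transit: Transit | undefined,
  owner: string,
): Promise<Entity[]> {
  if (transit !== undefined) await transit.commitPromise;
  return db.selectByOwner(owner);
}
```

The sketch does not model staged deletions; a real staged store would also need to report that an entity was removed by the prior batch rather than falling through to the database.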
Summary by CodeRabbit
Release Notes