Skip to content

Pipeline batch SQL commits to run in parallel with next batch's CPU phase#1207

Closed
moose-code wants to merge 1 commit into
mainfrom
claude/perf-pipeline-overlap
Closed

Pipeline batch SQL commits to run in parallel with next batch's CPU phase#1207
moose-code wants to merge 1 commit into
mainfrom
claude/perf-pipeline-overlap

Conversation

@moose-code
Copy link
Copy Markdown
Member

@moose-code moose-code commented May 14, 2026

Summary

Implements batch processing pipelining to overlap the previous batch's SQL commit with the next batch's CPU phase (preload + handlers). This allows the indexer to hide database write latency and improve throughput.

Key Changes

  • GlobalState: Added pipelineState record with mutable pendingCommit field to track in-flight commits across state updates. The pendingCommit holds both the staged InMemoryStore (for loaders to see pending writes) and a promise that resolves when the SQL transaction lands.

  • EventProcessing: Split batch processing into two phases:

    • processEventBatchCpuOrThrow: Runs preload + handlers, returns elapsed durations without committing
    • commitBatchOrThrow: Performs the deferred SQL commit, designed to run in the background
  • LoadLayer: Added transit type to expose the previous batch's staged writes and commit promise. Updated loadById to check the transit in-memory table as a fallback so it sees entities from the prior batch before their SQL lands. Updated loadByField to await the transit commit promise before querying the database, since secondary indexes don't carry across batches.

  • UserContext: Threaded transit through handler context so loaders can access staged writes from the previous batch.

  • Batch processing flow: After handlers complete, fire the SQL commit as a background promise and immediately dispatch EventBatchProcessed. The next batch's CPU phase runs in parallel. Before the next batch's commit, await the prior commit to maintain strict SQL write ordering. Before rollback, drain any pending commit to ensure rollback queries see consistent state.

  • Exit handling: Before SuccessExit, drain the final pending commit so declared endblock state actually lands in Postgres.

  • Test helpers: Updated MockIndexer to drain pending commits when waiting for batch processing, ensuring tests see committed state.

Notable Implementation Details

  • The pipelineState lives outside the spread-copied state record so mutations survive {...state, ...} patterns.
  • Pipelining is disabled during rollback replay (no transit passed) to keep the diff store exclusively visible.
  • loadById checks transit as a fallback; loadByField must await the commit first because in-memory secondary indexes don't carry across batches.
  • Metrics are stitched after the deferred commit completes using durations captured during the CPU phase.

https://claude.ai/code/session_01ENbBusRSerrzNiexfj5kWG

Summary by CodeRabbit

Release Notes

  • Refactor
    • Optimized batch processing pipeline by separating CPU execution and database commit phases for improved efficiency and reduced latency.
    • Enhanced concurrent batch operations to allow safe access to data from previous uncommitted batch writes.
    • Improved rollback handling to properly manage in-flight database operations and error scenarios.

Review Change Stack

processEventBatch previously awaited the SQL transaction before the
next ProcessEventBatch task could begin. For DB-bound backfills this
meant the indexer paid every batch's commit latency serially, even
though the next batch's decode + handler work could have run in
parallel.

Split processEventBatch into two phases:

- processEventBatchCpuOrThrow: preload + handlers, no SQL.
- commitBatchOrThrow: SQL commit only, runs in background.

GlobalState gains a `pipeline.pendingCommit` holder (stable record,
not a `mutable` field on the spread-copied state). After a batch's
CPU finishes, its commit is fired without awaiting and tracked there;
the next batch's CPU phase reads it as `transit` so loaders see
staged writes. Before starting its own commit the next batch awaits
the prior one — keeps SQL writes strictly ordered.

LoadLayer.transit bundles the staged InMemoryStore and the commit
promise. loadById consults the staged store directly (no await).
loadByField drains the commit before its SELECT because the in-
memory secondary index doesn't carry across batches.

Rollback and SuccessExit drain pendingCommit before proceeding so
post-commit reads see the right Postgres state. ErrorExit lets the
indexer fail fast — the in-flight write is discarded; the next run
resumes from the last committed checkpoint.

MockIndexer.getBatchWritePromise now awaits pendingCommit too so
test queries that hit Postgres immediately after see the committed
state, not pre-commit.

Status: WriteRead loader tests pass. Some E2E and rollback tests
still fail — needs investigation before landing.

https://claude.ai/code/session_01ENbBusRSerrzNiexfj5kWG
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 14, 2026

📝 Walkthrough

Walkthrough

This PR decouples event batch processing into independent CPU and SQL phases while enabling handlers to read staged data from the previous batch's in-flight commit. A new transit parameter threads information about prior commits through loaders, allowing cross-batch read fallback before data is fully persisted.

Changes

Event batch CPU/SQL pipelining with prior-batch write fallback

Layer / File(s) Summary
Transit type and LoadLayer interface
packages/envio/src/LoadLayer.resi
Introduce transit type capturing in-memory store and commit promise; extend loadById and loadByField signatures with optional ~transit: transit=? parameter.
LoadLayer fallback implementation
packages/envio/src/LoadLayer.res
loadById checks active batch first, then falls back to prior batch's transit in-memory table; loadByField explicitly drains prior commit promise before issuing database query and logs/raises on commit failure.
UserContext transit threading
packages/envio/src/UserContext.res
Add transit: option<LoadLayer.transit> field to contextParams; pass transit through getWhereHandler field-load operators, entityTraps entity-get methods, and entity context building to expose prior batch's staged data to user handlers.
EventProcessing handler execution wiring
packages/envio/src/EventProcessing.res
Thread ~transit parameter through runEventHandlerOrThrow, runHandlerOrThrow, preloadBatchOrThrow, and runBatchHandlersOrThrow, passing transit into handler context construction for all handler types.
EventProcessing batch split (CPU + commit phases)
packages/envio/src/EventProcessing.res
Introduce cpuPhaseResult type; create processEventBatchCpuOrThrow that runs preload + handlers and returns { loaderDuration, handlerDuration } without database write; create commitBatchOrThrow for persistence write and metrics registration.
GlobalState pipeline tracking and ordered commits
packages/envio/src/GlobalState.res
Add pipeline field to GlobalState tracking pending commit state; implement CPU/SQL pipelining in ProcessEventBatch that creates transit from prior commit, runs CPU phase, waits for prior commit before scheduling current batch commit; drain pipeline on successful exit and during rollback.
Test and mock updates
scenarios/test_codegen/test/EventOrigin_test.res, scenarios/test_codegen/test/helpers/MockIndexer.res
Update handler context initialization to include transit: None; update getBatchWritePromise to drain pending commit promise before resolving batch-write promise.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • enviodev/hyperindex#775: Threads additional per-batch state (~chains) through the same handler-context construction paths in EventProcessing and UserContext.

Suggested reviewers

  • JonoPrest
  • DZakh
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly describes the main architectural change: pipelining SQL commits to overlap with the next batch's CPU phase, which is the core objective of this PR.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
scenarios/test_codegen/test/helpers/MockIndexer.res (1)

386-390: 💤 Low value

Consider checking the commit result in tests.

The pending commit's result is discarded (let _ = await commitPromise). If the commit fails with Error(errHandler), the test might not catch it here—it would fail elsewhere in the indexer. For better test debuggability, consider matching on the result and explicitly failing if an error occurs:

 switch (gsManager->GlobalStateManager.getState).pipeline.pendingCommit {
 | Some({commitPromise}) =>
-  let _ = await commitPromise
+  switch await commitPromise {
+  | Ok() => ()
+  | Error(errHandler) =>
+    JsError.throwWithMessage(
+      `Pending commit failed during test: ${errHandler.exn->Utils.prettifyExn}`
+    )
+  }
 | None => ()
 }

This would surface commit failures immediately rather than relying on downstream test assertions.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@scenarios/test_codegen/test/helpers/MockIndexer.res` around lines 386 - 390,
The test currently discards the pending commit result (let _ = await
commitPromise) so failures are hidden; change the await to capture the result
from
gsManager->GlobalStateManager.getState.pipeline.pendingCommit.commitPromise,
pattern-match the awaited value (e.g., Ok vs Error or Result variant used by
your runtime) and explicitly fail the test or raise an assertion if the commit
returned Error(errHandler) so commit failures surface immediately in the
MockIndexer test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@scenarios/test_codegen/test/helpers/MockIndexer.res`:
- Around line 386-390: The test currently discards the pending commit result
(let _ = await commitPromise) so failures are hidden; change the await to
capture the result from
gsManager->GlobalStateManager.getState.pipeline.pendingCommit.commitPromise,
pattern-match the awaited value (e.g., Ok vs Error or Result variant used by
your runtime) and explicitly fail the test or raise an assertion if the commit
returned Error(errHandler) so commit failures surface immediately in the
MockIndexer test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: da1c11b6-b418-479d-a3a0-cee42bd418ac

📥 Commits

Reviewing files that changed from the base of the PR and between 8ebf7ff and 4af4258.

📒 Files selected for processing (7)
  • packages/envio/src/EventProcessing.res
  • packages/envio/src/GlobalState.res
  • packages/envio/src/LoadLayer.res
  • packages/envio/src/LoadLayer.resi
  • packages/envio/src/UserContext.res
  • scenarios/test_codegen/test/EventOrigin_test.res
  • scenarios/test_codegen/test/helpers/MockIndexer.res

@moose-code moose-code closed this May 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants