feat(a2a): expose A2AEventStore + A2AReplayProvider injection points (#43)
A2AServerApp gains two optional hooks:
- eventStore: A2AEventStore — durable Dict-backed event journal
- replayProvider: A2AReplayProvider — pull-style replay source for
tasks/resubscribe after web-tier scale-to-zero
Both flow through to A2AServer.Config so the server can persist live
events as they fan out (eventStore.append) and reconstruct event
streams from the persistent journal when an in-memory bus is gone.
Companion change in tjc-agents wires Modal Dict-backed implementations
of both hooks; together they make the v2-session A2A runtime fully
stateless across web-tier evictions: durable completion (artifacts +
status from a Modal Volume), durable session resume (sessionId
rescued from state.json on tasks/get), and durable event replay
(events appended to a Modal Volume JSONL journal during the run,
served back via tasks/resubscribe).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
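The hook shapes themselves aren't spelled out in the commit message. A minimal, non-authoritative sketch of what the text describes — the trait names come from the commit, but the method names, signatures, and placeholder domain types below are assumptions (the later review's `store.load(task.id, context.tenant, config.eventReplayLimit)` call suggests the `load` shape):

```scala
import zio.Task

// Placeholder domain types for illustration only; the real project defines its own.
type TaskId   = String
type Tenant   = String
type A2AEvent = String

// Assumed shape — method names and signatures are guesses from the commit text.
trait A2AEventStore {
  // Append one fan-out event to the durable journal, keyed by (taskId, tenant).
  def append(taskId: TaskId, tenant: Tenant, event: A2AEvent): Task[Unit]
  // Read the journal back so tasks/resubscribe can rebuild a stream
  // after the in-memory bus is gone.
  def load(taskId: TaskId, tenant: Tenant, limit: Int): Task[List[A2AEvent]]
}

// Assumed shape — a pull-style replay source consulted by tasks/resubscribe.
trait A2AReplayProvider {
  def replay(taskId: TaskId, tenant: Tenant): Task[List[A2AEvent]]
}
```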
Code Review

Nice clean seam — a few things worth a second look before merge:

Behavior changes / API

Performance / hot-path concern

Robustness

Test coverage

The new test (

Minor

Overall: solid surface, the seams are in the right place. The hot-path serialization (#4) and the silent terminal-resubscribe semantics shift (#1) are the two I'd want resolved before merging.
…ns + non-blocking event store

Addresses review feedback on #43 — both the autoreviewer's hot-path / behavior-change concerns and the human reviewer's two stream-semantics bugs:

**Durable-resubscribe stream semantics (P2 #1, P2 #8)**

The previous fallback returned a single TaskSnapshot and closed when no replay source was configured, regardless of whether the task was terminal. For *terminal* tasks that's wire-legal (the snapshot carries the terminal state) but is a behavior change vs. the prior raise — now documented inline. For *non-terminal* tasks it was actively wrong: a client got a Working snapshot and EOF, indistinguishable from a lost run that completed successfully. Restore the unsupportedOperation failure for `non-terminal task + no replay configured`; the client should poll tasks/get instead of mistaking a frozen replay for a complete one.

**Cancellation events are persisted (P2 #9)**

cancelTask publishes the final canceled TaskStatusUpdate directly to the bus + push sender, bypassing ResultManager.publish — so with an A2AEventStore configured, durable resubscribe after the runtime entry is gone replayed prior events but never the cancellation. Enqueue the canceled event onto the same EventStorePersister chain ResultManager uses, so it lands in the durable log alongside everything else.

**Non-blocking persistence on the publish hot path (autoreviewer #4)**

`persistEvent` was sequenced before `bus.publish`, so live SSE subscribers waited on a Modal Dict append (~50–200 ms typical, up to 2 s on timeout) for every event. New `EventStorePersister` mirrors `PushNotificationSender`'s per-task delivery-chain pattern: each enqueue forks a daemon that awaits the prior chain link, runs the bounded + defect-trapped append, then signals the next. Append order is preserved per `(taskId, tenant)`; live subscribers are off the persistence critical path.

**Robustness items the reviewer named**

* TaskSnapshot filter is symmetric across the replayProvider and eventStore branches — providers no longer have to remember an implicit contract.
* `eventStoreAppendTimeout` (2 s default) and `eventStoreLoadTimeout` (5 s default) promoted to `A2AServer.Config` fields.
* `catchAllCause` around the inner append so a misbehaving impl (JS throw, NPE) doesn't crash the daemon chain.
* `println` → `ZIO.logWarning` to match `pushSender`'s analogous path.
* Scaladoc on `A2AEventStore` and `A2AReplayProvider` covering the `(taskId, tenant)` storage-key contract and the replayProvider-precedence-over-eventStore rule.

**Test coverage**

* cancellation events persist to the durable store (covers P2 #9)
* replayProvider takes precedence over eventStore (locks the contract)
* event store failures (defects + timeouts) don't break execution

The pre-existing unsupportedOperation symbol takes a String reason; the non-terminal-without-replay failure now points the caller at `tasks/get` for status.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
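The "bounded + defect-trapped append" step described above can be sketched as a small ZIO combinator. This is a sketch under assumed names — `boundedAppend` is not the PR's actual helper, and the timeout value stands in for `config.eventStoreAppendTimeout`:

```scala
import zio._

// Sketch: wrap a user-supplied A2AEventStore append so that a slow or
// misbehaving implementation can never stall or kill the daemon chain.
def boundedAppend(append: Task[Unit], timeout: Duration): UIO[Unit] =
  append
    // Bound the append so a hung backend can't block the chain indefinitely.
    .timeoutFail(new RuntimeException("event-store append timed out"))(timeout)
    // Trap *all* causes (failures and defects, e.g. a JS throw or NPE)
    // so the daemon fiber survives and just logs the problem.
    .catchAllCause(cause => ZIO.logWarningCause("[a2a-event-store] append failed", cause))
```

Each link of the per-task chain would then `forkDaemon` this effect after awaiting the previous link's promise, keeping live subscribers off the persistence path while preserving per-key ordering.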
Pushed
New tests (3 added, 14 total in
The non-terminal-without-replay rejection path is exercised manually but not in this commit's unit tests — testing it cleanly requires either exposing the runtime registry or a process-restart simulation, both heavier than warranted. The behavior is a small conditional on top of the otherwise-tested branches. Local verification:
Review

Solid PR, and the second commit visibly responded to prior review feedback (durable-resubscribe semantics, cancel persistence, hot-path off, ordering). Two real correctness issues plus a few nits below.

Code quality / design
Bugs / correctness

1. In the `case Some(store)` branch:

```scala
case Some(store) =>
  store
    .load(task.id, context.tenant, config.eventReplayLimit)
    .timeout(config.eventStoreLoadTimeout)
    .map(_.getOrElse(Nil)) // <- timeout/None silently becomes empty
    .map(events => snapshot ++ ZStream.fromIterable(events.filter(notSnapshot)))
```

This is the same hazard the PR explicitly closes for the non-terminal-without-replay path. Suggest: log on timeout and fail if the task is non-terminal. Roughly:

```scala
case Some(store) =>
  store.load(...).timeout(config.eventStoreLoadTimeout).flatMap {
    case Some(events) =>
      ZIO.succeed(snapshot ++ ZStream.fromIterable(events.filter(notSnapshot)))
    case None if task.isTerminal =>
      ZIO.logWarning(s"[a2a-event-store] load timed out task=${task.id.value}; returning snapshot only")
        .as(snapshot)
    case None =>
      ZIO.fail(A2AError.unsupportedOperation(
        s"Event store load timed out for non-terminal task ${task.id.value}; poll tasks/get."))
  }
```

2. Race in the chain bookkeeping:

```scala
previous <- ZIO.succeed(chains.get(key))
current  <- Promise.make[Nothing, Unit]
_        <- ZIO.succeed(chains.update(key, current))
```

Two enqueues for the same key can interleave between the `get` and the `update`, so both read the same `previous` and one link of the chain is lost. In practice on Scala.js the JS event loop is single-threaded and the window is unlikely to bite, but nothing in the code guarantees that. Two cheap mitigations:
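One mitigation in this direction — a sketch, assuming `chains` becomes a `zio.Ref` over an immutable `Map` rather than a mutable map — is to collapse the lookup and the swap into a single atomic `modify`:

```scala
// Single atomic read-and-swap: returns the previous link (if any) and
// installs `current` as the new tail in one step, closing the race window
// between the separate get and update.
previous <- chains.modify(m => (m.get(key), m.updated(key, current)))
```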
Test coverage

Coverage of the new seams is solid (precedence, cancel persistence, defect tolerance). Two gaps worth filling:
Minor
Compatibility
Nice work overall. The two correctness items (#1 silent timeout, #2 chain race) are the only ones I'd want addressed before merge.
Review — A2A durable event store + replay provider

Solid, focused PR. The design cleanly slots into the existing per-task-key delivery-chain pattern, the docstrings (the

A few items worth a look:

1. Potential deadlock if
Summary
Adds two optional hooks to `A2AServerApp` so concrete deployments can plug in durable event persistence and replay:

- `eventStore: Option[A2AEventStore]` — the server appends every fan-out event here (in addition to the in-memory bus) so it survives container restarts.
- `replayProvider: Option[A2AReplayProvider]` — `tasks/resubscribe` consults this when the in-memory bus has nothing (or has been evicted), so subscribers can replay events that were emitted before they connected.

Both flow through `A2AServer.Config`. No behavior change when the hooks are unset (the existing in-memory `A2AEventBus` still does the job for happy-path live streaming).

Why
A companion change in `tjc-agents` (TJC's Modal-Sandbox-backed deployment of agents) hits a real durability gap: when an agent runs for 15+ minutes, Modal scales the `@modal.web_server` container to zero between A2A polls, killing the orchestrator fiber that holds `A2AEventBus.history`. Subscribers that arrive after the container comes back get nothing.

These two seams let the tjc-agents side wire Modal Dict + Modal Volume implementations:
- `ModalA2AEventStore` — append-only Dict-backed log
- `ModalSandboxReplayProvider` — pulls events from a per-task JSONL the worker writes to a Modal Volume; survives any web-tier lifecycle event

End result, verified live on dev: agent runtime is fully stateless across web-tier evictions for completion + resume + event replay, no data loss.
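A hedged sketch of the wiring on the deployment side — the value names and constructor arguments here are assumptions; only the field names (`eventStore`, `replayProvider`) and class names come from this PR:

```scala
// Sketch only: plug the Modal-backed implementations into the two seams.
val app = A2AServerApp(
  // ...existing required configuration elided...
  eventStore     = Some(modalA2AEventStore),        // Dict-backed append-only journal
  replayProvider = Some(modalSandboxReplayProvider) // per-task JSONL on a Modal Volume
)
```

Leaving both fields as `None` keeps the existing in-memory-only behavior.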
Compatibility
Both fields default to `None`; existing apps see zero behavior change. Tests in `A2AServerOperationSpec` cover the new wiring under both seams set and unset.

Test plan
🤖 Generated with Claude Code