
feat(a2a): expose A2AEventStore + A2AReplayProvider injection points #43

Merged
arcaputo3 merged 3 commits into main from a2a-event-store-replay-provider on May 9, 2026

Conversation

@arcaputo3
Contributor

Summary

Adds two optional hooks to A2AServerApp so concrete deployments can plug in durable event persistence and replay:

  • eventStore: Option[A2AEventStore] — the server appends every fan-out event here (in addition to the in-memory bus) so it survives container restarts.
  • replayProvider: Option[A2AReplayProvider] — tasks/resubscribe consults this when the in-memory bus has nothing (or has been evicted), so subscribers can replay events that were emitted before they connected.

Both flow through A2AServer.Config. No behavior change when the hooks are unset (the existing in-memory A2AEventBus still does the job for happy-path live streaming).
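
For orientation, a rough sketch of the two seams as described above; the exact scalagent signatures (in particular the parameter order) are assumptions rather than the real definitions, and the stand-in types are placeholders.

```scala
import zio.*
import zio.stream.ZStream

// Placeholder stand-ins for scalagent types referenced in this thread.
final case class TaskId(value: String)
trait StreamEvent

// Durable, append-only sink the server writes every fan-out event to.
trait A2AEventStore:
  def append(taskId: TaskId, tenant: Option[String], event: StreamEvent): UIO[Unit]
  def load(taskId: TaskId, tenant: Option[String], limit: Int): UIO[List[StreamEvent]]

// Pull-style replay source consulted by tasks/resubscribe when the in-memory bus is gone.
trait A2AReplayProvider:
  def replay(taskId: TaskId, tenant: Option[String]): ZStream[Any, Throwable, StreamEvent]

// On A2AServer.Config both hooks are Option-valued and default to None.
```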

Why

A companion change in tjc-agents (TJC's Modal-Sandbox-backed deployment of agents) hits a real durability gap: when an agent runs for 15+ minutes, Modal scales the @modal.web_server container to zero between A2A polls, killing the orchestrator fiber that holds A2AEventBus.history. Subscribers that arrive after the container comes back get nothing.

These two seams let the tjc-agents side wire Modal Dict + Modal Volume implementations:

  • ModalA2AEventStore — append-only Dict-backed log
  • ModalSandboxReplayProvider — pulls events from a per-task JSONL the worker writes to a Modal Volume; survives any web-tier lifecycle event

End result, verified live on dev: the agent runtime is fully stateless across web-tier evictions for completion, resume, and event replay, with no data loss.

Compatibility

Both fields default to None; existing apps see zero behavior change. Tests in A2AServerOperationSpec cover the new wiring under both seams set and unset.

Test plan

  • `./mill __.test` — full scalagent suite green, including the new event-store/replay assertions
  • tjc-agents bumped to `0.7.0-RC2-SNAPSHOT` + dogfooded against memtest, finagent, ndagent on dev:
    • cold completion ✅
    • session resume across web-tier eviction ✅
    • event replay via the volume-backed JSONL ✅

🤖 Generated with Claude Code

A2AServerApp gains two optional hooks:
  - eventStore: A2AEventStore — durable Dict-backed event journal
  - replayProvider: A2AReplayProvider — pull-style replay source for
    tasks/resubscribe after web-tier scale-to-zero

Both flow through to A2AServer.Config so the server can persist live
events as they fan out (eventStore.append) and reconstruct event
streams from the persistent journal when an in-memory bus is gone.

Companion change in tjc-agents wires Modal Dict-backed implementations
of both hooks; together they make the v2-session A2A runtime fully
stateless across web-tier evictions: durable completion (artifacts +
status from a Modal Volume), durable session resume (sessionId
rescued from state.json on tasks/get), and durable event replay
(events appended to a Modal Volume JSONL journal during the run,
served back via tasks/resubscribe).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@claude

claude Bot commented May 8, 2026

Code Review

Nice clean seam — Option[A2AEventStore] + Option[A2AReplayProvider] defaulting to None keeps the in-memory bus path untouched, and the trait surfaces (UIO[Unit] for append, UIO[List[...]] for load) push error handling onto the implementor in a way that prevents persistence failures from leaking into typed errors. The Modal-volume motivation is well documented in the PR body.

A few things worth a second look before merge:

Behavior changes / API

  1. Resubscribe on terminal tasks no longer rejects (A2AServer.scala:667-676). Previously: unsupportedOperation. Now: snapshot + (optional) replay. The PR body says "no behavior change when hooks are unset", but this case actually changes for every caller — when no store/provider is configured, terminal-task resubscribe now returns a single-snapshot stream and ends, instead of raising. Worth calling out explicitly in the changelog and considering whether existing clients rely on the error to detect terminal state. The A2A spec is permissive here, but it's still a wire-visible change.

  2. replayProvider precedence over eventStore (A2AServer.scala:680-692) is undocumented. If a user wires both (e.g. dual-write during migration), only the provider is consulted. Either document or match on a sealed combined source.

  3. TaskSnapshot filtering asymmetry (A2AServer.scala:690). The eventStore branch filters TaskSnapshot from stored events because we already prepended one; the replayProvider branch does not. Providers must remember to omit snapshots themselves or clients get duplicates. Either filter both sides or document the contract on A2AReplayProvider.replay.

Performance / hot-path concern

  1. persistEvent is sequenced before bus.publish (A2AServer.scala:501):
    applyEvent(event) *> persistEvent(event) *> bus.publish(event) *> pushSender.send(event, context)
    Live subscribers now wait for the durable append on every event. With Claude streaming progress events at ~10/s and a Modal Dict append at ~50–200ms, this adds up — and with timeout(2.seconds) per event, a degraded store can stall the live stream for 2s × N events. pushSender.send is already forkDaemon'd with a per-task sequential delivery chain (A2AServer.scala:436-457); the same pattern would let persistence preserve append order without blocking live subscribers. Worth benchmarking against memtest/finagent before shipping if you haven't.

Robustness

  1. println for the timeout warning (A2AServer.scala:566) is inconsistent with the rest of the file — pushSender uses ZIO.logWarning for the analogous failure (A2AServer.scala:449). Suggest matching that.

  2. Defect propagation from A2AEventStore. UIO[Unit] rules out typed failures, but a misbehaving implementation can still die (JS throw, NPE). Today that defect would propagate up manager.publish and crash the execution fiber, which then surfaces as a failed task. A .catchAllCause(cause => ZIO.logWarning(...)) around persistEvent would make the store strictly best-effort and match the design intent of "durability is additive". A sketch follows after this list.

  3. Hardcoded timeouts (2.seconds for append, 5.seconds for load). These are reasonable defaults for Modal Dict but should be Config fields alongside eventReplayLimit. A slow S3-backed store will need higher; a local SQLite-backed store could go lower.
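
Putting items 2 and 3 together, a minimal sketch of the best-effort append shape; EventStoreLike, the parameter order, and the log message are illustrative, not the PR's actual persistEvent.

```scala
import zio.*

// Illustrative stand-in for the store; the real trait lives in A2AServer.scala.
trait EventStoreLike:
  def append(taskId: String, tenant: Option[String], event: String): UIO[Unit]

// Bound the append, trap defects, and log instead of crashing the execution fiber.
def persistBestEffort(store: EventStoreLike, appendTimeout: Duration)(
    taskId: String,
    tenant: Option[String],
    event: String
): UIO[Unit] =
  store
    .append(taskId, tenant, event)
    .timeout(appendTimeout) // would come from A2AServer.Config rather than a constant
    .unit
    .catchAllCause(cause => ZIO.logWarning(s"[a2a-event-store] append failed: ${cause.prettyPrint}"))
```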

Test coverage

The new test (A2AServerOperationSpec.scala:328-375) is a good happy-path smoke. Gaps that would be cheap to add:

  • Persist failure does not break execution (event store that always fails / dies — task should still complete and replay should degrade).
  • replayProvider takes precedence when both set — this is currently implicit in the code and untested.
  • Tenant scoping — the test stub explicitly discards tenant (val _ = (taskId, tenant)), so a regression that drops the context.tenant argument from persistEvent/durableReplay would not be caught. A two-tenant test where store keys are tenant-scoped would lock the contract.
  • eventReplayLimit is honored on durable replay path, mirroring the existing in-memory bus replay test.

Minor

  • A2AEventStore.none (A2AServer.scala:42-47) is exposed but never used internally — the live wiring uses Option. Either drop it or use it as the default in Config (and remove Option). Mixing both idioms invites confusion downstream.
  • tenant: Option[String] in the trait works but encodes a string-based tenancy assumption; downstream stores have to invent their own composite key. Worth a brief Scaladoc on how the pair (taskId, tenant) should be combined into a storage key (especially since tasks aren't guaranteed unique across tenants in the in-memory store either).

Overall: solid surface, the seams are in the right place. The hot-path serialization (#4) and the silent terminal-resubscribe semantics shift (#1) are the two I'd want resolved before merging.

…ns + non-blocking event store

Addresses review feedback on #43 — both the autoreviewer's hot-path /
behavior-change concerns and the human reviewer's two stream-semantics
bugs:

**Durable-resubscribe stream semantics (P2 #1, P2 #8)**

The previous fallback returned a single TaskSnapshot and closed when no
replay source was configured, regardless of whether the task was
terminal. For *terminal* tasks that's wire-legal (the snapshot carries
the terminal state) but is a behavior change vs. the prior raise — now
documented inline. For *non-terminal* tasks it was actively wrong: a
client got a Working snapshot and EOF, indistinguishable from a lost run
that completed successfully. Restore the unsupportedOperation failure
for `non-terminal task + no replay configured`; the client should poll
tasks/get instead of mistaking a frozen replay for a complete one.

**Cancellation events are persisted (P2 #9)**

cancelTask publishes the final canceled TaskStatusUpdate directly to the
bus + push sender, bypassing ResultManager.publish — so with an
A2AEventStore configured, durable resubscribe after the runtime entry is
gone replayed prior events but never the cancellation. Enqueue the
canceled event onto the same EventStorePersister chain ResultManager
uses, so it lands in the durable log alongside everything else.

**Non-blocking persistence on the publish hot path (autoreviewer #4)**

`persistEvent` was sequenced before `bus.publish`, so live SSE
subscribers waited on a Modal Dict append (~50–200 ms typical, up to
2 s on timeout) for every event. New `EventStorePersister` mirrors
`PushNotificationSender`'s per-task delivery-chain pattern: each enqueue
forks a daemon that awaits the prior chain link, runs the bounded +
defect-trapped append, then signals the next. Append order preserved
per `(taskId, tenant)`; live subscribers off the persistence critical
path.
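
A minimal sketch of that delivery-chain shape, assuming a mutable chains map keyed by (taskId, tenant) and an already-bounded, defect-trapped appendOnce; this is not the actual EventStorePersister, and the reviews below flag a read/update race and an interruption hazard in exactly this shape.

```scala
import zio.*

final class EventStorePersisterSketch(appendOnce: String => UIO[Unit]):
  // One promise per (taskId, tenant) marks the tail of that task's append chain.
  private val chains =
    scala.collection.mutable.Map.empty[(String, Option[String]), Promise[Nothing, Unit]]

  def enqueue(taskId: String, tenant: Option[String], event: String): UIO[Unit] =
    val key = (taskId, tenant)
    for
      previous <- ZIO.succeed(chains.get(key))
      current  <- Promise.make[Nothing, Unit]
      _        <- ZIO.succeed(chains.update(key, current))
      // The daemon awaits the prior link, runs the append, then signals the next
      // link; the live publish path never waits on this fiber.
      _        <- (previous.fold(ZIO.unit)(_.await) *> appendOnce(event))
                    .ensuring(current.succeed(()))
                    .forkDaemon
    yield ()
```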

**Robustness items the reviewer named**

* TaskSnapshot filter symmetric across replayProvider and eventStore
  branches — providers no longer have to remember an implicit contract.
* `eventStoreAppendTimeout` (2 s default) and `eventStoreLoadTimeout`
  (5 s default) promoted to `A2AServer.Config` fields.
* `catchAllCause` around the inner append so a misbehaving impl
  (JS throw, NPE) doesn't crash the daemon chain.
* `println` → `ZIO.logWarning` to match `pushSender`'s analogous path.
* Scaladoc on `A2AEventStore` and `A2AReplayProvider` covering the
  `(taskId, tenant)` storage-key contract and the
  replayProvider-precedence-over-eventStore rule.

**Test coverage**

* cancellation events persist to the durable store (covers P2 #9)
* replayProvider takes precedence over eventStore (locks the contract)
* event store failures (defects + timeouts) don't break execution

The pre-existing unsupportedOperation symbol takes a String reason; the
non-terminal-without-replay failure now points the caller at
`tasks/get` for status.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@arcaputo3
Contributor Author

Pushed 5eef02c resolving all review feedback. Summary:

| Item | Fix |
| --- | --- |
| #1 / #8 — terminal vs. non-terminal resubscribe semantics | durableReplay now: terminal task → single TaskSnapshot + close (documented as a wire-visible change vs. the prior raise); non-terminal task with no replay → A2AError.unsupportedOperation so clients fall back to tasks/get instead of mistaking a frozen replay for a complete one. |
| #9 — cancellations persisted | cancelTask enqueues the final canceled TaskStatusUpdate onto the same EventStorePersister chain as the rest of the publish path. |
| #4 — non-blocking persistence | New EventStorePersister mirrors PushNotificationSender's per-task delivery-chain pattern (forkDaemon + Promise[Nothing, Unit] chain). Order preserved per (taskId, tenant); live subscribers are off the persistence critical path. |
| #3 — TaskSnapshot filter symmetry | Filter applied to both replayProvider and eventStore branches; providers no longer have to remember the implicit contract. |
| #7 — hardcoded timeouts | eventStoreAppendTimeout (2 s default) and eventStoreLoadTimeout (5 s default) promoted to A2AServer.Config. |
| #6 — defect propagation | .catchAllCause around the inner append so a JS throw / NPE in the impl doesn't crash the daemon chain. |
| #5 — logging consistency | println → ZIO.logWarning. |
| #2 — replayProvider precedence over eventStore | Documented in the trait Scaladoc and locked by a new test (storeLoadCount asserts eventStore.load is never called when both are configured). |
| A2AEventStore.none | Dropped — exposed but unused, as the bot noted. Option[A2AEventStore] is the only idiom now. |
| Tenant-key Scaladoc | Added to A2AEventStore covering (taskId, tenant) composition. |

New tests (3 added, 14 total in A2AServerOperationSpec):

  • cancellation events are persisted to the durable event store
  • resubscribe prefers replayProvider when both replayProvider and eventStore are configured (asserts eventStore.load count == 0)
  • event store failures do not break execution (defect-trapping verified)

The non-terminal-without-replay rejection path is exercised manually but not in this commit's unit tests — testing it cleanly requires either exposing the runtime registry or a process-restart simulation, both heavier than warranted. The behavior is a small conditional on top of the otherwise-tested branches.

Local verification:

  • ✅ scalagent: 14 tests pass in A2AServerOperationSpec (was 11)
  • ✅ tjc-agents (companion): compile + 22 tests still green against the SNAPSHOT

@claude

claude Bot commented May 8, 2026

Review

Solid PR, and the second commit visibly responded to prior review feedback (durable-resubscribe semantics, cancel persistence, hot-path off, ordering). Two real correctness issues plus a few nits below.

Code quality / design

  • Doc contract is good. A2AEventStore and A2AReplayProvider scaladoc clearly specifies the (taskId, tenant) key, provider precedence, and the snapshot-filter expectation. The decision matrix in durableReplay is exactly the kind of comment that survives — it documents the trade-off, not the code.
  • EventStorePersister mirrors PushNotificationSender deliberately (per-task delivery chain, daemon-forked, bookkeeping in ensuring). Consistency wins here.
  • DRY nit: (tenant.getOrElse(""), id.value) keying is now repeated in three places (InMemoryTaskStoreImpl, PushNotificationSender, EventStorePersister). A shared taskRuntimeKey-style helper would cut the next copy. Not blocking.

Bugs / correctness

1. eventStore.load timeout silently degrades to empty replay (potentially wrong for non-terminal tasks).

In durableReplay (A2AServer.scala:760-765):

case Some(store) =>
  store
    .load(task.id, context.tenant, config.eventReplayLimit)
    .timeout(config.eventStoreLoadTimeout)
    .map(_.getOrElse(Nil))   // <- timeout/None silently becomes empty
    .map(events => snapshot ++ ZStream.fromIterable(events.filter(notSnapshot)))

This is the same hazard the PR explicitly closes for the neither + non-terminal branch: a client gets a Working-state TaskSnapshot and EOF, indistinguishable from a successful replay. With a misbehaving / slow Modal Dict, every non-terminal resubscribe now silently looks "successful" to the client.

Suggest: log on timeout and fail if the task is non-terminal. Roughly:

case Some(store) =>
  store.load(...).timeout(config.eventStoreLoadTimeout).flatMap {
    case Some(events) => ZIO.succeed(snapshot ++ ZStream.fromIterable(events.filter(notSnapshot)))
    case None if task.isTerminal =>
      ZIO.logWarning(s"[a2a-event-store] load timed out task=${task.id.value}; returning snapshot only")
        .as(snapshot)
    case None =>
      ZIO.fail(A2AError.unsupportedOperation(s"Event store load timed out for non-terminal task ${task.id.value}; poll tasks/get."))
  }

2. Race in EventStorePersister.enqueue between read-and-update of chains.

previous <- ZIO.succeed(chains.get(key))
current  <- Promise.make[Nothing, Unit]
_        <- ZIO.succeed(chains.update(key, current))

Two enqueue calls for the same (taskId, tenant) racing across the flatMap boundaries can each read the same previous, then each install their own current, leaving two daemons awaiting the same prior link and running in parallel — defeating the per-task ordering guarantee. The existing PushNotificationSender has the identical shape, so this isn't a new bug, but the PR widens the surface.

In practice on Scala.js the JS event loop is single-threaded and ZIO.succeed blocks aren't async, so cross-fiber interleaving between the two succeeds is unlikely — but the only places that could concurrently enqueue for the same key are exactly the new ones this PR introduces (the executor fiber + the cancelTask path between markCanceled and the cancel enqueue).

Two cheap mitigations:

  • Use a Ref.Synchronized[Map[...]] (matches the A2ARuntimeRegistry pattern in the same file) to atomically getAndUpdate — see the sketch after this list. Removes the race outright.
  • Or, document the single-threaded-Scala.js assumption inline so a future JVM port doesn't silently regress ordering.
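
A sketch of the first mitigation, assuming the chains map moves into a Ref.Synchronized so reading the previous tail and installing the new one is a single atomic step; names here are illustrative.

```scala
import zio.*

type ChainKey = (String, Option[String]) // (taskId, tenant)

def enqueueAtomically(
    chains: Ref.Synchronized[Map[ChainKey, Promise[Nothing, Unit]]],
    key: ChainKey,
    appendOnce: UIO[Unit] // stand-in for the bounded, defect-trapped append
): UIO[Unit] =
  chains
    .modifyZIO { map =>
      // Create the new tail and swap it in atomically with reading the old one.
      Promise.make[Nothing, Unit].map(next => ((map.get(key), next), map.updated(key, next)))
    }
    .flatMap { case (previous, current) =>
      (previous.fold(ZIO.unit)(_.await) *> appendOnce)
        .ensuring(current.succeed(()))
        .forkDaemon
        .unit
    }
```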

Test coverage

Coverage of the new seams is solid (precedence, cancel persistence, defect tolerance). Two gaps worth filling:

  • No assertion on chain ordering. cancellation events are persisted asserts a canceled-final event exists in stored, but not that it comes after the TaskSnapshot. With the chains race above, a regression in ordering wouldn't fail this test. Adding assertEquals(stored.last, canceledStatusUpdate) (or asserting a TaskSnapshot precedes the canceled update) would lock the contract the chain exists to provide.
  • storeRef in the precedence test is unused as an assertion. The store's append still gets called on the live publish path even when replayProvider is configured — asserting storeRef.get.nonEmpty would prove dual-write still works during a migration (which is one of the documented use cases for wiring both).
  • No test for the eventStore load-timeout path (relevant if you take #1 above).

Minor

  • eventReplayLimit is now overloaded for two purposes — the in-memory A2AEventBus history cap and the eventStore.load limit. Worth either renaming or splitting into eventStoreReplayLimit: Option[Int] = None (defaulting to eventReplayLimit) so a deployment can set, e.g., 100 in-memory + 10000 durable.
  • EventStorePersister's daemon will pile up unbounded fibers if the store is consistently failing/timing out (each enqueue forks unconditionally). The 2 s append timeout drains them, but a pathological store that makes 1000 events/s arrive while every append times out would hold ~2000 fibers concurrently. Probably fine in practice, worth a one-liner in the scaladoc.

Compatibility

The Option fields' None defaults preserve existing behavior — confirmed by the unchanged tests around the new ones still passing.

Nice work overall. The two correctness items (#1 silent timeout, #2 chain race) are the only ones I'd want addressed before merge.

@claude

claude Bot commented May 8, 2026

Review — A2A durable event store + replay provider

Solid, focused PR. The design cleanly slots into the existing per-task-key delivery-chain pattern, the docstrings (the durableReplay decision matrix, the precedence note on A2AReplayProvider) are excellent, and the test coverage is broad — both seams set, only eventStore, only replayProvider, broken store, cancellation, and provider precedence. Switching the suite from Random.nextInt ports to port = 0 is a real reliability win and makes the new A2AServerLive.url change pull its weight.

A few items worth a look:

1. Potential deadlock if enqueue is interrupted between map insert and forkDaemon (also affects PushNotificationSender)

In EventStorePersister.enqueue (src/com/tjclp/scalagent/a2a/A2AServer.scala:519-534):

_        <- ZIO.succeed(chains.update(key, current))
_ <-
  (previous.fold(ZIO.unit)(_.await) *> appendOnce(event, tenant))
    .ensuring(...)
    .forkDaemon

If the calling fiber is interrupted between chains.update(key, current) and the forkDaemon, current is registered as the new tail of the chain but no daemon will ever fulfill it. The next enqueue for that key reads previous = Some(current) and previous.await waits forever — every subsequent append for that (taskId, tenant) deadlocks, and the terminal awaitPersist in ResultManager.publish then blocks forever on final events.

PushNotificationSender has the same shape, so this is a pre-existing latent bug being duplicated. A small ZIO.uninterruptibleMask around the insert+fork would close it:

ZIO.uninterruptibleMask { restore =>
  for
    previous <- ZIO.succeed(chains.get(key))
    current  <- Promise.make[Nothing, Unit]
    _        <- ZIO.succeed(chains.update(key, current))
    _ <- (previous.fold(ZIO.unit)(_.await) *> appendOnce(event, tenant))
            .ensuring(...)
            .forkDaemon
  yield current.await
}

In ZIO the for-comprehension is a sequence of interruptible flatMaps, so this is not just theoretical — but admittedly hard to hit on Scala.js. Worth fixing in both classes.

2. durableReplay doesn't trap defects from store.load

appendOnce is rigorously hardened (.timeout + .catchAllCause → log warning), and the trait doc explicitly frames persistence as "best-effort." The load path in durableReplay is not symmetric (src/com/tjclp/scalagent/a2a/A2AServer.scala:777-790):

store.load(task.id, context.tenant, config.eventReplayLimit)
  .timeout(config.eventStoreLoadTimeout)
  .flatMap { ... }

A misbehaving impl (JS throw, NPE, ZIO.die) on load will propagate the cause and fail the entire resubscribe — exactly the failure mode the append path is engineered to avoid. The "event store failures do not break execution" test only exercises send, never resubscribe, so this regression would not be caught.

Either:

  • Mirror the append's .catchAllCause on load and treat a load failure like an empty result (then fall through to the terminal-or-fail branch; sketched below), or
  • Document explicitly that load defects are surfaced to the caller (asymmetric with append)
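
A minimal sketch of the first option: trap load defects the way the append path already does, degrade to None, and let the existing terminal-or-fail handling decide what to do with the empty result. loadWithTimeout stands in for store.load(...).timeout(config.eventStoreLoadTimeout).

```scala
import zio.*

// Stand-in for store.load(...).timeout(...) against a misbehaving implementation.
def loadWithTimeout: UIO[Option[List[String]]] =
  ZIO.die(new RuntimeException("misbehaving store"))

// Trap the defect, warn, and fall through to the existing Some/None handling.
val guardedLoad: UIO[Option[List[String]]] =
  loadWithTimeout.catchAllCause(cause =>
    ZIO.logWarning(s"[a2a-event-store] load failed: ${cause.prettyPrint}")
      .as(Option.empty[List[String]])
  )
```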

3. Wire-format behavior change for TaskStatusUpdate

A2AResponse.scala:174 now emits "final": true whenever isFinal is set. The previous encoder dropped the value entirely (case TaskStatusUpdate(id, contextId, status, _, metadata) =>). This is a correctness fix for A2A protocol conformance — clients are allowed to consult final to know they can close the SSE stream — but it is technically a behavior change for any downstream that was treating its absence as "always non-final." The PR description doesn't call this out; worth a one-liner in the release notes / changelog.

4. Race window between bus close and runtime-entry removal

The "replayProvider precedence" test (line ~902 in A2AServerOperationSpec) relies on runtimeRegistry.bus(key) returning None after client.send blocks until terminal. Walking startExecution's .ensuring chain (A2AServer.scala:881-885):

  1. manager.publish(final) returns (after awaiting the persist barrier and bus.publish)
  2. manager.finish closes the bus
  3. runtimeRegistry.remove(key) removes the entry

waitForFinal returns at step 1 (terminal event arrives via the bus stream), and the HTTP response is built and returned to the client. There is a small window where the bus is closed but still in the registry. In that window, resubscribe would take the Some(bus) branch with ZStream.fromIterable(history) ++ ZStream.empty, replay the history, and never call the provider — the test would fail without the provider sentinel.

Empirically the HTTP roundtrip is enough latency that this doesn't fire, but it is fragile. Two options:

  • Reorder ensuring so the registry entry is removed before bus.finish (and accept that an in-flight resubscribe might miss the final event delivered to subscribers — which is exactly what durableReplay is for, so this is fine), or
  • Add a small retry/waitFor helper in the test so it isn't timing-dependent

5. Test code duplication

Four tests instantiate an anonymous A2AEventStore with Ref-backed append/load. Extracting a def recordingStore(ref: Ref[Vector[StreamEvent]], appendDelay: Duration = Duration.Zero): A2AEventStore helper at the top of A2AServerOperationSpec would shrink each test by ~12 lines and make the assertions easier to read. Not a blocker.
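
A sketch of that helper using the signature suggested above; TaskId, StreamEvent, and A2AEventStore are the scalagent types referenced throughout this thread, with the method shapes following the sketch near the top of the PR (so the parameter order is an assumption).

```scala
import zio.*

def recordingStore(
    ref: Ref[Vector[StreamEvent]],
    appendDelay: Duration = Duration.Zero
): A2AEventStore = new A2AEventStore:
  def append(taskId: TaskId, tenant: Option[String], event: StreamEvent): UIO[Unit] =
    ZIO.sleep(appendDelay) *> ref.update(_ :+ event) // optionally slow, always recording
  def load(taskId: TaskId, tenant: Option[String], limit: Int): UIO[List[StreamEvent]] =
    ref.get.map(_.takeRight(limit).toList)
```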

Smaller notes

  • A2AServerLive.url (line 1042) builds s"http://${config.host}:$actualPort" from the configured host. If a deployment passes host = "0.0.0.0" (common for containers), the URL is unreachable for external discovery. Pre-existing — flagging because the new port = 0 flow makes this code more load-bearing.
  • assert(canceledFinal.contains(true), ...) in the cancel-persistence test is technically correct (it's a Vector[Boolean]) but reads as if you're checking string contents. assert(canceledFinal.exists(identity)) is clearer.
  • appendTimeout = 2.seconds default is tight for any cross-region store. Configurable, so OK, but worth noting in the trait doc that real deploys (Modal Dict) will likely want to bump this.
  • A2AReplayProvider.replay returns ZStream[Any, Throwable, ...]. The trait doc is silent on what happens if the provider stream errors mid-replay — it'll surface to the SSE consumer. Either documenting the contract ("provider errors close the resubscribe stream as failures") or wrapping with a .catchAll + warning log would harden this seam.
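
A sketch of that hardening, with replayFromProvider standing in for A2AReplayProvider.replay(taskId, tenant): provider errors degrade to a logged warning instead of failing the SSE consumer.

```scala
import zio.*
import zio.stream.ZStream

def hardenedReplay(
    replayFromProvider: ZStream[Any, Throwable, String]
): ZStream[Any, Nothing, String] =
  replayFromProvider.catchAll(err =>
    // Log and end the replay cleanly rather than surfacing the failure downstream.
    ZStream.fromZIO(ZIO.logWarning(s"[a2a-replay] provider failed: ${err.getMessage}")).drain
  )
```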

Verdict

Approve with the caveats above. (1) and (2) are the substantive ones — both are robustness gaps in a feature whose entire point is "survive infrastructure failure." (3) deserves a release-note callout. The rest are polish.

@arcaputo3 merged commit 38731ff into main on May 9, 2026
3 checks passed
@arcaputo3 deleted the a2a-event-store-replay-provider branch on May 9, 2026 at 00:20