feat(a2a): native A2A v1 protocol implementation#42
Conversation
The ReadableStream cancel callback ran the interrupt fiber synchronously via `runtime.unsafe.run(fiber.interrupt).getOrThrowFiberFailure()`. ZIO's unsafe.run uses an internal `OneShot[T]` to wait for the result, which calls `OneShot.get` and throws under Scala.js with "Cannot block for result to be set in JavaScript" — JS is single-threaded and cannot block a synchronous call to wait for an async result. Switch to `runtime.unsafe.fork(fiber.interrupt)` — fire-and-forget. The interrupt still propagates; the JS event loop continues; the process can unwind cleanly when the example app finishes. Surfaces in any caller that aborts an SSE stream mid-flight (e.g. `tjc run --detach` from the tjc-agents Go CLI). Without this fix, the ZIO runtime crashes the Bun host process at shutdown with the OneShot error, leaving no exit code 0 path through A2AExample. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- A2ATaskStore: extract trait from InMemoryTaskStore so A2A hosts can plug a durable backend (Modal Dict, Redis, etc). Adds explicit delete; the protocol does not GC tasks implicitly. - A2AServer.Config.taskStore + A2AServerApp.taskStore/taskStoreZIO: override hooks for the durable backend. - ClaudeEventMapper: make EventMapper public + re-export at package level (parallel to CodexEventMapper). Lets external callers normalize AgentMessage to AgentEvent the same way ClaudeInterpreter does internally. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
External A2ATaskStore implementations compiled with -language:strictEquality (e.g. tjc-agents) need explicit CanEqual instances to compare TaskState enum cases or opaque ContextId/TaskId/MessageId values via ==. Add the trivial same-type givens so == "just works" in callers and stays free for callers that don't care. Also promote A2ATaskStore.applyHistoryLength to a public helper so durable backends can match the in-memory store's history projection byte-for-byte (used by ModalDictTaskStore.list in tjc-agents). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review — A2A v1 Native Protocol ImplementationSolid, well-structured PR. The architecture is thoughtful and tests are comprehensive. Here are findings grouped by severity. Blockers1. Empty task ID from malformed path case ("GET", taskPath) if taskPath.startsWith("/tasks/") && !taskPath.contains(":") =>
val rawId = taskPath.stripPrefix("/tasks/")
if rawId.isEmpty then Some(errorResponse(A2AError.invalidParams("Missing task ID")))
else Some(json(requestHandler.getTask(...)))2. SSRF in push notification sender private def isExternalUrl(url: String): Boolean = ...
// reject 127.x, 10.x, 192.168.x, 172.16–31.x, localhost, ::13. Unsafe val parts = pushPath.stripPrefix("/tasks/").split("/pushNotificationConfigs/", 2)
Some(json(requestHandler.getPushConfig(TaskId(parts(0)), parts(1), ...)))If the separator isn't matched, Medium Severity4. Mutable maps in the request handler hot path private val buses = mutable.Map.empty[TaskRuntimeKey, A2AEventBus]
private val activeRuns = mutable.Map.empty[TaskRuntimeKey, Fiber.Runtime[Throwable, Unit]]These are mutated across fiber boundaries. Scala.js is single-threaded but async reordering can still corrupt state (e.g., a fiber is forked but the key removal races with a retry). Prefer private val buses = Ref.make(Map.empty[TaskRuntimeKey, A2AEventBus])5. Unbounded event history in private var history = Vector.empty[A2AResponse.StreamEvent]For long-running tasks with many tool calls, this grows indefinitely. Consider capping replay history (e.g., last 1000 events) or making it configurable in 6. 7. 8. Pagination with invalid pageToken.flatMap(_.toIntOption).getOrElse(0)A non-numeric token silently falls back to offset 0. Either document that tokens must be numeric offsets, or fail explicitly with Minor / Style9. var fiber: Fiber.Runtime[Throwable, Unit] = nullUse 10. Security Notes
What's Working Well
Missing Tests
OverallArchitecture and test coverage are strong. The four blockers are all small, targeted fixes. The medium items (mutable state, unbounded history) can follow in a fast-follow PR if you want to ship quickly. Happy to re-review after the blockers are addressed. |
Code Review: feat(a2a): native A2A v1 protocol implementationThis is a large, well-structured PR (+5502/-904) that replaces the Code QualityStrengths
Issues
{ "schemes": { "bearer": { "list": ["tasks:read"] } } }But A2A v1 uses the OpenAPI flat form:
Potential Bugs
Performance
values.toList.map(_.as[A]).foldRight[Either[String, List[A]]](Right(Nil)) { ... }
Security
These reveal a developer's username and directory layout. Please remove or replace with public URLs before merge.
IPv4-translated IPv6 not blocked
Test CoverageExcellent overall. Nine test suites covering codecs, operations, streaming, validation, push notifications, and REST transport. Gaps worth noting:
API Design
Required Fixes Before Merge
Summary
Good work on this implementation — the native A2A v1 port is a significant improvement and the test coverage is commendable. Please address the four required fixes above before merging. 🤖 Generated with Claude Code |
Summary
OneShot.getcrash under Scala.js when callers abort mid-stream (7a877b1)A2ATaskStoretrait + publicClaudeEventMapperso external hosts can plug durable backends (Modal Dict, Redis) and normalize events (46814ae)CanEqualonTaskState/IDs and exposeapplyHistoryLengthfor-language:strictEqualitycallers and durable backends matching in-memory history projection (6fc6e2a)Test plan
./mill agent.testpasses (A2A spec suites: Codec, Interop, Progress, PushNotification, RestTransport, ServerOperation, ServerStream, ServerValidation, Client)./mill examples.run a2aexits cleanly (noOneShotcrash on stream cancel)tjc run --detachfrom tjc-agents against this build — SSE cancel completes without runtime crash-language:strictEqualitycan compareTaskState/TaskId/ContextId/MessageIdvia==A2ATaskStoreimpl (e.g. ModalDictTaskStore) wires throughA2AServerApp.taskStore🤖 Generated with Claude Code