Skip to content

feat(a2a): native A2A v1 protocol implementation#42

Merged
arcaputo3 merged 5 commits into
mainfrom
a2a-v1-native-experimental
May 5, 2026
Merged

feat(a2a): native A2A v1 protocol implementation#42
arcaputo3 merged 5 commits into
mainfrom
a2a-v1-native-experimental

Conversation

@arcaputo3
Copy link
Copy Markdown
Contributor

Summary

  • Implement native A2A v1 protocol: agent cards, JSON-RPC + REST transports, SSE streaming, push notifications, task lifecycle (4dd6e03)
  • Non-blocking fiber interrupt on SSE stream cancel — fixes OneShot.get crash under Scala.js when callers abort mid-stream (7a877b1)
  • Pluggable A2ATaskStore trait + public ClaudeEventMapper so external hosts can plug durable backends (Modal Dict, Redis) and normalize events (46814ae)
  • Derive CanEqual on TaskState/IDs and expose applyHistoryLength for -language:strictEquality callers and durable backends matching in-memory history projection (6fc6e2a)

Test plan

  • ./mill agent.test passes (A2A spec suites: Codec, Interop, Progress, PushNotification, RestTransport, ServerOperation, ServerStream, ServerValidation, Client)
  • ./mill examples.run a2a exits cleanly (no OneShot crash on stream cancel)
  • Verify tjc run --detach from tjc-agents against this build — SSE cancel completes without runtime crash
  • External consumers compiled with -language:strictEquality can compare TaskState/TaskId/ContextId/MessageId via ==
  • Custom A2ATaskStore impl (e.g. ModalDictTaskStore) wires through A2AServerApp.taskStore

🤖 Generated with Claude Code

arcaputo3 and others added 4 commits May 4, 2026 17:25
The ReadableStream cancel callback ran the interrupt fiber synchronously
via `runtime.unsafe.run(fiber.interrupt).getOrThrowFiberFailure()`. ZIO's
unsafe.run uses an internal `OneShot[T]` to wait for the result, which
calls `OneShot.get` and throws under Scala.js with
"Cannot block for result to be set in JavaScript" — JS is single-threaded
and cannot block a synchronous call to wait for an async result.

Switch to `runtime.unsafe.fork(fiber.interrupt)` — fire-and-forget. The
interrupt still propagates; the JS event loop continues; the process can
unwind cleanly when the example app finishes.

Surfaces in any caller that aborts an SSE stream mid-flight (e.g.
`tjc run --detach` from the tjc-agents Go CLI). Without this fix, the
ZIO runtime crashes the Bun host process at shutdown with the OneShot
error, leaving no exit code 0 path through A2AExample.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- A2ATaskStore: extract trait from InMemoryTaskStore so A2A hosts can
  plug a durable backend (Modal Dict, Redis, etc). Adds explicit
  delete; the protocol does not GC tasks implicitly.
- A2AServer.Config.taskStore + A2AServerApp.taskStore/taskStoreZIO:
  override hooks for the durable backend.
- ClaudeEventMapper: make EventMapper public + re-export at package
  level (parallel to CodexEventMapper). Lets external callers
  normalize AgentMessage to AgentEvent the same way ClaudeInterpreter
  does internally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
External A2ATaskStore implementations compiled with -language:strictEquality
(e.g. tjc-agents) need explicit CanEqual instances to compare TaskState
enum cases or opaque ContextId/TaskId/MessageId values via ==. Add the
trivial same-type givens so == "just works" in callers and stays free for
callers that don't care.

Also promote A2ATaskStore.applyHistoryLength to a public helper so durable
backends can match the in-memory store's history projection byte-for-byte
(used by ModalDictTaskStore.list in tjc-agents).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented May 5, 2026

Code Review — A2A v1 Native Protocol Implementation

Solid, well-structured PR. The architecture is thoughtful and tests are comprehensive. Here are findings grouped by severity.


Blockers

1. Empty task ID from malformed path
GET /tasks/ (trailing slash, no ID) strips the prefix to an empty string and constructs TaskId(""), which silently propagates to the task store. Add a non-empty guard:

case ("GET", taskPath) if taskPath.startsWith("/tasks/") && !taskPath.contains(":") =>
  val rawId = taskPath.stripPrefix("/tasks/")
  if rawId.isEmpty then Some(errorResponse(A2AError.invalidParams("Missing task ID")))
  else Some(json(requestHandler.getTask(...)))

2. SSRF in push notification sender
PushNotificationSender calls fetch(config.url, ...) without validating the URL. A client could register http://localhost:6379 or any internal endpoint. Add a host allowlist or block private IP ranges before dispatching:

private def isExternalUrl(url: String): Boolean = ...
  // reject 127.x, 10.x, 192.168.x, 172.16–31.x, localhost, ::1

3. Unsafe split index access
In the REST push-config route:

val parts = pushPath.stripPrefix("/tasks/").split("/pushNotificationConfigs/", 2)
Some(json(requestHandler.getPushConfig(TaskId(parts(0)), parts(1), ...)))

If the separator isn't matched, parts(1) throws ArrayIndexOutOfBoundsException. Add if parts.length >= 2 before accessing parts(1).


Medium Severity

4. Mutable maps in the request handler hot path

private val buses = mutable.Map.empty[TaskRuntimeKey, A2AEventBus]
private val activeRuns = mutable.Map.empty[TaskRuntimeKey, Fiber.Runtime[Throwable, Unit]]

These are mutated across fiber boundaries. Scala.js is single-threaded but async reordering can still corrupt state (e.g., a fiber is forked but the key removal races with a retry). Prefer ZIO.Ref[Map[...]]:

private val buses = Ref.make(Map.empty[TaskRuntimeKey, A2AEventBus])

5. Unbounded event history in A2AEventBus

private var history = Vector.empty[A2AResponse.StreamEvent]

For long-running tasks with many tool calls, this grows indefinitely. Consider capping replay history (e.g., last 1000 events) or making it configurable in A2AServer.Config.

6. waitForFinal error message is misleading
If the event bus closes without ever emitting a final event, runHead returns None and the code fails with A2AError.taskNotFound(taskId) — confusing, since the task does exist. A dedicated error like "Terminal event never received for task {id}" would help with debugging.

7. applyHistoryLength duplicated
It's defined in A2ATaskStore and again in A2ARequestHandler. Pull it up to the shared trait or a companion object.

8. Pagination with invalid pageToken

pageToken.flatMap(_.toIntOption).getOrElse(0)

A non-numeric token silently falls back to offset 0. Either document that tokens must be numeric offsets, or fail explicitly with A2AError.invalidParams(s"Invalid pageToken").


Minor / Style

9. null fiber reference

var fiber: Fiber.Runtime[Throwable, Unit] = null

Use Option[Fiber.Runtime[Throwable, Unit]] = None + fiber.foreach(_.interrupt). The null-guard pattern is fragile.

10. unsupportedOperation for terminal task cancellation
Failing a cancel-of-terminal task with unsupportedOperation is semantically off. If the A2A spec has a dedicated error code for "task already terminal/not cancelable", use it. If not, invalidRequest + a clear message is clearer.


Security Notes

  • No rate limiting / body-size limit: JSON parsing is unbounded. Bun's default body limit (1 MB) helps but this should be explicit and configurable.
  • Auth not enforced on /extendedAgentCard: Document clearly that this endpoint (and the full server) must be placed behind an authenticating API gateway in production.
  • Task/config IDs not format-validated: A2ATaskStore should document that implementations are responsible for sanitizing ID inputs before use in any external storage.

What's Working Well

  • Clean A2ARequestHandler / transport separation — adding a new transport is straightforward.
  • executionOverride seam in the server config makes unit testing without a real Claude agent clean.
  • Full codec round-trip test coverage (A2ACodecSpec).
  • Tenant scoping baked into model types, not bolted on.
  • Legacy names (A2AClientV03, old field aliases) properly preserved for backward compatibility.
  • ZIO idioms are generally correct — ZIO.scoped, ZStream, forkDaemon used appropriately.

Missing Tests

  • Empty task ID path (/tasks/)
  • Push notification with a private/localhost URL (SSRF guard once added)
  • Event bus replay after 1000+ events (memory sanity check)
  • Concurrent sendMessage with the same taskId from two clients
  • Interop smoke test against @a2a-js/sdk v1 reference client/server

Overall

Architecture and test coverage are strong. The four blockers are all small, targeted fixes. The medium items (mutable state, unbounded history) can follow in a fast-follow PR if you want to ship quickly. Happy to re-review after the blockers are addressed.

@claude
Copy link
Copy Markdown

claude Bot commented May 5, 2026

Code Review: feat(a2a): native A2A v1 protocol implementation

This is a large, well-structured PR (+5502/-904) that replaces the @a2a-js/sdk-backed A2A 0.3 implementation with a fully native Scala.js A2A v1 implementation. Overall the code is high quality and broadly consistent with the project's ZIO/type-safety conventions. Below are findings organized by area.


Code Quality

Strengths

  • Http.pumpSse is correctly tail-recursive via JS Promise .then, avoiding stack overflow on Scala.js.
  • A2AEventBus.stream correctly uses ZIO.acquireRelease to remove the queue from subscribers.
  • A2ARuntimeRegistry correctly uses Ref.Synchronized for its modify-then-fork semantics.
  • PushNotificationSender serializes per-task delivery via a Promise chain, preserving ordering without a separate worker fiber.
  • CanEqual derivation and applyHistoryLength exposure are minimal, correct additions.

Issues

A2AClientLive mutable state (fragile, low risk in single-threaded JS)
A2AClientLive uses two vars — currentCard and requestId — mutated in nextRequestId() outside ZIO. Safe in Scala.js today, but a Ref[Long] would remove the abstraction mismatch and protect a future JVM port.

SecurityRequirement encoder/decoder mismatch (protocol divergence)
The encoder (AgentCard.scala) emits:

{ "schemes": { "bearer": { "list": ["tasks:read"] } } }

But A2A v1 uses the OpenAPI flat form: { "bearer": ["tasks:read"] }. The decoder accepts both forms, so round-trips within Scalagent work — but an external A2A agent receiving re-emitted cards will see a non-standard structure. This needs to be fixed before interop with other A2A v1 implementations.

sseResponse fiber ordering relies on implicit JS event-loop guarantees
In A2AServerLive.sseResponse, two mutable vars (fiber and canceled) are set in async JS callbacks. The ordering invariant (start fires synchronously before cancel can fire) is correct but extremely non-obvious. A comment explaining why this is safe is needed.


Potential Bugs

waitForFinal hangs with eventReplayLimit = 0 (latent bug)
waitForFinal subscribes to the event bus before startExecution fires, relying on A2AEventBus.history replay for fast-completing tasks. With the default eventReplayLimit = 1000 this is safe. But if a server is configured with eventReplayLimit = 0, a task that completes before the stream consumer subscribes will hang forever. Recommend adding a require(eventReplayLimit > 0) guard or a doc note on A2ARequestHandler.Config.

MessageSendConfiguration.blocking compat shim changes V03 semantics
A2ARequest.scala: def blocking: Option[Boolean] = Some(!returnImmediately) always returns Some(...). In A2AClientV03Live.send, the guard value.blocking.isDefined will now always be true, meaning the V03 client will always pass effectiveConfig = Some(value) instead of taking the second branch. Please verify this is the intended behavior and add a note.

cancelTask has a narrow window for a stale event
Between runtimeRegistry.markCanceled and the execution fiber's next isCanceled check, the fiber can still publish one more event before seeing the cancel. This is cosmetic (the event arrives before the cancel event rather than after), but noting it in a comment would help future maintainers.


Performance

foldRight stack pressure on large payloads
Several decoders use:

values.toList.map(_.as[A]).foldRight[Either[String, List[A]]](Right(Nil)) { ... }

foldRight is not tail-recursive on Scala.js for large lists. For typical A2A payloads this is fine, but replacing with foldLeft + .reverse would be safer.

InMemoryTaskStoreImpl.list is O(n)
This is acceptable for the in-memory default, and the docstring correctly warns durable implementors. No action needed, but a @note in scaladoc would help.


Security

SPEC.md leaks absolute local filesystem paths (should fix before merge)
SPEC.md lines 86-91 contain:

/Users/rcaputo3/git/A2A/specification/a2a.proto
/Users/rcaputo3/git/a2a-js

These reveal a developer's username and directory layout. Please remove or replace with public URLs before merge.

externalOnly SSRF policy is correct for RFC 1918 (minor completeness note)
The isBlockedIpv4 check correctly covers loopback, link-local, RFC 1918, and multicast. TEST-NET ranges (198.51.100.x, 203.0.113.x) are missing, but these are non-routable documentation ranges so there is no practical security impact.

IPv4-translated IPv6 not blocked
Only ::ffff: (IPv4-mapped) is checked; ::ffff:0: (IPv4-translated) is not. Bun's URL parser likely normalizes these, but a defense-in-depth comment would be helpful.

X-A2A-Notification-Token header is undocumented in the v1 spec
The header is noted as a "legacy token header used by the a2a-js reference implementation." This should be clearly documented and callers should be steered toward authentication.scheme/authentication.credentials for new code.


Test Coverage

Excellent overall. Nine test suites covering codecs, operations, streaming, validation, push notifications, and REST transport. Gaps worth noting:

  1. getTask with historyLength = -1 via JSON-RPC — validation tests cover listTasks but not getTask directly.
  2. resubscribe replay ordering — tests verify the tail of the replay buffer but not that replay events arrive before live events.
  3. listTasks multi-page pagination — no test exercises offset + nextPageToken for multi-page results.
  4. Push delivery failure path — no test for when the callback server returns non-2xx. The error is caught and dropped; a test would confirm the parent task is unaffected.
  5. SecurityRequirement encoder round-trip — add a test that a card received from an external agent with a flat OpenAPI SecurityRequirement round-trips back to the flat form (this would catch the encoder mismatch).

API Design

taskStoreZIO: Task[Option[A2ATaskStore]] is slightly opaque
None meaning "use default" is easy to misread. Consider Task[A2ATaskStore] returning A2ATaskStore.inMemory as the default — removes the Option indirection.

ClaudeEventMapper naming inconsistency
val ClaudeEventMapper = interop.claude.EventMapper is exposed at package level. A2AEventMapper (private) uses a different naming convention. If both are to be public, prefer consistent naming.

PushNotificationConfig compatibility alias is clean and correct.
The unapply always returns Some[...], which is correct for pattern matching — worth a scaladoc note.


Required Fixes Before Merge

  1. Remove absolute local paths from SPEC.md (lines 86-91) — information disclosure.
  2. Fix SecurityRequirement encoder to emit the flat OpenAPI array form instead of the { "list": [...] } wrapper, or add an explicit interop test documenting the divergence.
  3. Document the eventReplayLimit = 0 incompatibility with waitForFinal — add a require guard or a prominent scaladoc note on A2ARequestHandler.Config.
  4. Verify V03 blocking compat semantics — confirm that blocking.isDefined always evaluating to true in A2AClientV03Live.send is intentional.

Summary

Area Rating Key Finding
Code Quality Good SecurityRequirement encoder non-standard; mutable vars in client/bus (safe in JS)
Potential Bugs Moderate waitForFinal hangs at eventReplayLimit=0; V03 blocking compat semantic shift
Performance Good foldRight stack pressure on large lists; O(n) scan acceptable and documented
Security Good SPEC.md leaks local paths; SecurityRequirement encoder non-spec-compliant
Test Coverage Excellent Minor gaps: getTask validation, pagination, push failure path
API Design Good taskStoreZIO: Task[Option[...]] opaque; ClaudeEventMapper naming inconsistency

Good work on this implementation — the native A2A v1 port is a significant improvement and the test coverage is commendable. Please address the four required fixes above before merging.

🤖 Generated with Claude Code

@arcaputo3 arcaputo3 merged commit 4283020 into main May 5, 2026
3 checks passed
@arcaputo3 arcaputo3 deleted the a2a-v1-native-experimental branch May 5, 2026 21:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant