From 8cfc3127e855f5229a6893053cc1470b40c0efd7 Mon Sep 17 00:00:00 2001 From: Daniel van Flymen Date: Thu, 21 May 2026 15:34:30 +0200 Subject: [PATCH 1/2] Clean up CLI and Docs --- README.md | 108 ++++++++++- examples/local_mdns_swarm/ask.py | 1 + synapse_p2p/cli.py | 284 +++++++++++++++++++++++++--- synapse_p2p/node.py | 68 +++++-- synapse_p2p/tests/test_broadcast.py | 168 ++++++++++++++++ synapse_p2p/tests/test_cli.py | 82 +++++++- synapse_p2p/tests/test_cli_watch.py | 11 +- 7 files changed, 675 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index d372f84..8835cf5 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,33 @@ Synapse is not an agent framework. It is the clean network layer underneath one. --- +## Index + +- [Install](#install) +- [Why Synapse](#why) +- [30-second RPC](#30-second-rpc) +- [Swarms](#swarms) +- [Discovery](#local-discovery-mdns) + - [Local discovery: mDNS](#local-discovery-mdns) + - [Remote discovery: seeds](#remote-discovery-seeds) +- [Capabilities](#capabilities) +- [Ask](#ask) +- [Broadcast conversations](#broadcast) +- [Heartbeats and liveness](#heartbeats) +- [CLI](#cli) + - [`sn watch`](#sn-watch) + - [`sn broadcast`](#sn-broadcast) + - [`sn list-swarms`](#sn-list-swarms) +- [Typed peer API](#typed-peer-api) +- [Examples](#examples) +- [Built-in endpoints](#built-in-endpoints) +- [Wire protocol](#wire-protocol) +- [Logging](#logging) +- [What Synapse is not](#what-synapse-is-not) +- [Roadmap](#roadmap) + +--- + ## Install ```bash @@ -68,6 +95,8 @@ The goal is not to decide how agents think. The goal is to let them communicate. ## 30-second RPC +This is the smallest useful Synapse program: one node exposes an RPC endpoint, and one client calls it. + Create a node: ```python @@ -104,7 +133,7 @@ asyncio.run(main()) ## Swarms -A swarm is a group of nodes with the same swarm name. +A swarm is a group of nodes with the same swarm name. Nodes only join and heartbeat peers in the same swarm. ```python node = Node( @@ -127,6 +156,8 @@ Need subgroups? Use optional `team`. It defaults to `"default"`. ## Local discovery: mDNS +Use mDNS for local, zero-config discovery on the same LAN. + For local machines on the same network: ```python @@ -151,10 +182,14 @@ _synapse._tcp.local. mDNS is local by design. It usually does not cross routers, VPN boundaries, or restrictive firewalls. +[SCREENSHOT: Add two terminals running local mDNS nodes and a third terminal running `sn watch foo.electron.network`, showing peers appearing automatically.] + --- ## Remote discovery: seeds +Use seeds when mDNS is not enough: private networks, remote machines, explicit bootstrap nodes, or internet-reachable hosts. + For private networks or internet-reachable hosts, use seeds: ```python @@ -168,7 +203,7 @@ await node.start() await node.join() ``` -A seed is just another Synapse node. Once joined, nodes exchange known peers. +A seed is just another Synapse node. It is a first contact point, not a coordinator. Once joined, nodes exchange known peers and can talk directly. By default, nodes listen on `0.0.0.0` and advertise an auto-detected reachable local address. For same-machine-only experiments, use `bind="127.0.0.1"`. @@ -237,7 +272,7 @@ result = await Client.from_peer(peer).call( ## Broadcast -Broadcast starts a swarm conversation. +Broadcast starts a swarm conversation. It is the simplest way to ask the whole swarm a question and let any capable node reply. ```python broadcast = await node.broadcast("team.question", "Who can review this diff?") @@ -309,31 +344,88 @@ Offline means “not seen within `peer_timeout`.” ## CLI +The CLI is `sn`. + +```bash +sn --help +``` + +`sn` uses mDNS by default, so local swarms work with zero configuration. Use `--seed host:port` for seed discovery, or `--no-mdns` to disable local discovery. + +[SCREENSHOT: Add the CLI `sn --help` output showing the available `watch`, `broadcast`, and `list-swarms` commands.] + +### `sn watch` + Watch a swarm live: ```bash sn watch foo.electron.network ``` -Broadcast and stream replies: +`sn watch` opens an in-place terminal dashboard: + +- left pane: swarm name, this watcher, peers, online dots, addresses, capabilities +- right pane: chatter/debug log for joins, messages, replies, offline events, and optional heartbeats + +Peer dots: + +| Dot | Meaning | +| --- | --- | +| bright green | fresh join/heartbeat pulse | +| muted green | online | +| yellow | stale, waiting for timeout | +| red | offline | + +Useful options: + +```bash +sn watch foo.electron.network --show-heartbeats +sn watch foo.electron.network --seed 192.168.1.25:9000 --no-mdns +sn watch foo.electron.network --team backend +sn watch foo.electron.network --no-capabilities +``` + +[SCREENSHOT: Add the CLI `sn watch foo.electron.network` dashboard showing the swarm pane on the left, chatter pane on the right, colored JOINED/MESSAGE/REPLY/OFFLINE pills, online dots, and peer ip:port values.] + +### `sn broadcast` + +Broadcast a message to known swarm peers and stream replies: ```bash sn broadcast foo.electron.network "Who can review this diff?" ``` -Keep listening: +Keep listening for late replies: ```bash sn broadcast foo.electron.network "Who can help?" --forever ``` +Tune discovery and reply timeout: + +```bash +sn broadcast foo.electron.network "Ship status?" --discover 2 --timeout 10 +``` + +Broadcast replies are grouped by the broadcast nonce, so all agents can participate in one shared conversation. + +[SCREENSHOT: Add the CLI `sn broadcast foo.electron.network "Who can help ship this feature?"` output showing the broadcast nonce and multiple replies from coder/reviewer/product nodes.] + +### `sn list-swarms` + List local mDNS-visible swarms: ```bash sn list-swarms ``` -`sn` uses mDNS by default. Use `--seed host:port` for seed discovery, or `--no-mdns` to disable local discovery. +Scan for longer: + +```bash +sn list-swarms --seconds 5 +``` + +[SCREENSHOT: Add the CLI `sn list-swarms` output showing visible swarm names, teams, and node names.] --- @@ -359,6 +451,8 @@ from synapse_p2p import Broadcast, BroadcastReply, Capability, Client, Node, Pee See [`examples/`](./examples). +[SCREENSHOT: Add a terminal grid showing `local_mdns_swarm/reviewer.py`, `local_mdns_swarm/coder.py`, and `local_mdns_swarm/ask.py` with successful replies.] + ```bash # two nodes, one delegates to the other python examples/isolated_agents/agent_alpha.py @@ -379,6 +473,8 @@ python examples/pydantic_ai_team/ask.py The Pydantic AI example uses `TestModel` by default, so it runs without API keys. Set `PYDANTIC_AI_MODEL`, for example `openai:gpt-5.2`, to use a real model. +[SCREENSHOT: Add the Pydantic AI team example showing reviewer/coder/product nodes replying to one broadcast conversation.] + --- ## Built-in endpoints diff --git a/examples/local_mdns_swarm/ask.py b/examples/local_mdns_swarm/ask.py index b843660..e324b1d 100644 --- a/examples/local_mdns_swarm/ask.py +++ b/examples/local_mdns_swarm/ask.py @@ -40,6 +40,7 @@ async def main() -> None: print("asking swarm: Who can help ship this feature?") broadcast = await node.broadcast("team.question", "Who can help ship this feature?") + print(f"conversation: {broadcast.nonce}") replies = await wait_for_replies(broadcast, minimum=len(peers), timeout=5) if not replies: diff --git a/synapse_p2p/cli.py b/synapse_p2p/cli.py index 1dac1a3..8c29a8b 100644 --- a/synapse_p2p/cli.py +++ b/synapse_p2p/cli.py @@ -1,9 +1,13 @@ import asyncio +import time from collections import defaultdict from typing import Annotated, Any import typer +from rich.layout import Layout from rich.live import Live +from rich.markup import escape +from rich.panel import Panel from zeroconf import ServiceStateChange from zeroconf.asyncio import AsyncServiceBrowser, AsyncZeroconf @@ -17,15 +21,99 @@ no_args_is_help=True, ) +OFFLINE_PEER_RETENTION = 180 +CHATTER_LIMIT = 80 + def _parse_seed(seed: list[str] | None) -> list[Seed]: return list(seed or []) -def _peer_line(peer: Peer, *, capabilities: bool) -> str: - label = peer.name or peer.id[:8] - caps = f" caps={','.join(peer.capabilities)}" if capabilities and peer.capabilities else "" - return f"- {label:<16} {peer.address}:{peer.port:<5} {peer.node_kind.value}{caps}" +def _online_dot( + last_seen: float, + timeout: float, + *, + offline: bool = False, + pulse_window: float = 0.25, +) -> str: + if offline: + return "[red]●[/]" + age = time.time() - last_seen + if age <= pulse_window: + return "[bold bright_green]●[/]" + if age <= timeout: + return "[green3]●[/]" + return "[yellow]●[/]" + + +def _short_nonce(nonce: str) -> str: + return nonce.split("-", 1)[0] + + +def _format_result(result: Any) -> str: + if isinstance(result, dict): + source = result.get("from") + answer = result.get("answer") + if source and answer: + return f"{source}: {answer}" + if answer: + return str(answer) + return str(result) + + +def _event(kind: str, text: str) -> str: + styles = { + "joined": "bold white on dark_green", + "heartbeat": "grey50 on grey15", + "offline": "bold white on dark_red", + "message": "bold white on purple4", + "reply": "bold black on bright_cyan", + } + labels = { + "joined": "JOINED", + "heartbeat": "BEAT", + "offline": "OFFLINE", + "message": "ASK", + "reply": "REPLY", + } + style = styles.get(kind, "bold white on grey23") + label = labels.get(kind, kind.upper()) + pill = f"[{style}] {label:^7} [/]" + body = f"[grey50]{escape(text)}[/]" if kind == "heartbeat" else escape(text) + return f"{pill} {body}" + + +def _peer_addr(peer: Peer) -> str: + return f"{peer.address}:{peer.port}" + + +def _peer_name(peer: Peer) -> str: + return peer.name or peer.id[:8] + + +def _peer_label(peer: Peer) -> str: + return f"{_peer_name(peer)} @ {_peer_addr(peer)}" + + +def _peer_line( + peer: Peer, + *, + capabilities: bool, + timeout: float = 20, + offline: bool = False, + pulse_window: float = 0.4, + pulse_until: float | None = None, +) -> str: + pulsing = pulse_until is not None and time.time() <= pulse_until + last_seen = time.time() if pulsing else peer.last_seen + dot = _online_dot(last_seen, timeout, offline=offline, pulse_window=pulse_window) + name = escape(f"{_peer_name(peer):<16}") + address = escape(f"{_peer_addr(peer):<21}") + kind = escape(peer.node_kind.value) + caps = "" + if capabilities and peer.capabilities: + caps = f" [dim]caps={escape(','.join(peer.capabilities))}[/]" + return f"{dot} [bold]{name}[/] [cyan]{address}[/] [dim]{kind}[/]{caps}" def _swarm_label(swarm: str | None, team: str | None) -> str: @@ -34,29 +122,106 @@ def _swarm_label(swarm: str | None, team: str | None) -> str: return swarm or "-" -def _swarm_text(node: Node, *, capabilities: bool, events: list[str]) -> str: +def _swarm_text( + node: Node, + *, + capabilities: bool, + events: list[str], + peers: dict[str, Peer] | None = None, + offline_peer_ids: set[str] | None = None, + pulse_until: dict[str, float] | None = None, +) -> str: + visible_peers = peers if peers is not None else node.peers + offline_ids = offline_peer_ids or set() + pulses = pulse_until or {} lines = [ - f"watching {_swarm_label(node.swarm, node.team)}", - f"self: {node.name or node.node_id[:8]} @ {node.address}:{node.port}", + f"[bold]watching[/] [cyan]{escape(_swarm_label(node.swarm, node.team))}[/]", + f"[green3]●[/] self: [bold]{escape(node.name or node.node_id[:8])}[/] " + f"@ [cyan]{escape(f'{node.address}:{node.port}')}[/]", "", ] - if node.peers: - lines.append("peers:") - for peer in sorted(node.peers.values(), key=lambda item: item.name or item.id): - lines.append(_peer_line(peer, capabilities=capabilities)) + if visible_peers: + lines.append("[bold]peers[/]") + for peer in sorted(visible_peers.values(), key=lambda item: item.name or item.id): + lines.append( + _peer_line( + peer, + capabilities=capabilities, + timeout=node.peer_timeout, + offline=peer.id in offline_ids, + pulse_until=pulses.get(peer.id), + ) + ) else: - lines.append("peers: none yet") + lines.append("[dim]peers: none yet[/]") if events: lines.extend(["", "events:", *events[-8:]]) - lines.extend(["", "press Ctrl+C to stop"]) + lines.extend(["", "[dim]press Ctrl+C to stop[/]"]) return "\n".join(lines) -def _render_swarm(node: Node, *, capabilities: bool, events: list[str]) -> str: - return _swarm_text(node, capabilities=capabilities, events=events) +def _chatter_text(events: list[str]) -> str: + if not events: + return "quiet so far" + return "\n".join(events[-CHATTER_LIMIT:]) + + +def _render_watch( + node: Node, + *, + capabilities: bool, + events: list[str], + peers: dict[str, Peer] | None = None, + offline_peer_ids: set[str] | None = None, + pulse_until: dict[str, float] | None = None, +) -> Layout: + layout = Layout(name="watch") + layout.split_row( + Layout( + Panel( + _swarm_text( + node, + capabilities=capabilities, + events=[], + peers=peers, + offline_peer_ids=offline_peer_ids, + pulse_until=pulse_until, + ), + title="[bold cyan]swarm[/]", + border_style="cyan", + ), + name="swarm", + ratio=1, + ), + Layout( + Panel(_chatter_text(events), title="[bold magenta]chatter[/]", border_style="magenta"), + name="chatter", + ratio=1, + ), + ) + return layout + + +def _render_swarm( + node: Node, + *, + capabilities: bool, + events: list[str], + peers: dict[str, Peer] | None = None, + offline_peer_ids: set[str] | None = None, + pulse_until: dict[str, float] | None = None, +) -> Layout: + return _render_watch( + node, + capabilities=capabilities, + events=events, + peers=peers, + offline_peer_ids=offline_peer_ids, + pulse_until=pulse_until, + ) async def _watch( @@ -67,6 +232,7 @@ async def _watch( mdns: bool, capabilities: bool, interval: float, + show_heartbeats: bool, ) -> None: node = Node( name=name, @@ -76,37 +242,103 @@ async def _watch( capabilities=["watch"], seeds=seeds, mdns=mdns, + heartbeat_interval=2, + peer_timeout=6, ) events: list[str] = [] + seen_peers: dict[str, Peer] = {} + offline_peer_ids: set[str] = set() + offline_since: dict[str, float] = {} + pulse_until: dict[str, float] = {} @node.endpoint("synapse.message", description="Receive a CLI swarm message") async def receive(message: str, broadcast: Broadcast) -> dict[str, str]: - events.append(f"message {broadcast.nonce} from {broadcast.origin.name}: {message}") + events.append( + _event("message", f"{broadcast.nonce} from {_peer_label(broadcast.origin)}: {message}") + ) await node.reply(broadcast, {"received_by": node.name}) return {"ok": "true"} @node.on("peer.joined") async def joined(peer: Peer) -> None: - events.append(f"joined: {peer.name or peer.id[:8]} @ {peer.address}:{peer.port}") + seen_peers[peer.id] = peer + offline_peer_ids.discard(peer.id) + offline_since.pop(peer.id, None) + pulse_until[peer.id] = time.time() + 0.4 + events.append(_event("joined", _peer_label(peer))) + + @node.on("peer.heartbeat") + async def heartbeat(peer: Peer) -> None: + seen_peers[peer.id] = peer + offline_peer_ids.discard(peer.id) + offline_since.pop(peer.id, None) + pulse_until[peer.id] = time.time() + 0.4 + if show_heartbeats: + events.append(_event("heartbeat", f"{_peer_name(peer)} {_peer_addr(peer)}")) @node.on("peer.offline") async def offline(peer: Peer) -> None: - events.append(f"offline: {peer.name or peer.id[:8]}") + seen_peers[peer.id] = peer + offline_peer_ids.add(peer.id) + offline_since[peer.id] = time.time() + pulse_until.pop(peer.id, None) + events.append(_event("offline", _peer_label(peer))) + + @node.on("broadcast.received") + async def broadcast_received(event: dict[str, Any]) -> None: + broadcast = event["broadcast"] + args = " ".join(str(arg) for arg in event.get("args", [])) + text = ( + f"{_short_nonce(broadcast.nonce)} {_peer_name(broadcast.origin)} → " + f"{event['endpoint']}: {args}" + ) + events.append(_event("message", text)) @node.on("broadcast.reply") async def reply(reply: BroadcastReply) -> None: - events.append(f"reply {reply.nonce} from {reply.peer.name}: {reply.result}") + text = ( + f"{_short_nonce(reply.nonce)} {_peer_name(reply.peer)} → " + f"{_format_result(reply.result)}" + ) + events.append(_event("reply", text)) await node.start() await node.join() try: with Live( - _render_swarm(node, capabilities=capabilities, events=events), - refresh_per_second=4, + _render_swarm( + node, + capabilities=capabilities, + events=events, + peers=seen_peers, + offline_peer_ids=offline_peer_ids, + pulse_until=pulse_until, + ), + refresh_per_second=8, screen=True, ) as live: while True: - live.update(_render_swarm(node, capabilities=capabilities, events=events)) + now = time.time() + expired = [ + peer_id + for peer_id, since in offline_since.items() + if now - since > OFFLINE_PEER_RETENTION + ] + for peer_id in expired: + offline_since.pop(peer_id, None) + offline_peer_ids.discard(peer_id) + seen_peers.pop(peer_id, None) + + live.update( + _render_swarm( + node, + capabilities=capabilities, + events=events, + peers=seen_peers, + offline_peer_ids=offline_peer_ids, + pulse_until=pulse_until, + ) + ) await asyncio.sleep(interval) finally: await node.stop() @@ -136,9 +368,15 @@ def watch( float, typer.Option("--interval", help="Refresh interval in seconds"), ] = 0.5, + show_heartbeats: Annotated[ + bool, + typer.Option("--show-heartbeats", help="Show heartbeat events in chatter"), + ] = False, ) -> None: """Watch a swarm in real time.""" - asyncio.run(_watch(swarm, team, name, _parse_seed(seed), mdns, capabilities, interval)) + asyncio.run( + _watch(swarm, team, name, _parse_seed(seed), mdns, capabilities, interval, show_heartbeats) + ) async def _broadcast( diff --git a/synapse_p2p/node.py b/synapse_p2p/node.py index 0932ab1..b28c4a0 100644 --- a/synapse_p2p/node.py +++ b/synapse_p2p/node.py @@ -150,15 +150,24 @@ def _normalize_seed(self, seed: Seed) -> tuple[str, int]: return address, int(port) async def _dispatch(self, rpc: RPCRequest, connection: Connection): + kwargs = dict(rpc.kwargs) + if "broadcast" in kwargs and isinstance(kwargs["broadcast"], dict): + kwargs["broadcast"] = Broadcast.from_dict(kwargs["broadcast"]) + self._emit_lifecycle( + "broadcast.received", + { + "endpoint": rpc.endpoint, + "broadcast": kwargs["broadcast"], + "args": list(rpc.args), + "kwargs": {key: value for key, value in kwargs.items() if key != "broadcast"}, + }, + ) + endpoint = self.endpoint_directory.get(rpc.endpoint) if endpoint is None: raise InvalidMessageError(f"Unregistered endpoint called: {rpc.endpoint}") signature = inspect.signature(endpoint) - kwargs = dict(rpc.kwargs) - if "broadcast" in kwargs and isinstance(kwargs["broadcast"], dict): - kwargs["broadcast"] = Broadcast.from_dict(kwargs["broadcast"]) - injected = {"rpc": rpc, "connection": connection} for name, value in injected.items(): if name in signature.parameters: @@ -305,7 +314,7 @@ def on(self, event: str) -> Callable: """Register a lifecycle event handler. Supported events are ``peer.joined``, ``peer.heartbeat``, ``peer.offline``, - and ``broadcast.reply``. + ``broadcast.received``, and ``broadcast.reply``. """ def decorator( @@ -397,12 +406,23 @@ async def send(peer: Peer) -> None: return broadcast async def reply(self, broadcast: Broadcast, result: Any) -> None: - """Reply to a broadcast using its nonce and origin peer.""" - await Client.from_peer(broadcast.origin).call( - "_synapse.broadcast.reply", - broadcast.nonce, - self.self_peer().to_dict(), - result=result, + """Reply to a broadcast using its nonce and share the reply with known peers.""" + recipients = {broadcast.origin.id: broadcast.origin} + for peer in self.peers.values(): + recipients.setdefault(peer.id, peer) + recipients.pop(self.node_id, None) + + async def send(peer: Peer) -> None: + await Client.from_peer(peer).call( + "_synapse.broadcast.reply", + broadcast.nonce, + self.self_peer().to_dict(), + result=result, + ) + + await asyncio.gather( + *(send(peer) for peer in recipients.values()), + return_exceptions=True, ) def replies(self, broadcast: Broadcast | str) -> list[BroadcastReply]: @@ -493,10 +513,34 @@ async def heartbeat(peer: dict) -> dict: async def broadcast_reply(nonce: str, peer: dict, result=None) -> dict: incoming = Peer.from_dict(peer) self._validate_peer_membership(incoming) + + replies = self.broadcast_replies.setdefault(nonce, []) + if any(reply.peer.id == incoming.id for reply in replies): + return {"ok": True} + self.add_peer(incoming) reply = BroadcastReply(nonce=nonce, peer=incoming, result=result) - self.broadcast_replies.setdefault(nonce, []).append(reply) + replies.append(reply) self._emit_lifecycle("broadcast.reply", reply) + + recipients = { + known.id: known + for known in self.peers.values() + if known.id not in {self.node_id, incoming.id} + } + + async def forward(known: Peer) -> None: + await Client.from_peer(known).call( + "_synapse.broadcast.reply", + nonce, + incoming.to_dict(), + result=result, + ) + + await asyncio.gather( + *(forward(known) for known in recipients.values()), + return_exceptions=True, + ) return {"ok": True} def endpoint( diff --git a/synapse_p2p/tests/test_broadcast.py b/synapse_p2p/tests/test_broadcast.py index 08f13ce..7b17ded 100644 --- a/synapse_p2p/tests/test_broadcast.py +++ b/synapse_p2p/tests/test_broadcast.py @@ -53,6 +53,174 @@ async def answer(question: str, broadcast: Broadcast) -> dict: await origin.stop() +@pytest.mark.asyncio +async def test_broadcast_received_event_fires_for_unknown_endpoint(): + watcher = Node( + name="watcher", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + origin = Node( + name="origin", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + received = asyncio.Queue() + + @watcher.on("broadcast.received") + async def on_received(event: dict) -> None: + await received.put(event) + + await watcher.start() + await origin.start() + origin.add_peer(watcher.self_peer()) + + try: + broadcast = await origin.broadcast("team.question", "who can help?") + event = await asyncio.wait_for(received.get(), 0.2) + + assert event["endpoint"] == "team.question" + assert event["args"] == ["who can help?"] + assert event["broadcast"].nonce == broadcast.nonce + finally: + await origin.stop() + await watcher.stop() + + +@pytest.mark.asyncio +async def test_broadcast_reply_is_shared_with_known_watchers(): + origin = Node( + name="origin", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + worker = Node( + name="worker", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + watcher = Node( + name="watcher", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + watcher_reply = asyncio.Queue() + + @watcher.on("broadcast.reply") + async def on_watcher_reply(reply: BroadcastReply) -> None: + await watcher_reply.put(reply) + + @worker.endpoint("team.question") + async def answer(question: str, broadcast: Broadcast) -> dict: + await worker.reply(broadcast, {"answer": question}) + return {"accepted": True} + + await origin.start() + await worker.start() + await watcher.start() + origin.add_peer(worker.self_peer()) + worker.add_peer(watcher.self_peer()) + + try: + broadcast = await origin.broadcast("team.question", "who can help?") + reply = await asyncio.wait_for(watcher_reply.get(), 0.2) + + assert reply.nonce == broadcast.nonce + assert reply.peer.name == "worker" + assert reply.result == {"answer": "who can help?"} + finally: + await watcher.stop() + await worker.stop() + await origin.stop() + + +@pytest.mark.asyncio +async def test_broadcast_reply_endpoint_forwards_replies_to_known_watchers(): + origin = Node( + name="origin", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + worker = Node( + name="worker", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + watcher = Node( + name="watcher", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + watcher_reply = asyncio.Queue() + + @watcher.on("broadcast.reply") + async def on_watcher_reply(reply: BroadcastReply) -> None: + await watcher_reply.put(reply) + + await origin.start() + await worker.start() + await watcher.start() + origin.add_peer(watcher.self_peer()) + + try: + await Client.from_peer(origin.self_peer()).call( + "_synapse.broadcast.reply", + "nonce", + worker.self_peer().to_dict(), + result={"answer": "forwarded"}, + ) + reply = await asyncio.wait_for(watcher_reply.get(), 0.2) + + assert reply.nonce == "nonce" + assert reply.peer.name == "worker" + assert reply.result == {"answer": "forwarded"} + finally: + await watcher.stop() + await worker.stop() + await origin.stop() + + +@pytest.mark.asyncio +async def test_broadcast_reply_endpoint_deduplicates_forwarded_replies(): + node = Node( + name="node", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + worker = Node( + name="worker", + swarm="foo.electron.network", + bind="127.0.0.1", + heartbeat_interval=None, + ) + + await node.start() + await worker.start() + + try: + for _ in range(2): + await Client.from_peer(node.self_peer()).call( + "_synapse.broadcast.reply", + "nonce", + worker.self_peer().to_dict(), + result={"answer": "once"}, + ) + + assert len(node.replies("nonce")) == 1 + finally: + await worker.stop() + await node.stop() + + @pytest.mark.asyncio async def test_broadcast_reply_endpoint_validates_membership(): origin = Node( diff --git a/synapse_p2p/tests/test_cli.py b/synapse_p2p/tests/test_cli.py index a403952..1dd6912 100644 --- a/synapse_p2p/tests/test_cli.py +++ b/synapse_p2p/tests/test_cli.py @@ -1,13 +1,31 @@ +import time + import pytest from synapse_p2p import Broadcast, Node, Peer -from synapse_p2p.cli import _broadcast, _parse_seed, _peer_line, _swarm_label, _swarm_text +from synapse_p2p.cli import ( + CHATTER_LIMIT, + OFFLINE_PEER_RETENTION, + _broadcast, + _chatter_text, + _format_result, + _parse_seed, + _peer_label, + _peer_line, + _short_nonce, + _swarm_label, + _swarm_text, +) def test_parse_seed_defaults_to_empty_list(): assert _parse_seed(None) == [] +def test_offline_peer_retention_is_a_few_minutes(): + assert OFFLINE_PEER_RETENTION == 180 + + def test_swarm_label_hides_default_team(): assert _swarm_label("foo.electron.network", "default") == "foo.electron.network" assert _swarm_label("foo.electron.network", "backend") == "foo.electron.network / backend" @@ -27,14 +45,74 @@ def test_swarm_text_contains_peers_and_recent_events_without_terminal_codes(): text = _swarm_text(node, capabilities=True, events=["joined: reviewer"]) - assert "watching foo.electron.network" in text + assert "watching" in text + assert "foo.electron.network" in text + assert "self:" in text assert "reviewer" in text + assert "127.0.0.1:8888" in text assert "caps=code-review" in text assert "joined: reviewer" in text assert "\033" not in text +def test_peer_label_includes_name_ip_and_port(): + peer = Peer(id="peer-1", name="reviewer", address="127.0.0.1", port=9999) + + assert _peer_label(peer) == "reviewer @ 127.0.0.1:9999" + + +def test_event_uses_readable_pill_label(): + from synapse_p2p.cli import _event + + text = _event("joined", "reviewer @ 127.0.0.1:9999") + + assert "JOINED" in text + assert "on dark_green" in text + + +def test_chatter_helpers_compact_nonce_and_result(): + assert _short_nonce("019e4ab0-1d0d-709a") == "019e4ab0" + assert _format_result({"from": "coder", "answer": "I can implement it."}) == ( + "coder: I can implement it." + ) + + +def test_chatter_text_shows_recent_events(): + events = [f"event {index}" for index in range(CHATTER_LIMIT + 5)] + + text = _chatter_text(events) + + assert f"event {CHATTER_LIMIT + 4}" in text + assert "event 0" not in text + + +def test_peer_line_shows_stale_dot_after_timeout(): + peer = Peer( + id="peer-1", + name="reviewer", + address="127.0.0.1", + port=9999, + last_seen=time.time() - 60, + ) + + assert _peer_line(peer, capabilities=False, timeout=20).startswith("[yellow]●[/]") + assert _peer_line(peer, capabilities=False, timeout=20, offline=True).startswith("[red]●[/]") + + +def test_peer_line_pulses_after_recent_heartbeat(): + peer = Peer( + id="peer-1", + name="reviewer", + address="127.0.0.1", + port=9999, + last_seen=time.time(), + ) + + assert _peer_line(peer, capabilities=False).startswith("[bold bright_green]●[/]") + assert _peer_line(peer, capabilities=False, pulse_window=0).startswith("[green3]●[/]") + + def test_peer_line_can_include_or_hide_capabilities(): peer = Peer( id="peer-1", diff --git a/synapse_p2p/tests/test_cli_watch.py b/synapse_p2p/tests/test_cli_watch.py index d2689d8..bccf210 100644 --- a/synapse_p2p/tests/test_cli_watch.py +++ b/synapse_p2p/tests/test_cli_watch.py @@ -9,11 +9,11 @@ class FakeLive: instances: list["FakeLive"] = [] - def __init__(self, renderable: str, *, refresh_per_second: int, screen: bool) -> None: + def __init__(self, renderable: Any, *, refresh_per_second: int, screen: bool) -> None: self.renderable = renderable self.refresh_per_second = refresh_per_second self.screen = screen - self.updates: list[str] = [] + self.updates: list[Any] = [] FakeLive.instances.append(self) def __enter__(self) -> "FakeLive": @@ -22,7 +22,7 @@ def __enter__(self) -> "FakeLive": def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: return None - def update(self, renderable: str) -> None: + def update(self, renderable: Any) -> None: self.updates.append(renderable) @@ -40,6 +40,7 @@ async def test_watch_uses_rich_live_alternate_screen(monkeypatch): False, True, 0.01, + False, ) ) await asyncio.sleep(0.03) @@ -50,4 +51,6 @@ async def test_watch_uses_rich_live_alternate_screen(monkeypatch): live = FakeLive.instances[0] assert live.screen is True assert live.updates - assert "watching foo.electron.network" in live.updates[-1] + assert isinstance(live.updates[-1], cli.Layout) + assert live.updates[-1]["swarm"] is not None + assert live.updates[-1]["chatter"] is not None From 517ab3337acea974f449c51f884f51e4a3b0a4ea Mon Sep 17 00:00:00 2001 From: Daniel van Flymen Date: Thu, 21 May 2026 15:38:33 +0200 Subject: [PATCH 2/2] Update screenshots --- README.md | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 8835cf5..60e1657 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,17 @@ await node.join() Synapse gives you one primitive: **Node**. -A node can discover peers, publish capabilities, expose endpoints, receive work, broadcast questions, reply into shared conversations, heartbeat peers, and notice when peers disappear. +A node can discover peers, publish capabilities, expose endpoints, receive work, broadcast questions, reply into shared conversations, heartbeat peers, and notice when peers disappear. Synapse is not an agent framework. It is the clean network layer underneath one. -Synapse is not an agent framework. It is the clean network layer underneath one. +It also ships with a CLI tool to monitor your swarms: + +```bash +> sn watch foo.electron.network +``` + + +image + --- @@ -182,8 +190,6 @@ _synapse._tcp.local. mDNS is local by design. It usually does not cross routers, VPN boundaries, or restrictive firewalls. -[SCREENSHOT: Add two terminals running local mDNS nodes and a third terminal running `sn watch foo.electron.network`, showing peers appearing automatically.] - --- ## Remote discovery: seeds @@ -352,8 +358,6 @@ sn --help `sn` uses mDNS by default, so local swarms work with zero configuration. Use `--seed host:port` for seed discovery, or `--no-mdns` to disable local discovery. -[SCREENSHOT: Add the CLI `sn --help` output showing the available `watch`, `broadcast`, and `list-swarms` commands.] - ### `sn watch` Watch a swarm live: @@ -385,7 +389,6 @@ sn watch foo.electron.network --team backend sn watch foo.electron.network --no-capabilities ``` -[SCREENSHOT: Add the CLI `sn watch foo.electron.network` dashboard showing the swarm pane on the left, chatter pane on the right, colored JOINED/MESSAGE/REPLY/OFFLINE pills, online dots, and peer ip:port values.] ### `sn broadcast` @@ -409,7 +412,6 @@ sn broadcast foo.electron.network "Ship status?" --discover 2 --timeout 10 Broadcast replies are grouped by the broadcast nonce, so all agents can participate in one shared conversation. -[SCREENSHOT: Add the CLI `sn broadcast foo.electron.network "Who can help ship this feature?"` output showing the broadcast nonce and multiple replies from coder/reviewer/product nodes.] ### `sn list-swarms` @@ -425,7 +427,6 @@ Scan for longer: sn list-swarms --seconds 5 ``` -[SCREENSHOT: Add the CLI `sn list-swarms` output showing visible swarm names, teams, and node names.] --- @@ -451,7 +452,6 @@ from synapse_p2p import Broadcast, BroadcastReply, Capability, Client, Node, Pee See [`examples/`](./examples). -[SCREENSHOT: Add a terminal grid showing `local_mdns_swarm/reviewer.py`, `local_mdns_swarm/coder.py`, and `local_mdns_swarm/ask.py` with successful replies.] ```bash # two nodes, one delegates to the other @@ -473,7 +473,6 @@ python examples/pydantic_ai_team/ask.py The Pydantic AI example uses `TestModel` by default, so it runs without API keys. Set `PYDANTIC_AI_MODEL`, for example `openai:gpt-5.2`, to use a real model. -[SCREENSHOT: Add the Pydantic AI team example showing reviewer/coder/product nodes replying to one broadcast conversation.] ---