Add WebSocket API for real-time subscriptions #17

@kitplummer

Summary

Add a WebSocket endpoint alongside the Connect RPC API for real-time document change subscriptions. This makes peat-node accessible from browsers, CLI tools, and lightweight integrations without any RPC client library.

Motivation

The Subscribe RPC is already server-streaming, but consuming it requires a Connect/gRPC client. A plain WebSocket endpoint streaming JSON would enable:

  • Browser-based fleet dashboards with real-time updates (no gRPC-Web complexity)
  • Simple CLI monitoring tools (websocat ws://localhost:50051/ws/subscribe)
  • Language-agnostic integrations that can open a WebSocket but don't have a Connect RPC library
  • An optional SSE (Server-Sent Events) fallback for environments where WebSocket upgrades are blocked

Proposed Design

WebSocket endpoint

GET /ws/subscribe?collections=platforms,commands
Upgrade: websocket

After upgrade, the server pushes JSON messages matching the DocumentChange proto schema:

{"collection":"platforms","docId":"alpha-agent","changeType":"CHANGE_TYPE_UPSERT","jsonData":"{...}"}
{"collection":"commands","docId":"cmd-1","changeType":"CHANGE_TYPE_UPSERT","jsonData":"{...}"}

Implementation

Since we're already using Axum (via connect-rust), adding WebSocket support is straightforward:

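// Requires axum's `ws` feature.
use axum::{extract::ws::{WebSocket, WebSocketUpgrade}, routing::get};

// The WebSocket route is matched first; every other request falls through
// to the existing Connect RPC service.
let app = axum::Router::new()
    .route("/ws/subscribe", get(ws_subscribe_handler))
    .fallback_service(connect_router.into_axum_service());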

The handler would:

  1. Accept query param collections (comma-separated, empty = all)
  2. Upgrade to WebSocket
  3. Subscribe to the node's broadcast channel (same as the RPC Subscribe)
  4. Stream JSON-serialized DocumentChange messages
  5. Optionally accept client messages for dynamic filter changes
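
Steps 1 and 4 imply a small piece of filtering logic that is independent of the WebSocket plumbing. The sketch below shows one way it might look, using only the standard library. `CollectionFilter`, `parse_collections`, and `should_forward` are hypothetical names, and trimming whitespace around each name is an assumption rather than something the proposal specifies.

```rust
use std::collections::HashSet;

/// Filter derived from the `collections` query parameter.
/// `None` means "all collections" (parameter missing or empty).
pub type CollectionFilter = Option<HashSet<String>>;

/// Parse a comma-separated `collections` value such as "platforms,commands".
/// The value is expected to be URL-decoded already by the web framework.
pub fn parse_collections(raw: Option<&str>) -> CollectionFilter {
    let names: HashSet<String> = raw
        .unwrap_or("")
        .split(',')
        .map(str::trim)
        .filter(|name| !name.is_empty())
        .map(str::to_owned)
        .collect();
    if names.is_empty() {
        None
    } else {
        Some(names)
    }
}

/// Decide whether a change in `collection` should be sent to this client.
pub fn should_forward(filter: &CollectionFilter, collection: &str) -> bool {
    match filter {
        None => true,
        Some(names) => names.contains(collection),
    }
}
```

In the handler, `should_forward` would be called for each change received from the broadcast channel before it is serialized and sent. Keeping the filter as plain data would also make step 5 simpler, since a client message could replace the filter without re-subscribing.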

Optional: Server-Sent Events (SSE)

For environments where WebSocket isn't available (for example, behind proxies that block the Upgrade handshake):

GET /sse/subscribe?collections=platforms
Accept: text/event-stream

Axum supports SSE via axum::response::sse::Sse.
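
On the wire, an SSE stream is a sequence of `data:` lines, with a blank line ending each event. Axum's `Sse` type produces this framing itself, so the sketch below is only meant to show what a client of `/sse/subscribe` would receive. `sse_frame` is a hypothetical helper, not part of Axum.

```rust
/// Frame one payload as a Server-Sent Events message.
/// Each payload line becomes its own `data:` field; a blank line ends the event.
pub fn sse_frame(payload: &str) -> String {
    let mut frame = String::new();
    for line in payload.lines() {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    // The empty line tells the client to dispatch the event.
    frame.push('\n');
    frame
}
```

Since each DocumentChange is serialized as a single line of JSON, every change would map to exactly one `data:` line followed by a blank line.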

References

  • src/service.rs — existing subscribe() RPC implementation (broadcast channel → stream)
  • src/main.rs — Axum router setup (add WebSocket route alongside Connect RPC)
  • connect-rust Axum integration already in place
