Skip to content

Commit 8268469

Browse files
committed
Make framing configurable in jsonrpc server transport, because MCP requires newline framing.
1 parent 278e38d commit 8268469

2 files changed

Lines changed: 79 additions & 19 deletions

File tree

finecode_jsonrpc/src/finecode_jsonrpc/server_transport.py

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,17 @@ class ServerStdioTransport:
2929
3030
Reading uses short timeouts so that :meth:`stop` can interrupt the loop
3131
gracefully without waiting for the next byte from stdin.
32+
33+
*framing* controls the wire format:
34+
35+
- ``"content-length"`` (default): LSP-style ``Content-Length`` header framing.
36+
- ``"newline"``: newline-delimited JSON — one JSON object per line, no headers.
37+
Required by the MCP stdio transport spec.
3238
"""
3339

34-
def __init__(self, readable_id: str = "") -> None:
40+
def __init__(self, readable_id: str = "", framing: str = "content-length") -> None:
3541
self._readable_id = readable_id
42+
self._framing = framing
3643
self._stop_event = asyncio.Event()
3744
self._out_queue: asyncio.Queue[bytes | None] = asyncio.Queue()
3845
self._on_message: (
@@ -130,13 +137,16 @@ async def stop(self) -> None:
130137
# ------------------------------------------------------------------
131138

132139
def send(self, message: dict[str, typing.Any]) -> None:
133-
"""Serialize *message* to JSON with Content-Length header and enqueue."""
134-
body = json.dumps(message)
135-
header = (
136-
f"Content-Length: {len(body)}\r\n"
137-
f"Content-Type: {CONTENT_TYPE}; charset={CHARSET}\r\n\r\n"
138-
)
139-
data = (header + body).encode(CHARSET)
140+
"""Serialize *message* and enqueue for writing."""
141+
if self._framing == "newline":
142+
data = (json.dumps(message) + "\n").encode(CHARSET)
143+
else:
144+
body = json.dumps(message)
145+
header = (
146+
f"Content-Length: {len(body)}\r\n"
147+
f"Content-Type: {CONTENT_TYPE}; charset={CHARSET}\r\n\r\n"
148+
)
149+
data = (header + body).encode(CHARSET)
140150

141151
if self._loop is not None and self._loop.is_running():
142152
self._loop.call_soon_threadsafe(self._out_queue.put_nowait, data)
@@ -164,6 +174,66 @@ async def _write_messages(self, write_transport: asyncio.BaseTransport) -> None:
164174
async def _read_messages(self, reader: asyncio.StreamReader) -> None:
165175
"""Read messages from stdin with short timeouts to allow graceful stop."""
166176
logger.debug(f"Start reading messages | {self._readable_id}")
177+
178+
if self._framing == "newline":
179+
await self._read_messages_newline(reader)
180+
else:
181+
await self._read_messages_content_length(reader)
182+
183+
logger.debug(f"End reading messages | {self._readable_id}")
184+
185+
if self._on_exit is not None:
186+
try:
187+
await self._on_exit()
188+
except Exception as exc:
189+
logger.exception(
190+
f"Error in exit handler | {self._readable_id}: {exc}"
191+
)
192+
193+
async def _read_messages_newline(self, reader: asyncio.StreamReader) -> None:
194+
"""Read newline-delimited JSON messages (MCP stdio transport)."""
195+
try:
196+
while not self._stop_event.is_set():
197+
try:
198+
line = await asyncio.wait_for(reader.readline(), timeout=0.1)
199+
except asyncio.TimeoutError:
200+
continue
201+
except (ValueError, ConnectionResetError) as exc:
202+
logger.warning(f"Read error | {self._readable_id}: {exc}")
203+
break
204+
205+
if not line:
206+
if reader.at_eof():
207+
logger.debug(f"Reader reached EOF | {self._readable_id}")
208+
break
209+
continue
210+
211+
line = line.strip()
212+
if not line:
213+
continue
214+
215+
try:
216+
message = json.loads(line)
217+
except json.JSONDecodeError as exc:
218+
logger.error(f"JSON parse error | {self._readable_id}: {exc}")
219+
continue
220+
221+
if not isinstance(message, dict):
222+
logger.error(f"Expected dict message | {self._readable_id}")
223+
continue
224+
225+
if self._on_message is not None:
226+
try:
227+
await self._on_message(message)
228+
except Exception as exc:
229+
logger.exception(
230+
f"Error in message handler | {self._readable_id}: {exc}"
231+
)
232+
except asyncio.CancelledError:
233+
pass
234+
235+
async def _read_messages_content_length(self, reader: asyncio.StreamReader) -> None:
236+
"""Read Content-Length-framed JSON messages (LSP-style)."""
167237
content_length = 0
168238

169239
try:
@@ -226,16 +296,6 @@ async def _read_messages(self, reader: asyncio.StreamReader) -> None:
226296
except asyncio.CancelledError:
227297
pass
228298

229-
logger.debug(f"End reading messages | {self._readable_id}")
230-
231-
if self._on_exit is not None:
232-
try:
233-
await self._on_exit()
234-
except Exception as exc:
235-
logger.exception(
236-
f"Error in exit handler | {self._readable_id}: {exc}"
237-
)
238-
239299

240300
class TcpServerTransport:
241301
"""Server-side transport wrapping a TCP connection accepted by asyncio.start_server.

src/finecode/mcp_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ def start(workdir: pathlib.Path, port_file: pathlib.Path | None = None) -> None:
400400

401401
async def _run() -> None:
402402
global _session
403-
transport = finecode_jsonrpc.ServerStdioTransport(readable_id="mcp_server")
403+
transport = finecode_jsonrpc.ServerStdioTransport(readable_id="mcp_server", framing="newline")
404404
_session = finecode_jsonrpc.JsonRpcServerSession()
405405
_session.attach(transport)
406406
_session.on_request("initialize", _handle_initialize)

0 commit comments

Comments
 (0)