Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/benchflow/acp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
FsCapabilities,
InitializeParams,
InitializeResult,
McpServerSpec,
NewSessionParams,
PromptParams,
PromptResult,
Expand Down Expand Up @@ -291,9 +292,24 @@ async def initialize(self) -> InitializeResult:
)
return self._initialize_result

async def session_new(self, cwd: str = "/app") -> ACPSession:
"""Create a new agent session."""
params = NewSessionParams(cwd=cwd, mcp_servers=[])
async def session_new(
self, cwd: str = "/app", mcp_servers: list[McpServerSpec] | None = None
) -> ACPSession:
"""Create a new agent session.

``mcp_servers`` are attached to the session via ``session/new`` so the
agent connects to them (e.g. a task-declared Playwright MCP). Each spec
is projected to its per-transport wire shape by
:meth:`McpServerSpec.to_new_session_param`. ``None`` attaches no
servers — the historical benchmark default.
"""
server_params = [spec.to_new_session_param() for spec in mcp_servers or []]
# Validate through model_validate (not the constructor) so the SDK's
# discriminated mcp_servers union coerces each per-transport dict — the
# constructor's static type rejects the dict list.
params = NewSessionParams.model_validate(
{"cwd": cwd, "mcpServers": server_params}
)
result = await self._send_request(
"session/new", params.model_dump(by_alias=True, exclude_none=True)
)
Expand All @@ -308,10 +324,18 @@ async def session_new(self, cwd: str = "/app") -> ACPSession:
return self._session

async def session_load(
self, session_id: str, cwd: str = "/app"
self,
session_id: str,
cwd: str = "/app",
mcp_servers: list[McpServerSpec] | None = None,
) -> ACPSession: # ACP spec; unused until session resume is wired
"""Load an existing session (used by agents like openclaw that need pre-created sessions)."""
params = {"sessionId": session_id, "cwd": cwd, "mcpServers": []}
"""Load an existing session (used by agents like openclaw that need pre-created sessions).

``mcp_servers`` mirrors :meth:`session_new` — the same task-configured
servers are attached to the resumed session.
"""
server_params = [spec.to_new_session_param() for spec in mcp_servers or []]
params = {"sessionId": session_id, "cwd": cwd, "mcpServers": server_params}
result = await self._send_request("session/load", params)
loaded_id = result.get("sessionId", session_id)
self._session = ACPSession(loaded_id)
Expand Down
9 changes: 8 additions & 1 deletion src/benchflow/acp/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from benchflow.acp.client import ACPClient
from benchflow.acp.container_transport import ContainerTransport
from benchflow.acp.types import McpServerSpec
from benchflow.agents.protocol import ACPSessionAdapter
from benchflow.agents.providers import find_provider, strip_provider_prefix
from benchflow.agents.registry import AGENTS
Expand Down Expand Up @@ -233,6 +234,7 @@ async def connect_acp(
rollout_dir: Path,
environment: str,
agent_cwd: str,
mcp_servers: list[McpServerSpec] | None = None,
) -> tuple[ACPClient, object, ACPSessionAdapter, str]:
"""Create ACP transport, connect, init session, set model.

Expand All @@ -245,6 +247,10 @@ async def connect_acp(
instantiating the adapter here, every handler the kernel registers stayed
dormant and the auto-approve policy ran unconditionally (#382 follow-up).

``mcp_servers`` are the task's configured MCP servers (mapped from
``[[environment.mcp_servers]]``); they are attached to the ACP session at
``session/new`` so the agent can reach them. ``None`` attaches none.

Retries with exponential backoff on ConnectionError (Daytona SSH storms).
"""
# Resolve agent binary path for non-docker environments
Expand Down Expand Up @@ -305,7 +311,8 @@ async def connect_acp(
logger.info(f"ACP agent: {agent_name}")

session = await asyncio.wait_for(
acp_client.session_new(cwd=agent_cwd), timeout=60
acp_client.session_new(cwd=agent_cwd, mcp_servers=mcp_servers),
timeout=60,
)
logger.info(f"Session: {session.session_id}")
break
Expand Down
29 changes: 28 additions & 1 deletion src/benchflow/acp/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,41 @@ class McpServerSpec(BaseModel):

Vendored: BenchFlow uses a single flat shape across stdio/SSE/HTTP, while
the SDK splits these into separate ``McpServerStdio`` / ``SseMcpServer`` /
``HttpMcpServer`` models.
``HttpMcpServer`` models. :meth:`to_new_session_param` projects this flat
shape onto the exact per-transport dict ``session/new`` expects: every
variant carries ``name``; stdio adds ``command``/``args``/``env`` and omits
the ``type`` discriminator (the SDK's ``McpServerStdio`` has none); sse/http
add ``url``/``headers`` and keep ``type``.
"""

name: str # required on every ACP variant; the rest are transport-dependent
type: str = "stdio"
command: str | None = None
args: list[str] = Field(default_factory=list)
env: list[dict[str, str]] = Field(default_factory=list)
url: str | None = None
headers: list[dict[str, str]] = Field(default_factory=list)

def to_new_session_param(self) -> dict[str, Any]:
"""Project onto the per-transport ``session/new`` server dict.

stdio servers carry ``command``/``args``/``env`` and no ``type``; sse and
http servers carry ``url``/``headers`` and the ``type`` discriminator.
The result is shaped for the SDK's discriminated ``mcp_servers`` union.
"""
if self.type == "stdio":
return {
"name": self.name,
"command": self.command,
"args": list(self.args),
"env": list(self.env),
}
return {
"type": self.type,
"name": self.name,
"url": self.url,
"headers": list(self.headers),
}


class CancelParams(BaseModel):
Expand Down
37 changes: 37 additions & 0 deletions src/benchflow/rollout.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from benchflow._types import Role, Scene, Turn
from benchflow._utils.config import normalize_agent_name, normalize_sandbox_user
from benchflow._utils.scoring import classify_error, classify_verifier_error
from benchflow.acp.types import McpServerSpec
from benchflow.contracts import (
AgentProtocolError,
BaseUser,
Expand Down Expand Up @@ -102,6 +103,40 @@ def _task_disallows_internet(task: Any) -> bool:
return getattr(env_config, "allow_internet", True) is False


# task.toml ``transport`` names → ACP ``session/new`` server ``type``. The MCP
# ecosystem calls it "streamable-http"; ACP calls it "http". stdio/sse match.
_MCP_TRANSPORT_TO_ACP_TYPE = {
"stdio": "stdio",
"sse": "sse",
"streamable-http": "http",
}


def _task_mcp_specs(task: Any) -> list[McpServerSpec]:
"""Map the task's ``[[environment.mcp_servers]]`` entries to ACP specs.

This is the composition seam between the task-config layer
(``MCPServerConfig``) and the ACP protocol layer (``McpServerSpec``) — kept
here, in the rollout, so ``acp/`` stays free of any task-config dependency.
The resulting specs are attached to every ACP session the rollout opens
(``session/new``), making task-declared MCP servers — e.g. a Playwright MCP
— reachable by the agent. Returns ``[]`` when the task declares none,
preserving the historical default of attaching no MCP servers.
"""
env_config = getattr(getattr(task, "config", None), "environment", None)
configs = getattr(env_config, "mcp_servers", None) or []
return [
McpServerSpec(
name=config.name,
type=_MCP_TRANSPORT_TO_ACP_TYPE.get(config.transport, config.transport),
command=config.command,
args=list(config.args),
url=config.url,
)
for config in configs
]


def _apply_web_policy(agent_env: dict[str, str], *, disallow: bool) -> dict[str, str]:
"""Inject BenchFlow's no-web policy marker into agent env when requested."""
if not disallow:
Expand Down Expand Up @@ -1523,6 +1558,7 @@ async def connect(self) -> None:
rollout_dir=rollout_dir,
environment=cfg.environment,
agent_cwd=self._agent_cwd,
mcp_servers=_task_mcp_specs(getattr(self, "_task", None)),
)
self._reapply_ask_user_handler()

Expand Down Expand Up @@ -2430,6 +2466,7 @@ async def connect_as(self, role: Role) -> None:
rollout_dir=rollout_dir,
environment=cfg.environment,
agent_cwd=self._agent_cwd,
mcp_servers=_task_mcp_specs(getattr(self, "_task", None)),
)
self._reapply_ask_user_handler()
self._active_role = role
Expand Down
Loading
Loading