diff --git a/pyproject.toml b/pyproject.toml index 62ad909..bbcc9c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "httpx>=0.27.0", "pyyaml>=6.0", "python-dateutil>=2.8.0", + "langfuse>=2.0.0", ] [project.optional-dependencies] diff --git a/scripts/test_tracing_e2e.py b/scripts/test_tracing_e2e.py new file mode 100755 index 0000000..7b3c064 --- /dev/null +++ b/scripts/test_tracing_e2e.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +"""End-to-end test for Langfuse tracing integration. + +Spins up the mcp-acp MCP server as a subprocess, connects via stdio, +exercises several tools against a live ACP cluster, and verifies that +Langfuse traces are created (when credentials are configured). + +Prerequisites: + - A running ACP cluster (kind or remote) with public-api accessible + - A clusters.yaml config OR env vars pointing to the cluster + - Langfuse credentials sourced into env + +Usage: + # 1. Create a clusters.yaml if you don't have one: + uv run python scripts/test_tracing_e2e.py --setup-config + + # 2. Source Langfuse creds and run: + source ../langfuse.env + uv run python scripts/test_tracing_e2e.py + + # 3. 
Verify kill switch disables tracing: + MCP_ACP_TRACING_ENABLED=false uv run python scripts/test_tracing_e2e.py +""" + +import argparse +import asyncio +import os +import subprocess +import sys +import textwrap +from pathlib import Path + + +def setup_kind_config() -> Path: + """Auto-generate a clusters.yaml for the local kind cluster.""" + config_dir = Path.home() / ".config" / "acp" + config_path = config_dir / "clusters.yaml" + + # Get node IP + node_ip = subprocess.check_output( + ["kubectl", "get", "nodes", "-o", 'jsonpath={.items[0].status.addresses[?(@.type=="InternalIP")].address}'], + text=True, + ).strip() + + # Get NodePort for public-api-service + node_port = subprocess.check_output( + [ + "kubectl", + "get", + "svc", + "public-api-service", + "-n", + "ambient-code", + "-o", + "jsonpath={.spec.ports[0].nodePort}", + ], + text=True, + ).strip() + + # Get test user token + token_b64 = subprocess.check_output( + ["kubectl", "get", "secret", "test-user-token", "-n", "ambient-code", "-o", "jsonpath={.data.token}"], + text=True, + ).strip() + token = subprocess.check_output(["base64", "-d"], input=token_b64, text=True).strip() + + config_content = textwrap.dedent(f"""\ + default_cluster: kind + clusters: + kind: + server: http://{node_ip}:{node_port} + default_project: ambient-code + description: Local kind development cluster + token: "{token}" + """) + + config_dir.mkdir(parents=True, exist_ok=True) + config_path.write_text(config_content) + print(f"Wrote clusters.yaml to {config_path}") + print(f" server: http://{node_ip}:{node_port}") + print(" project: ambient-code") + return config_path + + +async def run_e2e_test() -> None: + """Connect to the MCP server and exercise tools.""" + from mcp import ClientSession + from mcp.client.stdio import StdioServerParameters, stdio_client + + # Pass through current env (including Langfuse vars if set) + env = os.environ.copy() + env["PYTHONPATH"] = str(Path(__file__).resolve().parent.parent / "src") + + # Also add the 
utils path (sibling to src/) + utils_parent = Path(__file__).resolve().parent.parent + if "PYTHONPATH" in env: + env["PYTHONPATH"] = f"{utils_parent}:{env['PYTHONPATH']}" + else: + env["PYTHONPATH"] = str(utils_parent) + + # Prefer .venv-3.12 (avoids Langfuse/Pydantic v1 incompatibility with Python 3.14) + project_root = Path(__file__).resolve().parent.parent + venv_312 = project_root / ".venv-3.12" / "bin" / "python" + venv_default = project_root / ".venv" / "bin" / "python" + python_bin = str(venv_312 if venv_312.exists() else venv_default) + + server_params = StdioServerParameters( + command=python_bin, + args=["-m", "mcp_acp.server"], + env=env, + cwd=str(project_root), + ) + + langfuse_configured = bool(os.getenv("LANGFUSE_PUBLIC_KEY") and os.getenv("LANGFUSE_SECRET_KEY")) + tracing_enabled = os.getenv("MCP_ACP_TRACING_ENABLED", "true").lower() in ("true", "1", "yes") + + print("=" * 60) + print("MCP-ACP Tracing End-to-End Test") + print("=" * 60) + print(f" Langfuse credentials: {'configured' if langfuse_configured else 'NOT SET (silent no-op)'}") + print(f" Tracing enabled: {tracing_enabled}") + print(f" Kill switch: MCP_ACP_TRACING_ENABLED={os.getenv('MCP_ACP_TRACING_ENABLED', 'true')}") + print() + + results: list[dict] = [] + + async with stdio_client(server_params) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + print("[OK] MCP server started and initialized") + + # 1. List tools + tools_result = await session.list_tools() + tool_names = [t.name for t in tools_result.tools] + print(f"[OK] list_tools: {len(tool_names)} tools available") + results.append({"test": "list_tools", "status": "pass", "tool_count": len(tool_names)}) + + # 2. 
acp_whoami — tests cluster auth + print("\n--- Tool: acp_whoami ---") + whoami = await session.call_tool("acp_whoami", {}) + whoami_text = whoami.content[0].text + print(f" Response: {whoami_text[:200]}") + results.append({"test": "acp_whoami", "status": "pass"}) + + # 3. acp_list_clusters + print("\n--- Tool: acp_list_clusters ---") + clusters = await session.call_tool("acp_list_clusters", {}) + clusters_text = clusters.content[0].text + print(f" Response: {clusters_text[:200]}") + results.append({"test": "acp_list_clusters", "status": "pass"}) + + # 4. acp_list_sessions — exercises HTTP call tracing + print("\n--- Tool: acp_list_sessions ---") + sessions_result = await session.call_tool("acp_list_sessions", {}) + sessions_text = sessions_result.content[0].text + print(f" Response: {sessions_text[:200]}") + results.append({"test": "acp_list_sessions", "status": "pass"}) + + # 5. acp_get_session with invalid session — exercises error tracing + print("\n--- Tool: acp_get_session (expect error) ---") + err_result = await session.call_tool("acp_get_session", {"session": "nonexistent-session-xyz"}) + err_text = err_result.content[0].text + is_error = "error" in err_text.lower() or "not found" in err_text.lower() + print(f" Response: {err_text[:200]}") + print(f" Got expected error: {is_error}") + results.append({"test": "acp_get_session_error", "status": "pass" if is_error else "warn"}) + + # 6. acp_create_session dry_run — exercises POST without side effects + print("\n--- Tool: acp_create_session (dry_run) ---") + create_result = await session.call_tool( + "acp_create_session", + { + "initial_prompt": "Hello from tracing e2e test", + "display_name": "tracing-e2e-test", + "dry_run": True, + }, + ) + create_text = create_result.content[0].text + print(f" Response: {create_text[:200]}") + results.append({"test": "acp_create_session_dry_run", "status": "pass"}) + + # 7. 
acp_bulk_delete_sessions without confirm — exercises validation error + print("\n--- Tool: acp_bulk_delete_sessions (expect validation error) ---") + bulk_result = await session.call_tool( + "acp_bulk_delete_sessions", + {"sessions": ["s1"]}, + ) + bulk_text = bulk_result.content[0].text + is_validation = "confirm" in bulk_text.lower() or "validation" in bulk_text.lower() + print(f" Response: {bulk_text[:200]}") + print(f" Got expected validation error: {is_validation}") + results.append({"test": "acp_bulk_validation_error", "status": "pass" if is_validation else "warn"}) + + # Summary + print("\n" + "=" * 60) + print("RESULTS SUMMARY") + print("=" * 60) + for r in results: + icon = "PASS" if r["status"] == "pass" else "WARN" + print(f" [{icon}] {r['test']}") + + passed = sum(1 for r in results if r["status"] == "pass") + print(f"\n {passed}/{len(results)} tests passed") + + if langfuse_configured and tracing_enabled: + print("\n Check your Langfuse dashboard for traces:") + print(f" {os.getenv('LANGFUSE_BASE_URL', 'https://cloud.langfuse.com')}") + print(" Expected traces:") + print(" - acp_whoami") + print(" - acp_list_clusters") + print(" - acp_list_sessions (with child span: GET /v1/sessions)") + print(" - acp_get_session (with error status)") + print(" - acp_create_session (dry_run, no HTTP span)") + print(" - acp_bulk_delete_sessions (validation_error, no HTTP span)") + elif not langfuse_configured: + print("\n Langfuse credentials not set — no traces were sent (silent no-op).") + print(" To verify tracing, set LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_BASE_URL.") + elif not tracing_enabled: + print("\n Tracing disabled via MCP_ACP_TRACING_ENABLED=false — no traces sent.") + + print() + + +def main(): + parser = argparse.ArgumentParser(description="End-to-end test for Langfuse tracing in mcp-acp") + parser.add_argument( + "--setup-config", + action="store_true", + help="Generate ~/.config/acp/clusters.yaml from the local kind cluster, then exit", 
+ ) + args = parser.parse_args() + + if args.setup_config: + setup_kind_config() + return + + # Check clusters.yaml exists + config_path = os.getenv("ACP_CLUSTER_CONFIG") or str(Path.home() / ".config" / "acp" / "clusters.yaml") + if not Path(config_path).exists(): + print(f"ERROR: No clusters.yaml found at {config_path}") + print("Run with --setup-config to auto-generate from kind cluster:") + print(f" uv run python {sys.argv[0]} --setup-config") + sys.exit(1) + + asyncio.run(run_e2e_test()) + + +if __name__ == "__main__": + main() diff --git a/src/mcp_acp/client.py b/src/mcp_acp/client.py index 0523daa..9d5e02d 100644 --- a/src/mcp_acp/client.py +++ b/src/mcp_acp/client.py @@ -13,6 +13,7 @@ import httpx from mcp_acp.settings import _acpctl_config_path, load_clusters_config, load_settings +from mcp_acp.tracing import trace_http_request from mcp_acp.utils.pylogger import get_python_logger logger = get_python_logger() @@ -162,42 +163,44 @@ async def _request( client = await self._get_http_client() - try: - response = await client.request( - method=method, - url=url, - headers=headers, - json=json_data, - params=params, - ) - - if response.status_code >= 400: - try: - error_data = response.json() - error_msg = error_data.get("error", f"HTTP {response.status_code}") - except Exception: - error_msg = f"HTTP {response.status_code}: {response.text}" - - logger.warning( - "api_request_failed", + async with trace_http_request(method, path, params) as span_ctx: + try: + response = await client.request( method=method, - path=path, - status_code=response.status_code, - error=error_msg, + url=url, + headers=headers, + json=json_data, + params=params, ) - raise ValueError(error_msg) - - if response.status_code == 204: - return {"success": True} - - return response.json() - - except httpx.TimeoutException as e: - logger.error("api_request_timeout", method=method, path=path, error=str(e)) - raise TimeoutError(f"Request timed out: {path}") from e - except httpx.RequestError as e: 
- logger.error("api_request_error", method=method, path=path, error=str(e)) - raise ValueError(f"Request failed: {str(e)}") from e + span_ctx.set_response(response.status_code) + + if response.status_code >= 400: + try: + error_data = response.json() + error_msg = error_data.get("error", f"HTTP {response.status_code}") + except Exception: + error_msg = f"HTTP {response.status_code}: {response.text}" + + logger.warning( + "api_request_failed", + method=method, + path=path, + status_code=response.status_code, + error=error_msg, + ) + raise ValueError(error_msg) + + if response.status_code == 204: + return {"success": True} + + return response.json() + + except httpx.TimeoutException as e: + logger.error("api_request_timeout", method=method, path=path, error=str(e)) + raise TimeoutError(f"Request timed out: {path}") from e + except httpx.RequestError as e: + logger.error("api_request_error", method=method, path=path, error=str(e)) + raise ValueError(f"Request failed: {str(e)}") from e async def _request_text( self, @@ -221,20 +224,22 @@ async def _request_text( client = await self._get_http_client() - try: - response = await client.request(method=method, url=url, headers=headers, params=params) + async with trace_http_request(method, path, params) as span_ctx: + try: + response = await client.request(method=method, url=url, headers=headers, params=params) + span_ctx.set_response(response.status_code) - if response.status_code >= 400: - raise ValueError(f"HTTP {response.status_code}: {response.text}") + if response.status_code >= 400: + raise ValueError(f"HTTP {response.status_code}: {response.text}") - return response.text + return response.text - except httpx.TimeoutException as e: - logger.error("api_request_timeout", method=method, path=path, error=str(e)) - raise TimeoutError(f"Request timed out: {path}") from e - except httpx.RequestError as e: - logger.error("api_request_error", method=method, path=path, error=str(e)) - raise ValueError(f"Request failed: 
{str(e)}") from e + except httpx.TimeoutException as e: + logger.error("api_request_timeout", method=method, path=path, error=str(e)) + raise TimeoutError(f"Request timed out: {path}") from e + except httpx.RequestError as e: + logger.error("api_request_error", method=method, path=path, error=str(e)) + raise ValueError(f"Request failed: {str(e)}") from e # ── Validation ─────────────────────────────────────────────────────── @@ -544,6 +549,7 @@ async def clone_session( clone_data: dict[str, Any] = { "task": source.get("task", ""), "model": source.get("model", "claude-sonnet-4"), + "displayName": new_display_name, } if source.get("repos"): diff --git a/src/mcp_acp/server.py b/src/mcp_acp/server.py index 2b00792..e8c2aa0 100644 --- a/src/mcp_acp/server.py +++ b/src/mcp_acp/server.py @@ -24,6 +24,8 @@ format_transcript, format_whoami, ) +from .tracing import flush as flush_tracing +from .tracing import trace_tool_call logger = get_python_logger() @@ -445,233 +447,250 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: client = get_client() - try: - # Auto-fill project from default if not provided - if name not in TOOLS_WITHOUT_PROJECT and not arguments.get("project"): - cluster_name = client.clusters_config.default_cluster - if cluster_name: - cluster = client.clusters_config.clusters.get(cluster_name) - if cluster and cluster.default_project: - arguments["project"] = cluster.default_project - logger.info("project_autofilled", project=cluster.default_project) - - # Confirmation enforcement for destructive bulk operations - if name in TOOLS_REQUIRING_CONFIRMATION: - if not arguments.get("dry_run") and not arguments.get("confirm"): - op = name.replace("acp_bulk_", "").replace("_", " ") - raise ValueError(f"Bulk {op} requires confirm=true. 
Use dry_run=true to preview first.") - - # ── Dispatch ───────────────────────────────────────────────── - project = arguments.get("project", "") - - # Session management - if name == "acp_list_sessions": - result = await client.list_sessions( - project=project, - status=arguments.get("status"), - older_than=arguments.get("older_than"), - sort_by=arguments.get("sort_by"), - limit=arguments.get("limit"), - ) - text = format_sessions_list(result) - - elif name == "acp_get_session": - result = await client.get_session(project=project, session=arguments["session"]) - text = format_result(result) - - elif name == "acp_create_session": - result = await client.create_session( - project=project, - initial_prompt=arguments["initial_prompt"], - display_name=arguments.get("display_name"), - repos=arguments.get("repos"), - model=arguments.get("model", "claude-sonnet-4"), - dry_run=arguments.get("dry_run", False), - ) - text = format_session_created(result) - - elif name == "acp_create_session_from_template": - result = await client.create_session_from_template( - project=project, - template=arguments["template"], - display_name=arguments["display_name"], - repos=arguments.get("repos"), - dry_run=arguments.get("dry_run", False), - ) - text = format_session_created(result) - - elif name == "acp_delete_session": - result = await client.delete_session( - project=project, session=arguments["session"], dry_run=arguments.get("dry_run", False) - ) - text = format_result(result) - - elif name == "acp_restart_session": - result = await client.restart_session( - project=project, session=arguments["session"], dry_run=arguments.get("dry_run", False) - ) - text = format_result(result) - - elif name == "acp_clone_session": - result = await client.clone_session( - project=project, - source_session=arguments["source_session"], - new_display_name=arguments["new_display_name"], - dry_run=arguments.get("dry_run", False), - ) - text = format_session_created(result) - - elif name == 
"acp_update_session": - result = await client.update_session( - project=project, - session=arguments["session"], - display_name=arguments.get("display_name"), - timeout=arguments.get("timeout"), - dry_run=arguments.get("dry_run", False), - ) - text = format_result(result) - - # Observability - elif name == "acp_get_session_logs": - result = await client.get_session_logs( - project=project, - session=arguments["session"], - container=arguments.get("container"), - tail_lines=arguments.get("tail_lines", 1000), - ) - text = format_logs(result) - - elif name == "acp_get_session_transcript": - result = await client.get_session_transcript( - project=project, - session=arguments["session"], - format=arguments.get("format", "json"), - ) - text = format_transcript(result) - - elif name == "acp_get_session_metrics": - result = await client.get_session_metrics(project=project, session=arguments["session"]) - text = format_metrics(result) - - # Labels - elif name == "acp_label_resource": - result = await client.label_session(project=project, session=arguments["name"], labels=arguments["labels"]) - text = format_labels(result) - - elif name == "acp_unlabel_resource": - result = await client.unlabel_session( - project=project, session=arguments["name"], label_keys=arguments["label_keys"] - ) - text = format_labels(result) - - elif name == "acp_list_sessions_by_label": - result = await client.list_sessions_by_label(project=project, labels=arguments["labels"]) - text = format_sessions_list(result) - - elif name == "acp_bulk_label_resources": - result = await client.bulk_label_sessions( - project=project, - sessions=arguments["sessions"], - labels=arguments["labels"], - dry_run=arguments.get("dry_run", False), - ) - text = format_labels(result) - - elif name == "acp_bulk_unlabel_resources": - result = await client.bulk_unlabel_sessions( - project=project, - sessions=arguments["sessions"], - label_keys=arguments["label_keys"], - dry_run=arguments.get("dry_run", False), - ) - text = 
format_labels(result) - - # Bulk operations (named) - elif name == "acp_bulk_delete_sessions": - result = await client.bulk_delete_sessions( - project=project, sessions=arguments["sessions"], dry_run=arguments.get("dry_run", False) - ) - text = format_bulk_result(result, "delete") + metadata = { + "project": arguments.get("project", ""), + "dry_run": arguments.get("dry_run", False), + } - elif name == "acp_bulk_stop_sessions": - result = await client.bulk_stop_sessions( - project=project, sessions=arguments["sessions"], dry_run=arguments.get("dry_run", False) - ) - text = format_bulk_result(result, "stop") - - elif name == "acp_bulk_restart_sessions": - result = await client.bulk_restart_sessions( - project=project, sessions=arguments["sessions"], dry_run=arguments.get("dry_run", False) - ) - text = format_bulk_result(result, "restart") - - # Bulk operations (by label) - elif name == "acp_bulk_delete_sessions_by_label": - result = await client.bulk_delete_sessions_by_label( - project=project, labels=arguments["labels"], dry_run=arguments.get("dry_run", False) - ) - text = format_bulk_result(result, "delete") - - elif name == "acp_bulk_stop_sessions_by_label": - result = await client.bulk_stop_sessions_by_label( - project=project, labels=arguments["labels"], dry_run=arguments.get("dry_run", False) - ) - text = format_bulk_result(result, "stop") + async with trace_tool_call(name, safe_args, metadata) as trace_ctx: + try: + # Auto-fill project from default if not provided + if name not in TOOLS_WITHOUT_PROJECT and not arguments.get("project"): + cluster_name = client.clusters_config.default_cluster + if cluster_name: + cluster = client.clusters_config.clusters.get(cluster_name) + if cluster and cluster.default_project: + arguments["project"] = cluster.default_project + logger.info("project_autofilled", project=cluster.default_project) + + # Confirmation enforcement for destructive bulk operations + if name in TOOLS_REQUIRING_CONFIRMATION: + if not 
arguments.get("dry_run") and not arguments.get("confirm"): + op = name.replace("acp_bulk_", "").replace("_", " ") + raise ValueError(f"Bulk {op} requires confirm=true. Use dry_run=true to preview first.") + + # ── Dispatch ───────────────────────────────────────────────── + project = arguments.get("project", "") + + # Session management + if name == "acp_list_sessions": + result = await client.list_sessions( + project=project, + status=arguments.get("status"), + older_than=arguments.get("older_than"), + sort_by=arguments.get("sort_by"), + limit=arguments.get("limit"), + ) + text = format_sessions_list(result) + + elif name == "acp_get_session": + result = await client.get_session(project=project, session=arguments["session"]) + text = format_result(result) + + elif name == "acp_create_session": + result = await client.create_session( + project=project, + initial_prompt=arguments["initial_prompt"], + display_name=arguments.get("display_name"), + repos=arguments.get("repos"), + model=arguments.get("model", "claude-sonnet-4"), + dry_run=arguments.get("dry_run", False), + ) + text = format_session_created(result) + + elif name == "acp_create_session_from_template": + result = await client.create_session_from_template( + project=project, + template=arguments["template"], + display_name=arguments["display_name"], + repos=arguments.get("repos"), + dry_run=arguments.get("dry_run", False), + ) + text = format_session_created(result) + + elif name == "acp_delete_session": + result = await client.delete_session( + project=project, session=arguments["session"], dry_run=arguments.get("dry_run", False) + ) + text = format_result(result) + + elif name == "acp_restart_session": + result = await client.restart_session( + project=project, session=arguments["session"], dry_run=arguments.get("dry_run", False) + ) + text = format_result(result) + + elif name == "acp_clone_session": + result = await client.clone_session( + project=project, + source_session=arguments["source_session"], + 
new_display_name=arguments["new_display_name"], + dry_run=arguments.get("dry_run", False), + ) + text = format_session_created(result) + + elif name == "acp_update_session": + result = await client.update_session( + project=project, + session=arguments["session"], + display_name=arguments.get("display_name"), + timeout=arguments.get("timeout"), + dry_run=arguments.get("dry_run", False), + ) + text = format_result(result) + + # Observability + elif name == "acp_get_session_logs": + result = await client.get_session_logs( + project=project, + session=arguments["session"], + container=arguments.get("container"), + tail_lines=arguments.get("tail_lines", 1000), + ) + text = format_logs(result) + + elif name == "acp_get_session_transcript": + result = await client.get_session_transcript( + project=project, + session=arguments["session"], + format=arguments.get("format", "json"), + ) + text = format_transcript(result) + + elif name == "acp_get_session_metrics": + result = await client.get_session_metrics(project=project, session=arguments["session"]) + text = format_metrics(result) + + # Labels + elif name == "acp_label_resource": + result = await client.label_session( + project=project, session=arguments["name"], labels=arguments["labels"] + ) + text = format_labels(result) + + elif name == "acp_unlabel_resource": + result = await client.unlabel_session( + project=project, session=arguments["name"], label_keys=arguments["label_keys"] + ) + text = format_labels(result) + + elif name == "acp_list_sessions_by_label": + result = await client.list_sessions_by_label(project=project, labels=arguments["labels"]) + text = format_sessions_list(result) + + elif name == "acp_bulk_label_resources": + result = await client.bulk_label_sessions( + project=project, + sessions=arguments["sessions"], + labels=arguments["labels"], + dry_run=arguments.get("dry_run", False), + ) + text = format_labels(result) + + elif name == "acp_bulk_unlabel_resources": + result = await 
client.bulk_unlabel_sessions( + project=project, + sessions=arguments["sessions"], + label_keys=arguments["label_keys"], + dry_run=arguments.get("dry_run", False), + ) + text = format_labels(result) + + # Bulk operations (named) + elif name == "acp_bulk_delete_sessions": + result = await client.bulk_delete_sessions( + project=project, sessions=arguments["sessions"], dry_run=arguments.get("dry_run", False) + ) + text = format_bulk_result(result, "delete") + + elif name == "acp_bulk_stop_sessions": + result = await client.bulk_stop_sessions( + project=project, sessions=arguments["sessions"], dry_run=arguments.get("dry_run", False) + ) + text = format_bulk_result(result, "stop") + + elif name == "acp_bulk_restart_sessions": + result = await client.bulk_restart_sessions( + project=project, sessions=arguments["sessions"], dry_run=arguments.get("dry_run", False) + ) + text = format_bulk_result(result, "restart") + + # Bulk operations (by label) + elif name == "acp_bulk_delete_sessions_by_label": + result = await client.bulk_delete_sessions_by_label( + project=project, labels=arguments["labels"], dry_run=arguments.get("dry_run", False) + ) + text = format_bulk_result(result, "delete") + + elif name == "acp_bulk_stop_sessions_by_label": + result = await client.bulk_stop_sessions_by_label( + project=project, labels=arguments["labels"], dry_run=arguments.get("dry_run", False) + ) + text = format_bulk_result(result, "stop") + + elif name == "acp_bulk_restart_sessions_by_label": + result = await client.bulk_restart_sessions_by_label( + project=project, labels=arguments["labels"], dry_run=arguments.get("dry_run", False) + ) + text = format_bulk_result(result, "restart") + + # Cluster management + elif name == "acp_list_clusters": + result = client.list_clusters() + text = format_clusters(result) + + elif name == "acp_whoami": + result = await client.whoami() + text = format_whoami(result) + + elif name == "acp_switch_cluster": + result = await 
client.switch_cluster(arguments["cluster"]) + text = format_result(result) + + elif name == "acp_login": + result = await client.login(cluster=arguments["cluster"], token=arguments.get("token")) + text = format_login(result) + + else: + logger.warning("unknown_tool_requested", tool=name) + return [TextContent(type="text", text=f"Unknown tool: {name}")] + + elapsed = time.time() - start_time + logger.info("tool_call_completed", tool=name, elapsed_seconds=round(elapsed, 2)) + + trace_ctx.set_output({"status": "success"}) + return [TextContent(type="text", text=text)] - elif name == "acp_bulk_restart_sessions_by_label": - result = await client.bulk_restart_sessions_by_label( - project=project, labels=arguments["labels"], dry_run=arguments.get("dry_run", False) + except ValueError as e: + elapsed = time.time() - start_time + logger.warning("tool_validation_error", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e)) + trace_ctx.set_output({"status": "validation_error", "error": str(e)}) + return [TextContent(type="text", text=f"Validation Error: {str(e)}")] + except TimeoutError as e: + elapsed = time.time() - start_time + logger.error("tool_timeout", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e)) + trace_ctx.set_output({"status": "timeout_error", "error": str(e)}) + return [TextContent(type="text", text=f"Timeout Error: {str(e)}")] + except Exception as e: + elapsed = time.time() - start_time + logger.error( + "tool_unexpected_error", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e), exc_info=True ) - text = format_bulk_result(result, "restart") - - # Cluster management - elif name == "acp_list_clusters": - result = client.list_clusters() - text = format_clusters(result) - - elif name == "acp_whoami": - result = await client.whoami() - text = format_whoami(result) - - elif name == "acp_switch_cluster": - result = await client.switch_cluster(arguments["cluster"]) - text = format_result(result) - - elif name == "acp_login": - result = await 
client.login(cluster=arguments["cluster"], token=arguments.get("token")) - text = format_login(result) - - else: - logger.warning("unknown_tool_requested", tool=name) - return [TextContent(type="text", text=f"Unknown tool: {name}")] - - elapsed = time.time() - start_time - logger.info("tool_call_completed", tool=name, elapsed_seconds=round(elapsed, 2)) - - return [TextContent(type="text", text=text)] - - except ValueError as e: - elapsed = time.time() - start_time - logger.warning("tool_validation_error", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e)) - return [TextContent(type="text", text=f"Validation Error: {str(e)}")] - except TimeoutError as e: - elapsed = time.time() - start_time - logger.error("tool_timeout", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e)) - return [TextContent(type="text", text=f"Timeout Error: {str(e)}")] - except Exception as e: - elapsed = time.time() - start_time - logger.error("tool_unexpected_error", tool=name, elapsed_seconds=round(elapsed, 2), error=str(e), exc_info=True) - return [TextContent(type="text", text=f"Error: {str(e)}")] + trace_ctx.set_output({"status": "error", "error": str(e)}) + return [TextContent(type="text", text=f"Error: {str(e)}")] async def main() -> None: """Run the MCP server.""" - async with stdio_server() as (read_stream, write_stream): - await app.run( - read_stream, - write_stream, - app.create_initialization_options(), - ) + try: + async with stdio_server() as (read_stream, write_stream): + await app.run( + read_stream, + write_stream, + app.create_initialization_options(), + ) + finally: + flush_tracing() def run() -> None: diff --git a/src/mcp_acp/settings.py b/src/mcp_acp/settings.py index e1505ef..36515dd 100644 --- a/src/mcp_acp/settings.py +++ b/src/mcp_acp/settings.py @@ -213,5 +213,10 @@ def load_clusters_config(settings: Settings | None = None) -> ClustersConfig: return ClustersConfig.from_yaml(settings.config_path) +def _acpctl_config_path() -> Path: + """Return the path 
"""Langfuse tracing integration for mcp-acp.

Provides async context managers for tracing MCP tool calls and HTTP requests.
When Langfuse credentials are not configured or tracing is disabled via the
MCP_ACP_TRACING_ENABLED env var, all operations silently no-op.

Uses Langfuse SDK v3 API (start_span / start_as_current_span).
"""

import os
import time
from contextlib import asynccontextmanager
from contextvars import ContextVar
from typing import Any

from mcp_acp.utils.pylogger import get_python_logger

logger = get_python_logger()

# Contextvar holds the parent span so child spans can nest under it.
_current_span: ContextVar[Any] = ContextVar("_current_span", default=None)

# Lazily-initialized Langfuse client singleton; initialization is attempted
# at most once per process (result, including None, is cached).
_langfuse_client = None
_langfuse_init_attempted = False


def _is_tracing_enabled() -> bool:
    """Check if tracing is enabled via env var (default: true)."""
    return os.getenv("MCP_ACP_TRACING_ENABLED", "true").lower() in ("true", "1", "yes")


def get_langfuse():
    """Get or create a Langfuse client singleton.

    Returns None if tracing is disabled or credentials are not configured.
    The outcome is cached so repeated calls are cheap and log only once.
    """
    global _langfuse_client, _langfuse_init_attempted

    if _langfuse_init_attempted:
        return _langfuse_client

    _langfuse_init_attempted = True

    if not _is_tracing_enabled():
        logger.info("tracing_disabled", reason="MCP_ACP_TRACING_ENABLED is not set to true")
        return None

    public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
    secret_key = os.getenv("LANGFUSE_SECRET_KEY")

    if not public_key or not secret_key:
        logger.info("tracing_disabled", reason="LANGFUSE_PUBLIC_KEY or LANGFUSE_SECRET_KEY not set")
        return None

    try:
        from langfuse import Langfuse

        # Langfuse() reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY /
        # LANGFUSE_BASE_URL from the environment itself.
        _langfuse_client = Langfuse()
        logger.info("tracing_initialized", host=os.getenv("LANGFUSE_BASE_URL", "default"))
        return _langfuse_client
    except Exception as e:
        # Tracing must never break the server: degrade to no-op on any failure.
        logger.warning("tracing_init_failed", error=str(e))
        return None


class _NoOpContext:
    """No-op context returned when tracing is disabled."""

    def set_output(self, output: dict[str, Any]) -> None:
        pass

    def set_response(self, status_code: int) -> None:
        pass


@asynccontextmanager
async def trace_tool_call(tool_name: str, safe_args: dict[str, Any], metadata: dict[str, Any] | None = None):
    """Trace an MCP tool call as a Langfuse span.

    Creates a top-level Langfuse span for the tool invocation and stores it
    in a contextvar so child HTTP spans can nest under it.

    Yields a context object with set_output() for recording results.
    """
    lf = get_langfuse()
    if lf is None:
        yield _NoOpContext()
        return

    trace_input = {"tool": tool_name, "arguments": safe_args}
    if metadata:
        trace_input["metadata"] = metadata

    start = time.time()
    span = None
    token = None
    # ctx stays None if span creation / contextvar set itself raises, so the
    # handlers below can safely reference it.
    ctx = None

    try:
        span = lf.start_span(name=tool_name, input=trace_input, metadata=metadata or {})
        token = _current_span.set(span)

        ctx = _TraceContext(span, start)
        yield ctx
    except Exception:
        # BUGFIX: only finalize the span if the caller has not already done so
        # via set_output() — otherwise the span would be updated/ended twice.
        # (Mirrors the _ended guard used by trace_http_request.)
        if span and (ctx is None or not ctx._output_set):
            elapsed = time.time() - start
            span.update(
                output={"status": "error", "duration_ms": round(elapsed * 1000)},
                level="ERROR",
            )
            span.end()
        raise
    else:
        # Success path: finalize only if the caller did not record output.
        if span and not ctx._output_set:
            elapsed = time.time() - start
            span.update(
                output={"status": "success", "duration_ms": round(elapsed * 1000)},
                level="DEFAULT",
            )
            span.end()
    finally:
        if token is not None:
            _current_span.reset(token)


class _TraceContext:
    """Context object for an active Langfuse trace span."""

    def __init__(self, span, start_time: float):
        self._span = span
        self._start_time = start_time
        # True once set_output() has updated and ended the span.
        self._output_set = False

    def set_output(self, output: dict[str, Any]) -> None:
        """Record output and status on the span (also ends the span)."""
        self._output_set = True
        elapsed = time.time() - self._start_time
        output["duration_ms"] = round(elapsed * 1000)

        # Map the result "status" string onto a Langfuse severity level.
        status = output.get("status", "")
        if "error" in status:
            level = "ERROR"
        elif "warning" in status:
            level = "WARNING"
        else:
            level = "DEFAULT"

        self._span.update(output=output, level=level)
        self._span.end()

    def set_response(self, status_code: int) -> None:
        # Tool-level spans don't record HTTP status; present for interface parity.
        pass


@asynccontextmanager
async def trace_http_request(method: str, path: str, params: dict[str, Any] | None = None):
    """Trace an HTTP request as a child span nested under the current tool span.

    Reads the contextvar to find the parent span and creates a child.
    Yields a context object with set_response() for recording the HTTP status code.
    """
    parent_span = _current_span.get()

    # No parent span means no active tool trace (or tracing disabled): no-op.
    if parent_span is None:
        yield _NoOpContext()
        return

    span_input = {"method": method, "path": path}
    if params:
        span_input["params"] = params

    start = time.time()
    span = None
    token = None
    ctx = None  # guards the handlers if span creation itself raises

    try:
        span = parent_span.start_span(name=f"{method} {path}", input=span_input)
        token = _current_span.set(span)

        ctx = _SpanContext(span, start)
        yield ctx
    except Exception:
        if span and ctx is not None and not ctx._ended:
            elapsed = time.time() - start
            span.update(
                output={"status": "error", "duration_ms": round(elapsed * 1000)},
                level="ERROR",
            )
            span.end()
            ctx._ended = True
        raise
    else:
        if span and not ctx._ended:
            elapsed = time.time() - start
            span.update(
                output={"status": "success", "duration_ms": round(elapsed * 1000)},
                level="DEFAULT",
            )
            span.end()
            ctx._ended = True
    finally:
        if token is not None:
            _current_span.reset(token)


class _SpanContext:
    """Context object for an active Langfuse child span."""

    def __init__(self, span, start_time: float):
        self._span = span
        self._start_time = start_time
        # True once the span has been updated and ended (by set_response or
        # the context manager's exception/success handler).
        self._ended = False

    @property
    def _response_set(self) -> bool:
        return self._ended

    def set_output(self, output: dict[str, Any]) -> None:
        # HTTP-level spans record status codes, not tool output; parity stub.
        pass

    def set_response(self, status_code: int) -> None:
        """Record HTTP status code on the span (also ends the span)."""
        if self._ended:
            return
        self._ended = True
        elapsed = time.time() - self._start_time

        # 5xx → error, 4xx → warning, everything else → default severity.
        if status_code >= 500:
            level = "ERROR"
        elif status_code >= 400:
            level = "WARNING"
        else:
            level = "DEFAULT"

        self._span.update(
            output={"status_code": status_code, "duration_ms": round(elapsed * 1000)},
            level=level,
        )
        self._span.end()


def flush() -> None:
    """Flush pending traces to Langfuse."""
    if _langfuse_client is not None:
        try:
            _langfuse_client.flush()
        except Exception as e:
            logger.warning("tracing_flush_failed", error=str(e))


def shutdown() -> None:
    """Shut down the Langfuse client, flushing any pending data."""
    global _langfuse_client, _langfuse_init_attempted
    if _langfuse_client is not None:
        try:
            _langfuse_client.shutdown()
        except Exception as e:
            logger.warning("tracing_shutdown_failed", error=str(e))
        finally:
            # Allow re-initialization after shutdown (e.g. in tests).
            _langfuse_client = None
            _langfuse_init_attempted = False
async def test_clone_session_dry_run(self, client: ACPClient) -> None: mock_response.status_code = 200 mock_response.json.return_value = { "id": "source-1", - "initialPrompt": "original prompt", - "interactive": False, - "timeout": 900, - "llmConfig": {"model": "claude-sonnet-4"}, - "repos": ["https://github.com/org/repo"], + "task": "original prompt", + "model": "claude-sonnet-4", + "repos": [{"url": "https://github.com/org/repo"}], } with patch.object(client, "_get_http_client") as mock_get_client: @@ -478,6 +472,7 @@ async def test_clone_session_dry_run(self, client: ACPClient) -> None: assert result["success"] is True assert "Would clone" in result["message"] assert result["manifest"]["displayName"] == "clone-name" + assert result["manifest"]["task"] == "original prompt" assert result["source_session"] == "source-1" @pytest.mark.asyncio @@ -487,10 +482,8 @@ async def test_clone_session_success(self, client: ACPClient) -> None: source_response.status_code = 200 source_response.json.return_value = { "id": "source-1", - "initialPrompt": "original prompt", - "interactive": False, - "timeout": 900, - "llmConfig": {"model": "claude-sonnet-4"}, + "task": "original prompt", + "model": "claude-sonnet-4", } create_response = MagicMock() @@ -588,8 +581,8 @@ async def test_create_from_template_dry_run(self, client: ACPClient) -> None: assert "bugfix" in result["message"] manifest = result["manifest"] assert manifest["displayName"] == "Fix login bug" - assert manifest["workflow"] == "bugfix" - assert manifest["llmConfig"]["model"] == "claude-sonnet-4" + assert manifest["task"] == "Bugfix: diagnose and fix the reported bug." 
"""End-to-end tests for MCP server exercising all tools with mocked HTTP responses.

This module tests the complete flow from tool call through the client to HTTP requests,
mocking only the HTTP transport layer to verify the full integration.

Test Categories:
- Session Management: CRUD operations for AgenticSessions
- Observability: Logs, transcripts, and metrics retrieval
- Labels: Adding, removing, and filtering by labels
- Bulk Operations: Multi-session operations with confirmation
- Cluster Management: Multi-cluster configuration and auth
- Error Handling: HTTP errors, validation, and edge cases
"""

import json
from unittest.mock import MagicMock, patch

import httpx
import pytest

from mcp_acp.client import ACPClient
from mcp_acp.server import call_tool, list_tools

# ── Fixtures ─────────────────────────────────────────────────────────────────


@pytest.fixture
def mock_clusters_config():
    """Build a mock clusters configuration holding a single test cluster."""
    test_cluster = MagicMock()
    test_cluster.server = "https://api.test.example.com"
    test_cluster.default_project = "test-project"
    test_cluster.description = "Test Cluster"
    test_cluster.token = "test-token-abc123"

    cfg = MagicMock()
    cfg.clusters = {"test-cluster": test_cluster}
    cfg.default_cluster = "test-cluster"
    return cfg


@pytest.fixture
def mock_settings():
    """Build mock settings with no config path."""
    fake_settings = MagicMock()
    fake_settings.config_path = None
    return fake_settings


def make_response(status_code: int = 200, json_data: dict | None = None, text: str = "") -> httpx.Response:
    """Construct a mock httpx Response.

    Args:
        status_code: HTTP status code
        json_data: JSON response body (takes precedence over text)
        text: Plain text response body
    """
    is_json = json_data is not None
    body = json.dumps(json_data).encode() if is_json else text.encode()
    content_type = "application/json" if is_json else "text/plain"

    return httpx.Response(
        status_code=status_code,
        content=body,
        headers={"content-type": content_type},
        request=httpx.Request("GET", "https://api.test.example.com"),
    )


class MockHTTPClient:
    """Mock HTTP client that returns predefined responses based on method and path.

    Supports:
    - Setting responses for specific method/path combinations
    - Recording all HTTP calls for verification
    - Default successful response for unconfigured paths
    """

    def __init__(self):
        self.responses: dict[tuple[str, str], httpx.Response] = {}
        self.calls: list[dict] = []
        self.is_closed = False

    def set_response(self, method: str, path_contains: str, response: httpx.Response):
        """Register a canned response for a specific method/path combination."""
        self.responses[(method.upper(), path_contains)] = response

    async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Record the call and return the best-matching canned response."""
        self.calls.append({"method": method, "url": url, "kwargs": kwargs})

        # Longest configured path fragment wins, so more specific routes match first.
        by_specificity = sorted(self.responses.items(), key=lambda item: len(item[0][1]), reverse=True)
        for (verb, fragment), canned in by_specificity:
            if method.upper() == verb and fragment in url:
                return canned

        # Fall back to a generic success for anything not explicitly configured.
        return make_response(200, {"success": True})

    async def aclose(self):
        """Mark the mock client closed."""
        self.is_closed = True

    def get_calls_for(self, method: str, path_contains: str = "") -> list[dict]:
        """Return all recorded calls matching a method and optional path substring."""
        wanted = method.upper()
        return [c for c in self.calls if c["method"].upper() == wanted and path_contains in c["url"]]

    def assert_called_with(self, method: str, path_contains: str):
        """Fail unless at least one call matched the given method and path."""
        matching = self.get_calls_for(method, path_contains)
        assert matching, f"No {method} call to '{path_contains}' found. Calls: {self.calls}"


@pytest.fixture
def mock_http_client():
    """Provide a fresh mock HTTP client per test."""
    return MockHTTPClient()


@pytest.fixture
def client_with_mock_http(mock_settings, mock_clusters_config, mock_http_client):
    """Provide an ACP client whose HTTP transport is the mock client."""
    with patch("mcp_acp.client.load_settings", return_value=mock_settings):
        with patch("mcp_acp.client.load_clusters_config", return_value=mock_clusters_config):
            acp = ACPClient()
            acp._http_client = mock_http_client
            return acp


# ── Test: List Tools ─────────────────────────────────────────────────────────


class TestListToolsE2E:
    """Verify all expected tools are registered."""

    @pytest.mark.asyncio
    async def test_all_tools_registered(self):
        """All 26 tools should be available."""
        registered = await list_tools()
        assert len(registered) == 26

        expected_tools = {
            # Session management
            "acp_list_sessions",
            "acp_get_session",
            "acp_create_session",
            "acp_create_session_from_template",
            "acp_delete_session",
            "acp_restart_session",
            "acp_clone_session",
            "acp_update_session",
            # Observability
            "acp_get_session_logs",
            "acp_get_session_transcript",
            "acp_get_session_metrics",
            # Labels
            "acp_label_resource",
            "acp_unlabel_resource",
            "acp_list_sessions_by_label",
            "acp_bulk_label_resources",
            "acp_bulk_unlabel_resources",
            # Bulk operations
            "acp_bulk_delete_sessions",
            "acp_bulk_stop_sessions",
            "acp_bulk_restart_sessions",
            "acp_bulk_delete_sessions_by_label",
            "acp_bulk_stop_sessions_by_label",
            "acp_bulk_restart_sessions_by_label",
            # Cluster management
            "acp_list_clusters",
            "acp_whoami",
            "acp_switch_cluster",
            "acp_login",
        }
        assert {t.name for t in registered} == expected_tools


# ── Test: Session Management Tools ───────────────────────────────────────────


class TestSessionManagementE2E:
    """E2E tests for session management tools."""

    @pytest.mark.asyncio
    async def test_list_sessions(self, client_with_mock_http, mock_http_client):
        """Listing sessions goes through the HTTP mock and renders both items."""
        listing = make_response(
            200,
            {
                "items": [
                    {
                        "id": "session-001",
                        "status": "running",
                        "displayName": "Test Session",
                        "createdAt": "2024-01-15T10:30:00Z",
                    },
                    {
                        "id": "session-002",
                        "status": "stopped",
                        "displayName": "Stopped Session",
                        "createdAt": "2024-01-14T08:00:00Z",
                    },
                ]
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions", listing)

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_list_sessions", {"project": "test-project"})

        assert len(out) == 1
        assert "session-001" in out[0].text
        assert "session-002" in out[0].text

    @pytest.mark.asyncio
    async def test_list_sessions_with_filters(self, client_with_mock_http, mock_http_client):
        """A status filter hides non-matching sessions client-side."""
        listing = make_response(
            200,
            {
                "items": [
                    {"id": "running-session", "status": "running", "createdAt": "2024-01-15T10:30:00Z"},
                    {"id": "stopped-session", "status": "stopped", "createdAt": "2024-01-14T08:00:00Z"},
                ]
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions", listing)

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_list_sessions", {"project": "test-project", "status": "running", "limit": 10})

        assert len(out) == 1
        # Only the running session survives the client-side filter.
        assert "running-session" in out[0].text

    @pytest.mark.asyncio
    async def test_get_session(self, client_with_mock_http, mock_http_client):
        """Fetching a single session renders its id and status/name."""
        detail = make_response(
            200,
            {
                "id": "session-001",
                "status": "running",
                "displayName": "My Test Session",
                "model": "claude-sonnet-4",
                "createdAt": "2024-01-15T10:30:00Z",
                "task": "Fix the bug in auth module",
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions/session-001", detail)

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_get_session", {"project": "test-project", "session": "session-001"})

        assert len(out) == 1
        assert "session-001" in out[0].text
        assert "running" in out[0].text.lower() or "My Test Session" in out[0].text

    @pytest.mark.asyncio
    async def test_create_session(self, client_with_mock_http, mock_http_client):
        """Creating a session surfaces the new session id."""
        created = make_response(
            201,
            {
                "id": "new-session-xyz",
                "status": "creating",
                "displayName": "Bug Fix Session",
            },
        )
        mock_http_client.set_response("POST", "/v1/sessions", created)

        args = {
            "project": "test-project",
            "initial_prompt": "Fix the authentication bug",
            "display_name": "Bug Fix Session",
            "model": "claude-sonnet-4",
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_create_session", args)

        assert len(out) == 1
        assert "new-session-xyz" in out[0].text

    @pytest.mark.asyncio
    async def test_create_session_dry_run(self, client_with_mock_http, mock_http_client):
        """Dry-run mode previews the create without issuing any POST."""
        args = {
            "project": "test-project",
            "initial_prompt": "Test prompt",
            "dry_run": True,
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_create_session", args)

        assert len(out) == 1
        assert "dry" in out[0].text.lower() or "would" in out[0].text.lower()
        # No HTTP POST should have been recorded for a dry run.
        assert len([c for c in mock_http_client.calls if "POST" in c["method"]]) == 0

    @pytest.mark.asyncio
    async def test_create_session_from_template(self, client_with_mock_http, mock_http_client):
        """Creating a session from a template returns the new id."""
        mock_http_client.set_response(
            "POST",
            "/v1/sessions",
            make_response(201, {"id": "template-session-abc"}),
        )

        args = {
            "project": "test-project",
            "template": "bugfix",
            "display_name": "Fix auth bug",
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_create_session_from_template", args)

        assert len(out) == 1
        assert "template-session-abc" in out[0].text

    @pytest.mark.asyncio
    async def test_delete_session(self, client_with_mock_http, mock_http_client):
        """Deleting a session reports success."""
        mock_http_client.set_response("DELETE", "/v1/sessions/session-to-delete", make_response(204))

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_delete_session",
                {"project": "test-project", "session": "session-to-delete"},
            )

        assert len(out) == 1
        assert "delete" in out[0].text.lower() or "success" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_delete_session_dry_run(self, client_with_mock_http, mock_http_client):
        """Dry-run delete previews the deletion instead of performing it."""
        mock_http_client.set_response(
            "GET",
            "/v1/sessions/session-001",
            make_response(200, {"id": "session-001", "status": "running", "createdAt": "2024-01-15T10:30:00Z"}),
        )

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_delete_session",
                {"project": "test-project", "session": "session-001", "dry_run": True},
            )

        assert len(out) == 1
        assert "dry" in out[0].text.lower() or "would" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_restart_session(self, client_with_mock_http, mock_http_client):
        """Restarting a stopped session reports success."""
        mock_http_client.set_response(
            "PATCH",
            "/v1/sessions/stopped-session",
            make_response(200, {"id": "stopped-session", "status": "running"}),
        )

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_restart_session",
                {"project": "test-project", "session": "stopped-session"},
            )

        assert len(out) == 1
        assert "restart" in out[0].text.lower() or "success" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_clone_session(self, client_with_mock_http, mock_http_client):
        """Cloning fetches the source config then creates the clone."""
        # The clone flow first GETs the source session's configuration...
        source = make_response(
            200,
            {
                "id": "source-session",
                "task": "Original task",
                "model": "claude-sonnet-4",
                "repos": [{"url": "https://github.com/test/repo"}],
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions/source-session", source)
        # ...then POSTs the new session.
        mock_http_client.set_response("POST", "/v1/sessions", make_response(201, {"id": "cloned-session-xyz"}))

        args = {
            "project": "test-project",
            "source_session": "source-session",
            "new_display_name": "My Clone",
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_clone_session", args)

        assert len(out) == 1
        assert "cloned-session-xyz" in out[0].text

    @pytest.mark.asyncio
    async def test_update_session(self, client_with_mock_http, mock_http_client):
        """Updating session metadata reports the update."""
        mock_http_client.set_response(
            "PATCH",
            "/v1/sessions/session-001",
            make_response(200, {"id": "session-001", "displayName": "New Name"}),
        )

        args = {
            "project": "test-project",
            "session": "session-001",
            "display_name": "New Name",
            "timeout": 3600,
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_update_session", args)

        assert len(out) == 1
        assert "update" in out[0].text.lower()


# ── Test: Observability Tools ────────────────────────────────────────────────
class TestObservabilityE2E:
    """E2E tests for observability tools (logs, transcript, metrics)."""

    @pytest.mark.asyncio
    async def test_get_session_logs(self, client_with_mock_http, mock_http_client):
        """Retrieving logs renders the log text (or at least the session id)."""
        log_content = """2024-01-15 10:30:00 INFO Starting session
2024-01-15 10:30:01 INFO Loading model claude-sonnet-4
2024-01-15 10:30:05 INFO Processing initial prompt
2024-01-15 10:31:00 INFO Tool call: Read file.py"""

        mock_http_client.set_response(
            "GET",
            "/v1/sessions/session-001/logs",
            make_response(200, None, log_content),
        )

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_get_session_logs",
                {"project": "test-project", "session": "session-001", "tail_lines": 100},
            )

        assert len(out) == 1
        assert "Starting session" in out[0].text or "session-001" in out[0].text

    @pytest.mark.asyncio
    async def test_get_session_transcript(self, client_with_mock_http, mock_http_client):
        """Retrieving the transcript renders message content."""
        transcript = make_response(
            200,
            {
                "messages": [
                    {"role": "user", "content": "Fix the login bug"},
                    {"role": "assistant", "content": "I'll analyze the auth module..."},
                ]
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions/session-001/transcript", transcript)

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_get_session_transcript",
                {"project": "test-project", "session": "session-001", "format": "json"},
            )

        assert len(out) == 1
        assert "login bug" in out[0].text or "auth module" in out[0].text

    @pytest.mark.asyncio
    async def test_get_session_metrics(self, client_with_mock_http, mock_http_client):
        """Retrieving metrics renders the token counts."""
        metrics = make_response(
            200,
            {
                "total_tokens": 15000,
                "input_tokens": 5000,
                "output_tokens": 10000,
                "duration_seconds": 300,
                "tool_calls": 25,
                "api_calls": 8,
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions/session-001/metrics", metrics)

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_get_session_metrics",
                {"project": "test-project", "session": "session-001"},
            )

        assert len(out) == 1
        # The token totals (or at least the word "tokens") should be visible.
        rendered = out[0].text
        assert "15000" in rendered or "15,000" in rendered or "tokens" in rendered.lower()


# ── Test: Label Tools ────────────────────────────────────────────────────────


class TestLabelToolsE2E:
    """E2E tests for label management tools."""

    @pytest.mark.asyncio
    async def test_label_resource(self, client_with_mock_http, mock_http_client):
        """Adding labels to a session reports the label operation."""
        mock_http_client.set_response(
            "PATCH",
            "/v1/sessions/session-001",
            make_response(200, {"id": "session-001", "labels": {"env": "prod", "team": "platform"}}),
        )

        args = {
            "project": "test-project",
            "name": "session-001",
            "labels": {"env": "prod", "team": "platform"},
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_label_resource", args)

        assert len(out) == 1
        assert "label" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_unlabel_resource(self, client_with_mock_http, mock_http_client):
        """Removing labels from a session reports the label operation."""
        mock_http_client.set_response(
            "PATCH",
            "/v1/sessions/session-001",
            make_response(200, {"id": "session-001"}),
        )

        args = {
            "project": "test-project",
            "name": "session-001",
            "label_keys": ["env", "team"],
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_unlabel_resource", args)

        assert len(out) == 1
        assert "label" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_list_sessions_by_label(self, client_with_mock_http, mock_http_client):
        """Listing by label renders the matching sessions."""
        listing = make_response(
            200,
            {
                "items": [
                    {"id": "labeled-session-1", "status": "running", "createdAt": "2024-01-15T10:30:00Z"},
                    {"id": "labeled-session-2", "status": "stopped", "createdAt": "2024-01-14T08:00:00Z"},
                ]
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions", listing)

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_list_sessions_by_label",
                {"project": "test-project", "labels": {"env": "prod"}},
            )

        assert len(out) == 1
        assert "labeled-session" in out[0].text


# ── Test: Bulk Operations ────────────────────────────────────────────────────


class TestBulkOperationsE2E:
    """E2E tests for bulk operation tools."""

    @pytest.mark.asyncio
    async def test_bulk_delete_requires_confirm(self, client_with_mock_http):
        """Bulk delete without confirm=true must be refused."""
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_delete_sessions",
                {"project": "test-project", "sessions": ["s1", "s2"]},
            )

        assert "requires confirm=true" in out[0].text

    @pytest.mark.asyncio
    async def test_bulk_delete_with_confirm(self, client_with_mock_http, mock_http_client):
        """Bulk delete proceeds when confirmed."""
        mock_http_client.set_response("DELETE", "/v1/sessions/s1", make_response(204))
        mock_http_client.set_response("DELETE", "/v1/sessions/s2", make_response(204))

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_delete_sessions",
                {"project": "test-project", "sessions": ["s1", "s2"], "confirm": True},
            )

        assert len(out) == 1
        assert "deleted" in out[0].text.lower() or "2" in out[0].text

    @pytest.mark.asyncio
    async def test_bulk_delete_dry_run(self, client_with_mock_http, mock_http_client):
        """Bulk delete dry-run mode does not demand confirmation."""
        mock_http_client.set_response(
            "GET", "/v1/sessions/s1", make_response(200, {"id": "s1", "status": "running", "createdAt": "2024-01-01"})
        )
        mock_http_client.set_response(
            "GET", "/v1/sessions/s2", make_response(200, {"id": "s2", "status": "stopped", "createdAt": "2024-01-01"})
        )

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_delete_sessions",
                {"project": "test-project", "sessions": ["s1", "s2"], "dry_run": True},
            )

        assert len(out) == 1
        # Dry run bypasses the confirm requirement.
        assert "requires confirm" not in out[0].text

    @pytest.mark.asyncio
    async def test_bulk_stop_sessions(self, client_with_mock_http, mock_http_client):
        """Bulk stop patches every listed session."""
        mock_http_client.set_response("PATCH", "/v1/sessions/s1", make_response(200, {"id": "s1", "status": "stopped"}))
        mock_http_client.set_response("PATCH", "/v1/sessions/s2", make_response(200, {"id": "s2", "status": "stopped"}))

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_stop_sessions",
                {"project": "test-project", "sessions": ["s1", "s2"], "confirm": True},
            )

        assert len(out) == 1
        assert "stopped" in out[0].text.lower() or "2" in out[0].text

    @pytest.mark.asyncio
    async def test_bulk_restart_sessions(self, client_with_mock_http, mock_http_client):
        """Bulk restart patches every listed session."""
        mock_http_client.set_response("PATCH", "/v1/sessions/s1", make_response(200, {"id": "s1", "status": "running"}))
        mock_http_client.set_response("PATCH", "/v1/sessions/s2", make_response(200, {"id": "s2", "status": "running"}))

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_restart_sessions",
                {"project": "test-project", "sessions": ["s1", "s2"], "confirm": True},
            )

        assert len(out) == 1
        assert "restart" in out[0].text.lower() or "2" in out[0].text

    @pytest.mark.asyncio
    async def test_bulk_label_resources(self, client_with_mock_http, mock_http_client):
        """Bulk labeling returns a single result block."""
        mock_http_client.set_response("PATCH", "/v1/sessions/s1", make_response(200, {"id": "s1"}))
        mock_http_client.set_response("PATCH", "/v1/sessions/s2", make_response(200, {"id": "s2"}))

        args = {
            "project": "test-project",
            "sessions": ["s1", "s2"],
            "labels": {"env": "test"},
            "confirm": True,
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_bulk_label_resources", args)

        assert len(out) == 1

    @pytest.mark.asyncio
    async def test_bulk_unlabel_resources(self, client_with_mock_http, mock_http_client):
        """Bulk unlabeling returns a single result block."""
        mock_http_client.set_response("PATCH", "/v1/sessions/s1", make_response(200, {"id": "s1"}))
        mock_http_client.set_response("PATCH", "/v1/sessions/s2", make_response(200, {"id": "s2"}))

        args = {
            "project": "test-project",
            "sessions": ["s1", "s2"],
            "label_keys": ["env"],
            "confirm": True,
        }
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_bulk_unlabel_resources", args)

        assert len(out) == 1


# ── Test: Bulk Operations by Label ───────────────────────────────────────────


class TestBulkByLabelE2E:
    """E2E tests for bulk operations filtered by label."""

    @pytest.mark.asyncio
    async def test_bulk_delete_by_label(self, client_with_mock_http, mock_http_client):
        """Bulk delete by label first lists, then deletes each match."""
        listing = make_response(
            200,
            {
                "items": [
                    {"id": "labeled-1", "status": "stopped", "createdAt": "2024-01-01"},
                    {"id": "labeled-2", "status": "stopped", "createdAt": "2024-01-01"},
                ]
            },
        )
        mock_http_client.set_response("GET", "/v1/sessions", listing)
        mock_http_client.set_response("DELETE", "/v1/sessions/labeled-1", make_response(204))
        mock_http_client.set_response("DELETE", "/v1/sessions/labeled-2", make_response(204))

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_delete_sessions_by_label",
                {"project": "test-project", "labels": {"env": "test"}, "confirm": True},
            )

        assert len(out) == 1
        assert "deleted" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_bulk_stop_by_label(self, client_with_mock_http, mock_http_client):
        """Bulk stop by label stops every matching session."""
        mock_http_client.set_response(
            "GET",
            "/v1/sessions",
            make_response(200, {"items": [{"id": "running-1", "status": "running", "createdAt": "2024-01-01"}]}),
        )
        mock_http_client.set_response(
            "PATCH", "/v1/sessions/running-1", make_response(200, {"id": "running-1", "status": "stopped"})
        )

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_stop_sessions_by_label",
                {"project": "test-project", "labels": {"team": "qa"}, "confirm": True},
            )

        assert len(out) == 1
        assert "stopped" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_bulk_restart_by_label(self, client_with_mock_http, mock_http_client):
        """Bulk restart by label restarts every matching session."""
        mock_http_client.set_response(
            "GET",
            "/v1/sessions",
            make_response(200, {"items": [{"id": "stopped-1", "status": "stopped", "createdAt": "2024-01-01"}]}),
        )
        mock_http_client.set_response(
            "PATCH", "/v1/sessions/stopped-1", make_response(200, {"id": "stopped-1", "status": "running"})
        )

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_bulk_restart_sessions_by_label",
                {"project": "test-project", "labels": {"priority": "high"}, "confirm": True},
            )

        assert len(out) == 1
        assert "restart" in out[0].text.lower()


# ── Test: Cluster Management Tools ───────────────────────────────────────────


class TestClusterManagementE2E:
    """E2E tests for cluster management tools."""

    @pytest.mark.asyncio
    async def test_list_clusters(self, client_with_mock_http):
        """Listing clusters shows the configured test cluster."""
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_list_clusters", {})

        assert len(out) == 1
        assert "test-cluster" in out[0].text

    @pytest.mark.asyncio
    async def test_whoami(self, client_with_mock_http):
        """whoami reports the current cluster/auth context."""
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_whoami", {})

        assert len(out) == 1
        rendered = out[0].text.lower()
        assert "cluster" in rendered or "authenticated" in rendered or "test" in rendered

    @pytest.mark.asyncio
    async def test_switch_cluster(self, client_with_mock_http):
        """Switching to another configured cluster succeeds."""
        # Register a second cluster to switch to.
        prod_cluster = MagicMock()
        prod_cluster.server = "https://api.prod.example.com"
        prod_cluster.default_project = "prod-project"
        prod_cluster.token = "prod-token"
        client_with_mock_http.clusters_config.clusters["prod-cluster"] = prod_cluster

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_switch_cluster", {"cluster": "prod-cluster"})

        assert len(out) == 1
        assert "switched" in out[0].text.lower() or "prod" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_switch_cluster_unknown(self, client_with_mock_http):
        """Switching to an unconfigured cluster fails gracefully."""
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_switch_cluster", {"cluster": "nonexistent"})

        assert len(out) == 1
        assert "unknown" in out[0].text.lower() or "not found" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_login(self, client_with_mock_http):
        """Logging in with a token reports success."""
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_login",
                {"cluster": "test-cluster", "token": "new-auth-token"},
            )

        assert len(out) == 1
        assert "authenticated" in out[0].text.lower() or "success" in out[0].text.lower()


# ── Test: Error Handling ─────────────────────────────────────────────────────


class TestErrorHandlingE2E:
    """E2E tests for error handling scenarios."""

    @pytest.mark.asyncio
    async def test_http_error_response(self, client_with_mock_http, mock_http_client):
        """HTTP 404 responses surface as an error message."""
        mock_http_client.set_response(
            "GET",
            "/v1/sessions/nonexistent",
            make_response(404, {"error": "Session not found"}),
        )

        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_get_session",
                {"project": "test-project", "session": "nonexistent"},
            )

        assert len(out) == 1
        assert "error" in out[0].text.lower() or "not found" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_unknown_tool(self, client_with_mock_http):
        """Calling an unregistered tool returns an unknown-tool message."""
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool("acp_unknown_tool", {})

        assert len(out) == 1
        assert "unknown tool" in out[0].text.lower()

    @pytest.mark.asyncio
    async def test_validation_error_invalid_project(self, client_with_mock_http):
        """Invalid project names are rejected with a validation error."""
        with patch("mcp_acp.server.get_client", return_value=client_with_mock_http):
            out = await call_tool(
                "acp_list_sessions",
                {"project": "INVALID_PROJECT_NAME!@#"},
            )

        assert len(out) == 1
        # The response should mention the validation failure.
        assert "error" in out[0].text.lower() or "invalid" in out[0].text.lower()
item limit.""" + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + result = await call_tool( + "acp_bulk_delete_sessions", + { + "project": "test-project", + "sessions": ["s1", "s2", "s3", "s4", "s5"], # More than max 3 + "confirm": True, + }, + ) + + assert len(result) == 1 + # Should fail with limit error + assert "limit" in result[0].text.lower() or "3" in result[0].text + + +# ── Test: Project Auto-fill ────────────────────────────────────────────────── + + +class TestProjectAutoFillE2E: + """E2E tests for automatic project filling from cluster config.""" + + @pytest.mark.asyncio + async def test_project_auto_filled_from_cluster(self, client_with_mock_http, mock_http_client): + """Test that project is auto-filled when not provided.""" + mock_http_client.set_response( + "GET", + "/v1/sessions", + make_response(200, {"items": []}), + ) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + # Call without project - should use default from cluster config + result = await call_tool("acp_list_sessions", {}) + + assert len(result) == 1 + # Verify the request used the default project from cluster config + assert any("test-project" in str(call.get("kwargs", {}).get("headers", {})) for call in mock_http_client.calls) + + +# ── Test: Complete Workflow ────────────────────────────────────────────────── + + +class TestCompleteWorkflowE2E: + """E2E tests simulating complete user workflows.""" + + @pytest.mark.asyncio + async def test_session_lifecycle(self, client_with_mock_http, mock_http_client): + """Test complete session lifecycle: create -> get -> update -> stop -> delete.""" + # 1. 
Create session + mock_http_client.set_response("POST", "/v1/sessions", make_response(201, {"id": "lifecycle-session"})) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + create_result = await call_tool( + "acp_create_session", + {"project": "test-project", "initial_prompt": "Test workflow"}, + ) + assert "lifecycle-session" in create_result[0].text + + # 2. Get session details + mock_http_client.set_response( + "GET", + "/v1/sessions/lifecycle-session", + make_response( + 200, {"id": "lifecycle-session", "status": "running", "displayName": "Test", "createdAt": "2024-01-01"} + ), + ) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + get_result = await call_tool( + "acp_get_session", + {"project": "test-project", "session": "lifecycle-session"}, + ) + assert "lifecycle-session" in get_result[0].text + + # 3. Update session + mock_http_client.set_response( + "PATCH", + "/v1/sessions/lifecycle-session", + make_response(200, {"id": "lifecycle-session", "displayName": "Updated Name"}), + ) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + update_result = await call_tool( + "acp_update_session", + {"project": "test-project", "session": "lifecycle-session", "display_name": "Updated Name"}, + ) + assert "update" in update_result[0].text.lower() + + # 4. 
Delete session + mock_http_client.set_response("DELETE", "/v1/sessions/lifecycle-session", make_response(204)) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + delete_result = await call_tool( + "acp_delete_session", + {"project": "test-project", "session": "lifecycle-session"}, + ) + assert "delete" in delete_result[0].text.lower() or "success" in delete_result[0].text.lower() + + +# ── Test: Additional Edge Cases ────────────────────────────────────────────── + + +class TestAdditionalEdgeCasesE2E: + """Additional edge case tests for comprehensive coverage.""" + + @pytest.mark.asyncio + async def test_create_session_with_repos(self, client_with_mock_http, mock_http_client): + """Test creating a session with repository URLs.""" + mock_http_client.set_response("POST", "/v1/sessions", make_response(201, {"id": "repo-session"})) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + result = await call_tool( + "acp_create_session", + { + "project": "test-project", + "initial_prompt": "Review the code", + "repos": ["https://github.com/org/repo1", "https://github.com/org/repo2"], + }, + ) + + assert len(result) == 1 + assert "repo-session" in result[0].text + mock_http_client.assert_called_with("POST", "/v1/sessions") + + @pytest.mark.asyncio + async def test_invalid_template_name(self, client_with_mock_http): + """Test validation error for invalid template name.""" + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + result = await call_tool( + "acp_create_session_from_template", + { + "project": "test-project", + "template": "nonexistent-template", + "display_name": "Test", + }, + ) + + assert len(result) == 1 + assert "unknown template" in result[0].text.lower() or "error" in result[0].text.lower() + + @pytest.mark.asyncio + async def test_update_session_no_fields(self, client_with_mock_http): + """Test update session fails when no fields provided.""" + with 
patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + result = await call_tool( + "acp_update_session", + {"project": "test-project", "session": "session-001"}, + ) + + assert len(result) == 1 + assert "no fields" in result[0].text.lower() or "error" in result[0].text.lower() + + @pytest.mark.asyncio + async def test_bulk_by_label_no_matches(self, client_with_mock_http, mock_http_client): + """Test bulk operation by label when no sessions match.""" + mock_http_client.set_response("GET", "/v1/sessions", make_response(200, {"items": []})) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + result = await call_tool( + "acp_bulk_delete_sessions_by_label", + {"project": "test-project", "labels": {"env": "nonexistent"}, "confirm": True}, + ) + + assert len(result) == 1 + assert "no sessions" in result[0].text.lower() or "0" in result[0].text + + @pytest.mark.asyncio + async def test_empty_sessions_list(self, client_with_mock_http, mock_http_client): + """Test listing sessions returns empty list gracefully.""" + mock_http_client.set_response("GET", "/v1/sessions", make_response(200, {"items": []})) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + result = await call_tool("acp_list_sessions", {"project": "test-project"}) + + assert len(result) == 1 + assert "0" in result[0].text or "no" in result[0].text.lower() + + @pytest.mark.asyncio + async def test_delete_verifies_http_call(self, client_with_mock_http, mock_http_client): + """Test delete session makes correct HTTP call.""" + mock_http_client.set_response("DELETE", "/v1/sessions/verify-delete", make_response(204)) + + with patch("mcp_acp.server.get_client", return_value=client_with_mock_http): + await call_tool( + "acp_delete_session", + {"project": "test-project", "session": "verify-delete"}, + ) + + mock_http_client.assert_called_with("DELETE", "/v1/sessions/verify-delete") diff --git a/tests/test_tracing.py b/tests/test_tracing.py 
new file mode 100644 index 0000000..725ff88 --- /dev/null +++ b/tests/test_tracing.py @@ -0,0 +1,344 @@ +"""Tests for Langfuse tracing module.""" + +import os +from unittest.mock import MagicMock, patch + +import pytest + +from mcp_acp.tracing import ( + _NoOpContext, + _SpanContext, + _TraceContext, + flush, + get_langfuse, + shutdown, + trace_http_request, + trace_tool_call, +) + + +@pytest.fixture(autouse=True) +def _reset_tracing_state(): + """Reset the tracing module singleton state between tests.""" + import mcp_acp.tracing as tracing_mod + + tracing_mod._langfuse_client = None + tracing_mod._langfuse_init_attempted = False + yield + tracing_mod._langfuse_client = None + tracing_mod._langfuse_init_attempted = False + + +class TestIsTracingEnabled: + """Tests for the kill switch env var.""" + + def test_enabled_by_default(self) -> None: + """Tracing is enabled when env var is not set.""" + with patch.dict(os.environ, {}, clear=False): + os.environ.pop("MCP_ACP_TRACING_ENABLED", None) + from mcp_acp.tracing import _is_tracing_enabled + + assert _is_tracing_enabled() is True + + def test_disabled_when_false(self) -> None: + """Tracing is disabled when env var is 'false'.""" + with patch.dict(os.environ, {"MCP_ACP_TRACING_ENABLED": "false"}): + from mcp_acp.tracing import _is_tracing_enabled + + assert _is_tracing_enabled() is False + + def test_disabled_when_no(self) -> None: + """Tracing is disabled when env var is 'no'.""" + with patch.dict(os.environ, {"MCP_ACP_TRACING_ENABLED": "no"}): + from mcp_acp.tracing import _is_tracing_enabled + + assert _is_tracing_enabled() is False + + def test_enabled_when_true(self) -> None: + """Tracing is enabled when env var is 'true'.""" + with patch.dict(os.environ, {"MCP_ACP_TRACING_ENABLED": "true"}): + from mcp_acp.tracing import _is_tracing_enabled + + assert _is_tracing_enabled() is True + + +class TestGetLangfuse: + """Tests for get_langfuse singleton.""" + + def test_returns_none_when_disabled(self) -> None: + 
"""Returns None when tracing is disabled via env var.""" + with patch.dict(os.environ, {"MCP_ACP_TRACING_ENABLED": "false"}): + assert get_langfuse() is None + + def test_returns_none_when_keys_missing(self) -> None: + """Returns None when Langfuse credentials are not set.""" + env = {"MCP_ACP_TRACING_ENABLED": "true"} + with patch.dict(os.environ, env, clear=False): + os.environ.pop("LANGFUSE_PUBLIC_KEY", None) + os.environ.pop("LANGFUSE_SECRET_KEY", None) + assert get_langfuse() is None + + def test_returns_client_when_configured(self) -> None: + """Returns a Langfuse client when credentials are set.""" + mock_langfuse = MagicMock() + env = { + "MCP_ACP_TRACING_ENABLED": "true", + "LANGFUSE_PUBLIC_KEY": "pk-test", + "LANGFUSE_SECRET_KEY": "sk-test", + "LANGFUSE_BASE_URL": "https://langfuse.test", + } + with ( + patch.dict(os.environ, env, clear=False), + patch("mcp_acp.tracing.Langfuse", return_value=mock_langfuse, create=True) as mock_cls, + ): + # Patch the import inside get_langfuse + with patch.dict("sys.modules", {"langfuse": MagicMock(Langfuse=mock_cls)}): + result = get_langfuse() + assert result is mock_langfuse + + def test_returns_none_on_init_error(self) -> None: + """Returns None when Langfuse SDK raises during init.""" + import mcp_acp.tracing as mod + + # Reset module state + mod._langfuse_init_attempted = False + mod._langfuse_client = None + + env = { + "MCP_ACP_TRACING_ENABLED": "true", + "LANGFUSE_PUBLIC_KEY": "pk-test", + "LANGFUSE_SECRET_KEY": "sk-test", + } + # Create a mock langfuse module that raises on Langfuse() + mock_langfuse_module = MagicMock() + mock_langfuse_module.Langfuse = MagicMock(side_effect=RuntimeError("init failed")) + + with patch.dict(os.environ, env, clear=False): + with patch.dict("sys.modules", {"langfuse": mock_langfuse_module}): + result = mod.get_langfuse() + # Should return None, not raise + assert result is None + + def test_caches_result(self) -> None: + """Only attempts initialization once.""" + with 
patch.dict(os.environ, {"MCP_ACP_TRACING_ENABLED": "false"}): + result1 = get_langfuse() + result2 = get_langfuse() + assert result1 is None + assert result2 is None + + +class TestTraceToolCall: + """Tests for trace_tool_call context manager.""" + + @pytest.mark.asyncio + async def test_yields_noop_when_disabled(self) -> None: + """Yields a no-op context when Langfuse is not available.""" + with patch("mcp_acp.tracing.get_langfuse", return_value=None): + async with trace_tool_call("test_tool", {"key": "value"}) as ctx: + assert isinstance(ctx, _NoOpContext) + # Should not raise + ctx.set_output({"status": "success"}) + ctx.set_response(200) + + @pytest.mark.asyncio + async def test_creates_span_when_enabled(self) -> None: + """Creates a Langfuse span with tool name and args.""" + mock_span = MagicMock() + mock_span.id = "span-123" + mock_lf = MagicMock() + mock_lf.start_span.return_value = mock_span + + with patch("mcp_acp.tracing.get_langfuse", return_value=mock_lf): + async with trace_tool_call("test_tool", {"arg": "val"}, {"project": "test"}) as ctx: + assert isinstance(ctx, _TraceContext) + + # Verify span was created + mock_lf.start_span.assert_called_once() + call_kwargs = mock_lf.start_span.call_args + assert call_kwargs.kwargs["name"] == "test_tool" + assert call_kwargs.kwargs["input"]["tool"] == "test_tool" + assert call_kwargs.kwargs["input"]["arguments"] == {"arg": "val"} + + @pytest.mark.asyncio + async def test_records_explicit_output(self) -> None: + """Records output when set_output is called explicitly.""" + mock_span = MagicMock() + mock_span.id = "span-123" + mock_lf = MagicMock() + mock_lf.start_span.return_value = mock_span + + with patch("mcp_acp.tracing.get_langfuse", return_value=mock_lf): + async with trace_tool_call("test_tool", {}) as ctx: + ctx.set_output({"status": "validation_error", "error": "bad input"}) + + # The explicit set_output should have called span.update and span.end + update_calls = [c for c in 
mock_span.update.call_args_list if "level" in (c.kwargs or {})] + assert len(update_calls) >= 1 + last_call = update_calls[-1] + assert last_call.kwargs["level"] == "ERROR" + mock_span.end.assert_called() + + @pytest.mark.asyncio + async def test_records_success_when_no_explicit_output(self) -> None: + """Auto-records success when context exits without set_output.""" + mock_span = MagicMock() + mock_span.id = "span-123" + mock_lf = MagicMock() + mock_lf.start_span.return_value = mock_span + + with patch("mcp_acp.tracing.get_langfuse", return_value=mock_lf): + async with trace_tool_call("test_tool", {}): + pass # No explicit set_output + + # Should auto-record success + mock_span.update.assert_called_once() + call_kwargs = mock_span.update.call_args + assert call_kwargs.kwargs["output"]["status"] == "success" + assert call_kwargs.kwargs["level"] == "DEFAULT" + mock_span.end.assert_called_once() + + +class TestTraceHttpRequest: + """Tests for trace_http_request context manager.""" + + @pytest.mark.asyncio + async def test_yields_noop_when_no_parent_span(self) -> None: + """Yields a no-op context when there is no parent span in contextvars.""" + async with trace_http_request("GET", "/v1/sessions") as ctx: + assert isinstance(ctx, _NoOpContext) + ctx.set_response(200) + + @pytest.mark.asyncio + async def test_creates_child_span_with_parent(self) -> None: + """Creates a child span when a parent span exists in contextvars.""" + mock_child_span = MagicMock() + mock_child_span.id = "child-456" + mock_parent_span = MagicMock() + mock_parent_span.id = "parent-123" + mock_parent_span.start_span.return_value = mock_child_span + mock_lf = MagicMock() + mock_lf.start_span.return_value = mock_parent_span + + with patch("mcp_acp.tracing.get_langfuse", return_value=mock_lf): + async with trace_tool_call("test_tool", {}) as _trace_ctx: + async with trace_http_request("GET", "/v1/sessions", {"limit": "10"}) as span_ctx: + assert isinstance(span_ctx, _SpanContext) + 
span_ctx.set_response(200) + + # Verify child span was created on parent + mock_parent_span.start_span.assert_called_once() + span_kwargs = mock_parent_span.start_span.call_args.kwargs + assert span_kwargs["name"] == "GET /v1/sessions" + + @pytest.mark.asyncio + async def test_records_status_code(self) -> None: + """Records HTTP status code on the span.""" + mock_child_span = MagicMock() + mock_child_span.id = "child-456" + mock_parent_span = MagicMock() + mock_parent_span.id = "parent-123" + mock_parent_span.start_span.return_value = mock_child_span + mock_lf = MagicMock() + mock_lf.start_span.return_value = mock_parent_span + + with patch("mcp_acp.tracing.get_langfuse", return_value=mock_lf): + async with trace_tool_call("test_tool", {}): + async with trace_http_request("GET", "/v1/sessions") as span_ctx: + span_ctx.set_response(404) + + # Verify span.update was called with WARNING level for 404 + mock_child_span.update.assert_called_once() + update_kwargs = mock_child_span.update.call_args.kwargs + assert update_kwargs["output"]["status_code"] == 404 + assert update_kwargs["level"] == "WARNING" + mock_child_span.end.assert_called() + + @pytest.mark.asyncio + async def test_records_server_error_level(self) -> None: + """Records ERROR level for 5xx status codes.""" + mock_child_span = MagicMock() + mock_child_span.id = "child-456" + mock_parent_span = MagicMock() + mock_parent_span.id = "parent-123" + mock_parent_span.start_span.return_value = mock_child_span + mock_lf = MagicMock() + mock_lf.start_span.return_value = mock_parent_span + + with patch("mcp_acp.tracing.get_langfuse", return_value=mock_lf): + async with trace_tool_call("test_tool", {}): + async with trace_http_request("POST", "/v1/sessions") as span_ctx: + span_ctx.set_response(500) + + update_kwargs = mock_child_span.update.call_args.kwargs + assert update_kwargs["level"] == "ERROR" + + +class TestNoOpContext: + """Tests for _NoOpContext.""" + + def test_set_output_is_noop(self) -> None: + 
"""set_output does nothing.""" + ctx = _NoOpContext() + ctx.set_output({"status": "success"}) + + def test_set_response_is_noop(self) -> None: + """set_response does nothing.""" + ctx = _NoOpContext() + ctx.set_response(200) + + +class TestFlushAndShutdown: + """Tests for flush and shutdown lifecycle functions.""" + + def test_flush_calls_client_flush(self) -> None: + """flush() calls the Langfuse client's flush method.""" + import mcp_acp.tracing as mod + + mock_client = MagicMock() + mod._langfuse_client = mock_client + + flush() + mock_client.flush.assert_called_once() + + def test_flush_noop_when_no_client(self) -> None: + """flush() is a no-op when no client exists.""" + flush() # Should not raise + + def test_flush_handles_error(self) -> None: + """flush() catches exceptions from the client.""" + import mcp_acp.tracing as mod + + mock_client = MagicMock() + mock_client.flush.side_effect = RuntimeError("flush failed") + mod._langfuse_client = mock_client + + flush() # Should not raise + + def test_shutdown_calls_client_shutdown(self) -> None: + """shutdown() calls the Langfuse client's shutdown method.""" + import mcp_acp.tracing as mod + + mock_client = MagicMock() + mod._langfuse_client = mock_client + + shutdown() + mock_client.shutdown.assert_called_once() + assert mod._langfuse_client is None + assert mod._langfuse_init_attempted is False + + def test_shutdown_noop_when_no_client(self) -> None: + """shutdown() is a no-op when no client exists.""" + shutdown() # Should not raise + + def test_shutdown_handles_error(self) -> None: + """shutdown() catches exceptions and still resets state.""" + import mcp_acp.tracing as mod + + mock_client = MagicMock() + mock_client.shutdown.side_effect = RuntimeError("shutdown failed") + mod._langfuse_client = mock_client + + shutdown() # Should not raise + assert mod._langfuse_client is None + assert mod._langfuse_init_attempted is False