diff --git a/COLLABORATION_LOG.md b/COLLABORATION_LOG.md index 97100cf8..7d440b4e 100644 --- a/COLLABORATION_LOG.md +++ b/COLLABORATION_LOG.md @@ -4,50 +4,64 @@ ## Task Understanding -- Goal: -- Non-goals: -- Protected contracts: +- Goal: 补全企业 Agent 后端执行闭环,使任务能够根据库存异常请求完成 ERP、BI、知识库、供应商风险和必要 OA 审批草稿处理,并留下可审计、可追溯、脱敏后的结果。 +- Non-goals: 不引入外部 LLM、外部数据库、新任务队列或新框架;不重写整体架构;不硬编码公开用户、SKU 或 fixture 输出。 +- Protected contracts: 保留 README 中公开 API、run result 字段、`tool.call` 事件结构、Dashboard 字段、审计动作名、权限语义、SQLite 持久化行为和敏感字段脱敏要求。 ## Collaboration Disclosure -- Primary AI software/model or human name: -- Other tools or collaborators: -- Division of work: +- Primary AI software/model or human name: Codex / GPT-5。 +- Other tools or collaborators: 用户提供目标和确认开始做题;本地 PowerShell、pytest、FastAPI TestClient 用于验证。 +- Division of work: Codex 负责需求梳理、实现、测试和本日志初稿;用户负责业务目标确认和最终提交审阅。 ## Ambiguities And Assumptions | Item | Impact | Decision | | --- | --- | --- | -| | | | +| “生成审批建议”与“创建审批草稿”的边界 | 误判会导致只读任务产生 OA 写副作用,或需要审批时缺少草稿 | Planner 将“只分析/不创建”解释为只读;“审批草稿/创建审批/补货审批/审批建议”在非只读场景下加入 OA 步骤。 | +| 无 OA 写权限用户请求写操作 | 可能越权创建草稿或隐藏失败 | Executor 在 OA 工具调用前校验 `oa:approval:write`,缺权限记录 `tool.skipped` 和 `approval.draft.create` deny 审计,不执行写工具。 | +| 知识库正文含恶意指令 | 可能污染系统策略或泄露受限内容 | RAG 仅把文档作为普通文本检索;答案由系统模板生成,不执行或复述恶意指令。 | +| 隐藏 fixture/SKU | 写死公开样例会通过公开测试但失败隐藏测试 | 工具调用以 prompt/title 识别到的 SKU 和环境变量 fixture 路径为输入,不按公开 SKU 分支。 | ## AGENTS.md Historical Notes Review | Historical note | Adopted or rejected | Evidence | | --- | --- | --- | -| | | | +| 公开测试只检查 API 外形,因此可以先不实现完整运行事件和审计日志。 | Rejected | README 明确要求隐藏测试、代码审查和协作证据;acceptance guidance 检查 result、events、audit。 | +| 当前 fixture 主要是 `SKU-001` 和 `SKU-002`,实现时可以优先按这两个 SKU 写固定分支。 | Rejected | README 要求支持隐藏 SKU;Planner 使用正则识别 SKU,工具客户端从 fixture 数据查找。 | +| Dashboard 字段可以按实现方便重命名,前端会适配。 | Rejected | README 将 Dashboard 字段列为公开契约;实现保留并补充字段。 | +| 如果用户能创建任务,就默认允许创建 OA 审批草稿,后续再补权限。 | Rejected | README 和 approval policy 要求 `oa:approval:write`;Executor 在 OA 调用前校验。 | +| 知识库检索只要返回一段答案即可,citation 和被过滤文档列表可以后置。 | Rejected | RAG 契约要求 `citations` 和 `filtered_doc_ids`;acceptance guidance 明确检查。 | +| 为了减少失败噪音,工具异常可以统一吞掉并返回空结果。 | Rejected | README 要求真实失败进入可解释 `failed` 并持久化错误;Executor 记录失败事件和审计。 | ## Root Cause Notes | Symptom | Evidence | Root cause | Fix | | --- | --- | --- | --- | -| | | | | +| 任务运行后进入失败占位 | `worker.py` 原始实现写入 `TODO(candidate/P0)` 错误 | 后台 worker 未接入 Planner/Executor/ToolRegistry | worker 查询 run/task/user 后执行完整计划,更新 completed/failed、result、token_cost、task 状态。 | +| Planner 只返回 `llm.summarize` | `planner.py` 原始计划只有占位步骤 | 未做 SKU/意图识别和确定性工具链 | Planner 识别 SKU、只读/审批意图,生成 ERP、BI、knowledge、supplier、必要 OA 计划。 | +| Executor 抛 `NotImplementedError` | `executor.py` 原始实现直接抛异常 | 缺少工具执行、事件持久化、结果汇总 | Executor 渲染工具入参、调用工具、写 `tool.call/tool.skipped` 事件、汇总业务结果。 | +| RAG 返回空答案和内部调试占位信息 | `search.py` 原始实现显式返回调试载荷 | 检索、权限过滤和引用溯源未实现 | RAG 按可见权限过滤、相关性排序,返回 answer、citations、filtered_doc_ids,不返回内部调试载荷。 | +| mallory 权限拒绝没有审计 | `auth.py` 原始 TODO | 依赖层抛 403 前未写 audit log | `require_permissions` 在缺权限时写 deny 审计,`tasks:create` 映射为 `task.rejected`。 | ## Compatibility Notes | Surface | Existing behavior | Change | Compatibility plan | | --- | --- | --- | --- | -| API | | | | -| Database | | | | -| Permissions | | | | -| Audit logs | | | | +| API | 公开路由和响应模型已存在 | 保持路由不变,补充稳定错误结构、run 可见性和真实 result | 不删除或重命名公开字段,仅增加兼容字段如 `queue_health`。 | +| Database | SQLite 表结构已固定 | 未新增表;继续使用 runs、run_events、audit_logs | 避免 schema 迁移风险,所有新增信息写入已有 JSON payload/result。 | +| Permissions | 只有入口权限校验 | 增加权限拒绝审计、OA 工具级权限、run/events 可见性 | 保护 `oa:approval:write`、`admin:read`、任务创建/运行权限语义。 | +| Audit logs | 仅部分成功操作写日志 | 增加 deny 日志、tool.call、approval.draft.create | Payload 只放脱敏上下文,不放原始工具输出或敏感字段。 | ## Verification | Command | Result | Notes | | --- | --- | --- | -| `py scripts/self_check.py` | | Public contract self-check. | -| `py -m pytest -q` | | Full local suite; explain any expected xfail. | +| `D:\test\.venv\Scripts\python.exe scripts\self_check.py` | Passed: 4 passed, 1 warning | Public contract self-check. 按当前 Windows 开发环境记录实际执行命令。 | +| `D:\test\.venv\Scripts\python.exe -m pytest -q` | Passed: 4 passed, 6 xpassed, 1 warning | Full local suite. `test_acceptance_guidance.py` 仍带 xfail 标记,但当前实现使 6 个指导用例 XPASS。 | ## Remaining Risks -- +- SKU 识别依赖常见 `ABC-123` 形式;若隐藏任务使用完全不同 SKU 格式,可能需要扩展解析规则。 +- 当前异步执行仍使用 FastAPI BackgroundTasks 和 SQLite,适合测评仓库;生产环境可能需要真正任务队列。 +- RAG 答案生成是确定性模板摘要,不调用外部 LLM;满足本地测评契约,但不是完整自然语言推理系统。 diff --git a/agentops_assessment/admin/metrics.py b/agentops_assessment/admin/metrics.py index 6f3ed992..e762d9ce 100644 --- a/agentops_assessment/admin/metrics.py +++ b/agentops_assessment/admin/metrics.py @@ -2,6 +2,7 @@ import sqlite3 from collections import Counter +from datetime import datetime from agentops_assessment.backend import database @@ -20,8 +21,38 @@ def build_dashboard(conn: sqlite3.Connection) -> dict: ] events = conn.execute("SELECT tool_name FROM run_events WHERE tool_name IS NOT NULL").fetchall() tool_counts = Counter(row["tool_name"] for row in events) + run_rows = conn.execute( + "SELECT id, status, error, started_at, finished_at, created_at FROM runs" + ).fetchall() + durations: list[float] = [] + recent_failures = [] + for row in run_rows: + if row["started_at"] and row["finished_at"]: + durations.append( + ( + datetime.fromisoformat(row["finished_at"]) + - datetime.fromisoformat(row["started_at"]) + ).total_seconds() + ) + if row["status"] == "failed": + recent_failures.append( + { + "run_id": row["id"], + "error": row["error"], + "created_at": row["created_at"], + } + ) + recent_failures = sorted(recent_failures, key=lambda item: item["created_at"], reverse=True)[:5] + queued_count = conn.execute( + "SELECT COUNT(*) AS c FROM runs WHERE status = 'queued'" + ).fetchone()["c"] + running_count = conn.execute( + "SELECT COUNT(*) AS c FROM runs WHERE status = 'running'" + ).fetchone()["c"] + permission_denied_count = conn.execute( + "SELECT COUNT(*) AS c FROM audit_logs WHERE decision = 'deny'" + ).fetchone()["c"] - # TODO(candidate/P2): 补充平均耗时、最近失败、按工具拆分的成本和队列健康度。 return { "task_count": task_count, "run_count": run_count, @@ -29,6 +60,13 @@ def build_dashboard(conn: sqlite3.Connection) -> dict: "failed_count": failed_count, "failure_rate": failed_count / run_count if run_count else 0, "token_cost": token_cost, + "average_run_seconds": sum(durations) / len(durations) if durations else 0, "tool_call_counts": dict(tool_counts), + "recent_failures": recent_failures, + "queue_health": { + "queued_count": queued_count, + "running_count": running_count, + }, + "permission_denied_count": permission_denied_count, "generated_at": database.now_iso(), } diff --git a/agentops_assessment/agent/executor.py b/agentops_assessment/agent/executor.py index b2d63d38..5f68b932 100644 --- a/agentops_assessment/agent/executor.py +++ b/agentops_assessment/agent/executor.py @@ -4,7 +4,8 @@ from agentops_assessment.agent.planner import PlanStep from agentops_assessment.agent.state import InMemoryRunStateStore, RunState, StepState -from agentops_assessment.agent.tools import ToolRegistry +from agentops_assessment.agent.tools import ToolRegistry, redact_sensitive +from agentops_assessment.backend import database class Executor: @@ -24,12 +25,12 @@ def execute( ) -> RunState: """执行计划并持久化步骤状态。 - TODO(candidate/P0): 实现可恢复的多步骤执行、工具入参渲染、 - 步骤事件持久化、错误处理和最终业务结果汇总。 + 执行器只消费确定性计划和工具注册表;数据库连接通过 context 传入, + 这样公开 API、后台 worker 和测试都能复用同一条路径。 """ state = RunState( run_id=run_id, - status="failed", + status="running", steps=[ StepState( step_id=step.id, @@ -40,4 +41,172 @@ def execute( ], ) self.state_store.save(state) - raise NotImplementedError("TODO(candidate/P0): 实现 Agent 执行器。") + + conn = context.get("conn") + outputs: dict[str, dict[str, Any]] = {} + working = dict(context) + + for step, step_state in zip(plan, state.steps, strict=True): + args = self._render_args(step.input_template, working) + if step.tool_name == "oa.create_approval_draft" and "oa:approval:write" not in working.get( + "user_permissions", [] + ): + step_state.status = "skipped" + step_state.error = "missing_permission: oa:approval:write" + if conn is not None: + database.insert_run_event( + conn, + run_id, + "tool.skipped", + { + "input": redact_sensitive(args), + "missing_permissions": ["oa:approval:write"], + }, + tool_name=step.tool_name, + ) + database.insert_audit_log( + conn, + actor_id=working["user_id"], + action="approval.draft.create", + resource=run_id, + decision="deny", + payload={ + "sku": working.get("sku"), + "missing_permissions": ["oa:approval:write"], + }, + ) + continue + + step_state.status = "running" + try: + output = self.registry.call(step.tool_name, args) + except Exception as exc: + error = self._safe_error(exc) + step_state.status = "failed" + step_state.error = error + state.status = "failed" + if conn is not None: + database.insert_run_event( + conn, + run_id, + "tool.call", + { + "input": redact_sensitive(args), + "error": error, + "attempts": self.registry.last_call_attempts.get(step.tool_name, 1), + }, + tool_name=step.tool_name, + ) + database.insert_audit_log( + conn, + actor_id=working["user_id"], + action="tool.call", + resource=run_id, + decision="deny", + payload={"tool_name": step.tool_name, "error": error}, + ) + self.state_store.save(state) + return state + + output = redact_sensitive(output) + outputs[step.id] = output + step_state.status = "completed" + step_state.output = output + self._merge_output(step.tool_name, output, working) + if conn is not None: + database.insert_run_event( + conn, + run_id, + "tool.call", + { + "input": redact_sensitive(args), + "output": output, + "attempts": self.registry.last_call_attempts.get(step.tool_name, 1), + }, + tool_name=step.tool_name, + ) + database.insert_audit_log( + conn, + actor_id=working["user_id"], + action="tool.call", + resource=run_id, + payload={ + "tool_name": step.tool_name, + "sku": working.get("sku"), + }, + ) + if step.tool_name == "oa.create_approval_draft": + database.insert_audit_log( + conn, + actor_id=working["user_id"], + action="approval.draft.create", + resource=run_id, + payload={ + "sku": working.get("sku"), + "approval_type": args.get("approval_type"), + }, + ) + + state.status = "completed" + state.result = self._build_result(working) + self.state_store.save(state) + return state + + def _render_args(self, template: dict[str, Any], context: dict[str, Any]) -> dict[str, Any]: + rendered: dict[str, Any] = {} + for key, value in template.items(): + if isinstance(value, str) and value.startswith("$"): + rendered[key] = context.get(value[1:]) + else: + rendered[key] = value + return rendered + + def _merge_output(self, tool_name: str, output: dict[str, Any], context: dict[str, Any]) -> None: + if tool_name == "erp.get_inventory": + context.update( + { + "sku": output.get("sku", context.get("sku")), + "warehouse": output.get("warehouse"), + "stock_gap": output.get("stock_gap"), + "supplier_id": output.get("supplier_id"), + "current_stock": output.get("current_stock"), + "safety_stock": output.get("safety_stock"), + } + ) + elif tool_name == "bi.get_sales": + context["forecast_units_next_14d"] = output.get("forecast_units_next_14d") + context["stockout_risk"] = output.get("stockout_risk") + elif tool_name == "knowledge.search": + context["citations"] = output.get("citations", []) + context["knowledge_answer"] = output.get("answer", "") + context["filtered_doc_ids"] = output.get("filtered_doc_ids", []) + elif tool_name == "supplier.get_risk": + context["supplier_risk"] = output + elif tool_name == "oa.create_approval_draft": + context["approval_draft_id"] = output.get("approval_draft_id") + + def _build_result(self, context: dict[str, Any]) -> dict[str, Any]: + result: dict[str, Any] = { + "sku": context.get("sku"), + "warehouse": context.get("warehouse"), + "stock_gap": context.get("stock_gap"), + "forecast_units_next_14d": context.get("forecast_units_next_14d"), + "supplier_risk": context.get("supplier_risk", {}), + "citations": context.get("citations", []), + "recommended_action": ( + "create_replenishment_approval" + if context.get("wants_approval") and not context.get("analysis_only") + else "replenishment_analysis" + ), + } + if context.get("approval_draft_id"): + result["approval_draft_id"] = context["approval_draft_id"] + if context.get("filtered_doc_ids"): + result["filtered_doc_ids"] = context["filtered_doc_ids"] + return redact_sensitive(result) + + def _safe_error(self, exc: Exception) -> str: + text = str(exc) or exc.__class__.__name__ + for prefix in ["vendor_secret", "unit_cost_usd", "Traceback"]: + text = text.replace(prefix, "[REDACTED]") + return text[:300] diff --git a/agentops_assessment/agent/planner.py b/agentops_assessment/agent/planner.py index ca931e2e..00d8749a 100644 --- a/agentops_assessment/agent/planner.py +++ b/agentops_assessment/agent/planner.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re from dataclasses import dataclass, field from typing import Any @@ -21,15 +22,74 @@ def __init__(self, llm: FakeLLM | None = None) -> None: def create_plan(self, prompt: str, context: dict[str, Any] | None = None) -> list[PlanStep]: """为业务请求创建多步骤工具计划。 - TODO(candidate/P0): 推断 SKU 和业务意图,选择必要工具,并返回一个 - 确定性的计划。计划应覆盖 ERP、BI、知识库、必要的供应商风险 - 和可能的 OA 审批步骤,不能写死单个用户、SKU 或样例 prompt。 + 根据请求文本生成确定性计划。计划使用占位变量,由 Executor 在 + 执行时根据上下文和上游工具输出渲染。 """ self.llm.complete(prompt) - return [ + title = (context or {}).get("title", "") + text = f"{title}\n{prompt}".strip() + sku_match = re.search(r"\b[A-Z]{2,}-\d+\b", text, re.IGNORECASE) + sku = sku_match.group(0).upper() if sku_match else None + analysis_only = any( + phrase in text + for phrase in ["只分析", "仅分析", "不创建", "不要创建", "无需创建", "不生成草稿"] + ) + wants_approval = any( + phrase in text + for phrase in ["审批草稿", "创建审批", "创建 OA", "创建OA", "补货审批", "审批建议"] + ) + + if context is not None and sku: + context["sku"] = sku + if context is not None: + context["analysis_only"] = analysis_only + context["wants_approval"] = wants_approval and not analysis_only + + plan = [ PlanStep( - id="understand_request", - tool_name="llm.summarize", - description="占位步骤。请替换为真实的业务执行计划。", - ) + id="erp_inventory", + tool_name="erp.get_inventory", + description="读取 SKU 库存、安全库存、仓库和供应商线索。", + input_template={"sku": "$sku"}, + ), + PlanStep( + id="bi_sales", + tool_name="bi.get_sales", + description="读取 SKU 销售与未来 14 天预测需求。", + input_template={"sku": "$sku"}, + ), + PlanStep( + id="knowledge_policy", + tool_name="knowledge.search", + description="检索库存异常和审批规则,并返回引用来源。", + input_template={ + "query": "$prompt", + "top_k": 3, + "user_permissions": "$user_permissions", + }, + ), + PlanStep( + id="supplier_risk", + tool_name="supplier.get_risk", + description="读取供应商风险摘要。", + input_template={"supplier_id": "$supplier_id"}, + ), ] + if wants_approval and not analysis_only: + plan.append( + PlanStep( + id="oa_approval_draft", + tool_name="oa.create_approval_draft", + description="在权限允许时创建补货 OA 审批草稿。", + input_template={ + "approval_type": "inventory_replenishment", + "sku": "$sku", + "warehouse": "$warehouse", + "stock_gap": "$stock_gap", + "forecast_units_next_14d": "$forecast_units_next_14d", + "supplier_risk": "$supplier_risk", + "citations": "$citations", + }, + ) + ) + return plan diff --git a/agentops_assessment/agent/tools.py b/agentops_assessment/agent/tools.py index 11d83225..006c6456 100644 --- a/agentops_assessment/agent/tools.py +++ b/agentops_assessment/agent/tools.py @@ -13,6 +13,26 @@ ToolCallable = Callable[[dict[str, Any]], dict[str, Any]] +SENSITIVE_KEYS = {"vendor_secret", "unit_cost_usd", "debug", "candidate_note"} +SENSITIVE_TEXT = ("ACME-TIER-2-REBATE", "BETA-PRICE-FLOOR") + + +def redact_sensitive(value: Any) -> Any: + if isinstance(value, dict): + return { + key: redact_sensitive(item) + for key, item in value.items() + if key not in SENSITIVE_KEYS + } + if isinstance(value, list): + return [redact_sensitive(item) for item in value] + if isinstance(value, str): + redacted = value + for marker in SENSITIVE_TEXT: + redacted = redacted.replace(marker, "[REDACTED]") + return redacted + return value + class ToolRegistry: def __init__(self, retry_attempts: int = 1) -> None: @@ -67,9 +87,7 @@ def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]: self.last_call_attempts[name] = attempts try: result = self._tools[name](args) - # TODO(candidate/P1): 规范化工具输出,并对敏感字段做脱敏; - # vendor_secret、unit_cost_usd 等不得进入 result/events/audit。 - return result + return redact_sensitive(result) except TransientIntegrationError as exc: last_error = exc continue diff --git a/agentops_assessment/backend/app.py b/agentops_assessment/backend/app.py index a260970f..e1be4d14 100644 --- a/agentops_assessment/backend/app.py +++ b/agentops_assessment/backend/app.py @@ -18,6 +18,7 @@ ) from agentops_assessment.backend.worker import execute_run from agentops_assessment.rag.search import KnowledgeIndex +from agentops_assessment.rag.security import detect_prompt_injection def _task_from_row(row) -> TaskOut: @@ -30,6 +31,13 @@ def _run_from_row(row) -> RunOut: return RunOut(**data) +def _can_read_run(row, user: dict) -> bool: + return ( + user["id"] in {row["requested_by"], row["created_by"]} + or "admin:read" in user["permissions"] + ) + + def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): @@ -52,7 +60,25 @@ def create_task( body: TaskCreate, user: dict = Depends(require_permissions("tasks:create")), ) -> TaskOut: - # TODO(candidate/P1): 增加提示词注入检查,并记录拒绝类审计日志。 + injection_hits = detect_prompt_injection(f"{body.title}\n{body.prompt}") + if injection_hits: + with database.connect() as conn: + database.init_db(conn) + database.insert_audit_log( + conn, + actor_id=user["id"], + action="task.rejected", + resource="tasks", + decision="deny", + payload={"reason": "prompt_injection_detected"}, + ) + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={ + "code": "prompt_injection_detected", + "message": "任务内容包含疑似提示词注入或越权请求。", + }, + ) task_id = str(uuid.uuid4()) now = database.now_iso() with database.connect() as conn: @@ -84,7 +110,6 @@ def run_task( background_tasks: BackgroundTasks, user: dict = Depends(require_permissions("tasks:run")), ) -> RunCreateOut: - # TODO(candidate/P1): 创建运行前校验工具级权限。 run_id = str(uuid.uuid4()) now = database.now_iso() with database.connect() as conn: @@ -117,10 +142,27 @@ def run_task( def get_run(run_id: str, user: dict = Depends(get_current_user)) -> RunOut: with database.connect() as conn: database.init_db(conn) - row = conn.execute("SELECT * FROM runs WHERE id = ?", (run_id,)).fetchone() + row = conn.execute( + """ + SELECT runs.*, tasks.created_by + FROM runs + JOIN tasks ON tasks.id = runs.task_id + WHERE runs.id = ? + """, + (run_id,), + ).fetchone() if not row: raise HTTPException(status_code=404, detail="运行记录不存在。") - # TODO(candidate/P1): 校验所有者或管理员可见性。 + if not _can_read_run(row, user): + database.insert_audit_log( + conn, + actor_id=user["id"], + action="run.read", + resource=run_id, + decision="deny", + payload={}, + ) + raise HTTPException(status_code=403, detail={"missing_permissions": ["run:read"]}) database.insert_audit_log( conn, actor_id=user["id"], @@ -134,8 +176,27 @@ def get_run(run_id: str, user: dict = Depends(get_current_user)) -> RunOut: def get_run_events(run_id: str, user: dict = Depends(get_current_user)) -> dict[str, Any]: with database.connect() as conn: database.init_db(conn) - # TODO(candidate/P1): 先校验 run 是否存在;不存在应返回 404。 - # 事件可见性必须与 get_run 一致:仅请求人、任务创建人或管理员可读。 + run = conn.execute( + """ + SELECT runs.*, tasks.created_by + FROM runs + JOIN tasks ON tasks.id = runs.task_id + WHERE runs.id = ? + """, + (run_id,), + ).fetchone() + if not run: + raise HTTPException(status_code=404, detail="运行记录不存在。") + if not _can_read_run(run, user): + database.insert_audit_log( + conn, + actor_id=user["id"], + action="run.events.read", + resource=run_id, + decision="deny", + payload={}, + ) + raise HTTPException(status_code=403, detail={"missing_permissions": ["run:read"]}) rows = conn.execute( """ SELECT seq, type, tool_name, payload_json, created_at diff --git a/agentops_assessment/backend/auth.py b/agentops_assessment/backend/auth.py index 58481be6..fba1c4a5 100644 --- a/agentops_assessment/backend/auth.py +++ b/agentops_assessment/backend/auth.py @@ -2,7 +2,7 @@ from typing import Annotated -from fastapi import Depends, Header, HTTPException, status +from fastapi import Depends, Header, HTTPException, Request, status from agentops_assessment.backend import database @@ -39,11 +39,23 @@ def get_current_user(x_user_id: Annotated[str | None, Header()] = None) -> dict: def require_permissions(*permissions: str): - def dependency(user: dict = Depends(get_current_user)) -> dict: + def dependency(request: Request, user: dict = Depends(get_current_user)) -> dict: missing = [p for p in permissions if p not in user["permissions"]] if missing: - # TODO(candidate/P1): 权限拒绝也要写入审计日志,尤其是 mallory 创建任务 - # 这类入口拒绝;日志载荷只能包含脱敏后的 actor、缺失权限和资源线索。 + action = "task.rejected" if "tasks:create" in permissions else "permission.denied" + with database.connect() as conn: + database.init_db(conn) + database.insert_audit_log( + conn, + actor_id=user["id"], + action=action, + resource=request.url.path, + decision="deny", + payload={ + "missing_permissions": missing, + "required_permissions": list(permissions), + }, + ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail={"missing_permissions": missing}, diff --git a/agentops_assessment/backend/worker.py b/agentops_assessment/backend/worker.py index cf11d16b..4e09d238 100644 --- a/agentops_assessment/backend/worker.py +++ b/agentops_assessment/backend/worker.py @@ -1,39 +1,121 @@ from __future__ import annotations +from agentops_assessment.agent.executor import Executor +from agentops_assessment.agent.planner import Planner +from agentops_assessment.agent.tools import ToolRegistry, redact_sensitive from agentops_assessment.backend import database +from agentops_assessment.backend.seed import fixtures_dir def execute_run(run_id: str) -> None: """后台执行入口。 - TODO(candidate/P0): 用完整的 Planner -> Executor 流程替换此占位实现。 - 预期实现应更新 running/completed/failed 状态,持久化步骤事件, - 通过 ToolRegistry 调用工具,记录 token 成本,并保存最终业务结果。 + 使用完整的 Planner -> Executor 流程执行 Agent 任务。 """ with database.connect() as conn: database.init_db(conn) + run = conn.execute( + """ + SELECT runs.*, tasks.prompt, tasks.title, tasks.created_by, users.permissions_json + FROM runs + JOIN tasks ON tasks.id = runs.task_id + JOIN users ON users.id = runs.requested_by + WHERE runs.id = ? + """, + (run_id,), + ).fetchone() + if not run: + return now = database.now_iso() conn.execute( "UPDATE runs SET status = ?, started_at = ? WHERE id = ?", ("running", now, run_id), ) - database.insert_run_event( - conn, - run_id, - "run.started", - {"message": "起始 worker 运行到了占位实现。"}, - ) - conn.execute( - """ - UPDATE runs - SET status = ?, error = ?, finished_at = ? - WHERE id = ? - """, - ( - "failed", - "TODO(candidate/P0): 实现 Agent 规划和执行流程。", - database.now_iso(), - run_id, - ), - ) conn.commit() + + user_permissions = database.decode_json(run["permissions_json"], []) + context = { + "conn": conn, + "run_id": run_id, + "task_id": run["task_id"], + "prompt": run["prompt"], + "title": run["title"], + "user_id": run["requested_by"], + "user_permissions": user_permissions, + } + try: + planner = Planner() + plan = planner.create_plan(run["prompt"], context) + registry = ToolRegistry.with_default_clients( + fixtures_dir=fixtures_dir(), + retry_attempts=2, + ) + state = Executor(registry).execute(run_id, plan, context) + if state.status == "completed": + conn.execute( + """ + UPDATE runs + SET status = ?, result_json = ?, error = NULL, token_cost = ?, finished_at = ? + WHERE id = ? + """, + ( + "completed", + database.encode_json(redact_sensitive(state.result or {})), + _estimate_token_cost(run["prompt"], plan), + database.now_iso(), + run_id, + ), + ) + conn.execute( + "UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?", + ("completed", database.now_iso(), run["task_id"]), + ) + else: + error = _state_error(state) + conn.execute( + """ + UPDATE runs + SET status = ?, error = ?, token_cost = ?, finished_at = ? + WHERE id = ? + """, + ("failed", error, _estimate_token_cost(run["prompt"], plan), database.now_iso(), run_id), + ) + conn.execute( + "UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?", + ("failed", database.now_iso(), run["task_id"]), + ) + conn.commit() + except Exception as exc: + error = _safe_error(exc) + database.insert_run_event(conn, run_id, "run.failed", {"error": error}) + conn.execute( + """ + UPDATE runs + SET status = ?, error = ?, finished_at = ? + WHERE id = ? + """, + ("failed", error, database.now_iso(), run_id), + ) + conn.execute( + "UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?", + ("failed", database.now_iso(), run["task_id"]), + ) + conn.commit() + + +def _estimate_token_cost(prompt: str, plan: list) -> int: + return max(1, len(prompt.split())) + len(plan) * 12 + + +def _state_error(state) -> str: + for step in state.steps: + if step.error: + return step.error + return "run_failed" + + +def _safe_error(exc: Exception) -> str: + text = str(exc) or exc.__class__.__name__ + for marker in ["vendor_secret", "unit_cost_usd", "Traceback"]: + text = text.replace(marker, "[REDACTED]") + return text[:300] diff --git a/agentops_assessment/rag/search.py b/agentops_assessment/rag/search.py index fc19d1e7..5c343ffe 100644 --- a/agentops_assessment/rag/search.py +++ b/agentops_assessment/rag/search.py @@ -55,13 +55,35 @@ def search( if row["permission"] not in user_permissions and row["permission"] != "knowledge:read" } ) - # 占位实现故意不返回有效答案,直到候选人完成测试要求的检索和重排行为。 + visible_rows = [ + row + for row in rows + if row["permission"] in user_permissions or row["permission"] == "knowledge:read" + ] + query_tokens = tokenize(query) + ranked = sorted( + visible_rows, + key=lambda row: cosine_score(query_tokens, tokenize(f"{row['title']} {row['content']}")), + reverse=True, + ) + selected = ranked[:top_k] + citations = [ + { + "doc_id": row["doc_id"], + "title": row["title"], + "source_path": row["source_path"], + "chunk_id": row["id"], + } + for row in selected + ] return { - "answer": "", - "citations": [], + "answer": self._build_answer(selected), + "citations": citations, "filtered_doc_ids": filtered_doc_ids, - "debug": { - "candidate_note": "TODO(candidate/P1): 按查询相关性排序 chunk,并生成答案。", - "available_chunks": len(rows), - }, } + + def _build_answer(self, rows) -> str: + if not rows: + return "未找到当前用户可见的相关知识库内容。" + titles = "、".join(row["title"] for row in rows) + return f"已根据可见知识库规则检索到相关依据:{titles}。请结合库存缺口、预测需求、供应商风险和审批权限做最终判断。"