46 changes: 30 additions & 16 deletions COLLABORATION_LOG.md
@@ -4,50 +4,64 @@

## Task Understanding

- Goal:
- Non-goals:
- Protected contracts:
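- Goal: Complete the enterprise Agent backend execution loop so that a task triggered by an inventory-anomaly request can carry out ERP, BI, knowledge-base, and supplier-risk processing plus any required OA approval draft, and leave auditable, traceable, redacted results.
- Non-goals: Do not introduce an external LLM, external database, new task queue, or new framework; do not rewrite the overall architecture; do not hard-code public users, SKUs, or fixture outputs.
- Protected contracts: Preserve the public API in the README, the run result fields, the `tool.call` event structure, the Dashboard fields, audit action names, permission semantics, SQLite persistence behavior, and the redaction requirements for sensitive fields.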

## Collaboration Disclosure

- Primary AI software/model or human name:
- Other tools or collaborators:
- Division of work:
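- Primary AI software/model or human name: Codex / GPT-5.
- Other tools or collaborators: The user supplied the goal and confirmed when to start the task; local PowerShell, pytest, and FastAPI TestClient were used for verification.
- Division of work: Codex handled requirements analysis, implementation, testing, and the first draft of this log; the user handled business-goal confirmation and final review before submission.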

## Ambiguities And Assumptions

| Item | Impact | Decision |
| --- | --- | --- |
| | | |
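| Boundary between "generate an approval recommendation" (“生成审批建议”) and "create an approval draft" (“创建审批草稿”) | A misjudgment either causes a read-only task to produce an OA write side effect, or leaves a task that needs approval without a draft | The Planner interprets "analyze only / do not create" (“只分析/不创建”) as read-only; in non-read-only scenarios, "approval draft / create approval / replenishment approval / approval recommendation" (“审批草稿/创建审批/补货审批/审批建议”) adds an OA step. |
| A user without OA write permission requests a write operation | Could create a draft without authorization or hide the failure | The Executor checks `oa:approval:write` before calling the OA tool; when the permission is missing it records a `tool.skipped` event and an `approval.draft.create` deny audit entry, and does not run the write tool. |
| Knowledge-base document body contains malicious instructions | Could contaminate system policy or leak restricted content | RAG retrieves documents as plain text only; answers are generated from a system template, and malicious instructions are neither executed nor repeated. |
| Hidden fixtures/SKUs | Hard-coding public samples would pass the public tests but fail the hidden ones | Tool calls take the SKU recognized from the prompt/title and the fixture path from environment variables as input; there is no branching on public SKUs. |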
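
The read-only versus approval decision in the first row can be sketched as a small keyword classifier. This is a minimal illustration, not the planner from this change: the function name `classify_intent`, the English hint strings, and the `include_oa_step` key are assumptions, while `analysis_only` and `wants_approval` mirror the context keys the executor reads.

```python
# Hypothetical hint lists; the real planner's matching rules are not shown in this diff.
READ_ONLY_HINTS = ("analyze only", "do not create", "只分析", "不创建")
APPROVAL_HINTS = (
    "approval draft", "create approval", "replenishment approval",
    "approval recommendation", "审批草稿", "创建审批", "补货审批", "审批建议",
)


def classify_intent(prompt: str) -> dict[str, bool]:
    """Classify a prompt as read-only and/or approval-seeking by substring match."""
    text = prompt.lower()
    analysis_only = any(hint in text for hint in READ_ONLY_HINTS)
    wants_approval = any(hint in text for hint in APPROVAL_HINTS)
    return {
        "analysis_only": analysis_only,
        "wants_approval": wants_approval,
        # A read-only request wins over an approval phrase, so no OA write is planned.
        "include_oa_step": wants_approval and not analysis_only,
    }
```

Letting the read-only signal take precedence keeps a prompt such as "analyze only, do not create an approval draft" from producing a write side effect even though it mentions an approval draft.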

## AGENTS.md Historical Notes Review

| Historical note | Adopted or rejected | Evidence |
| --- | --- | --- |
| | | |
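| Public tests only check the API shape, so full run events and audit logs can be left unimplemented for now. | Rejected | The README explicitly calls for hidden tests, code review, and collaboration evidence; the acceptance guidance checks result, events, and audit. |
| The current fixtures are mainly `SKU-001` and `SKU-002`, so the implementation can start with fixed branches for these two SKUs. | Rejected | The README requires support for hidden SKUs; the Planner recognizes SKUs with a regular expression, and the tool clients look them up in the fixture data. |
| Dashboard fields can be renamed for implementation convenience; the frontend will adapt. | Rejected | The README lists the Dashboard fields as a public contract; the implementation keeps them and adds fields. |
| If a user can create tasks, allow creating OA approval drafts by default and add permissions later. | Rejected | The README and the approval policy require `oa:approval:write`; the Executor checks it before the OA call. |
| Knowledge-base retrieval only needs to return an answer passage; citations and the filtered-document list can come later. | Rejected | The RAG contract requires `citations` and `filtered_doc_ids`; the acceptance guidance checks them explicitly. |
| To reduce failure noise, tool exceptions can all be swallowed and return an empty result. | Rejected | The README requires real failures to end in an explainable `failed` state with the error persisted; the Executor records failure events and audit entries. |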

## Root Cause Notes

| Symptom | Evidence | Root cause | Fix |
| --- | --- | --- | --- |
| | | | |
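| A run ends in a failure placeholder | The original `worker.py` wrote a `TODO(candidate/P0)` error | The background worker was not wired to the Planner/Executor/ToolRegistry | The worker looks up the run/task/user, executes the full plan, and updates completed/failed, result, token_cost, and the task status. |
| Planner returns only `llm.summarize` | The original plan in `planner.py` had only a placeholder step | No SKU/intent recognition and no deterministic tool chain | The Planner recognizes the SKU and the read-only/approval intent, and generates a plan covering ERP, BI, knowledge, supplier, and OA when needed. |
| Executor raises `NotImplementedError` | The original `executor.py` raised the exception directly | Tool execution, event persistence, and result aggregation were missing | The Executor renders tool arguments, calls tools, writes `tool.call`/`tool.skipped` events, and aggregates the business result. |
| RAG returns an empty answer and internal debug placeholder data | The original `search.py` explicitly returned a debug payload | Retrieval, permission filtering, and citation tracing were not implemented | RAG filters by visible permissions, ranks by relevance, and returns answer, citations, and filtered_doc_ids without the internal debug payload. |
| No audit entry when mallory is denied permission | Original TODO in `auth.py` | The dependency layer raised 403 without first writing an audit log | `require_permissions` writes a deny audit entry when a permission is missing; `tasks:create` maps to `task.rejected`. |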
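
The last row, writing the deny audit entry before the request is rejected, can be illustrated without FastAPI. The sketch below is an assumption-laden stand-in: `AuditSink`, `check_permissions`, and the fallback action name `permission.denied` are hypothetical; only the `tasks:create` to `task.rejected` mapping comes from the log above.

```python
from dataclasses import dataclass, field


@dataclass
class AuditSink:
    """Hypothetical in-memory stand-in for the audit_logs table."""
    entries: list[dict] = field(default_factory=list)

    def write(self, actor_id: str, action: str, decision: str, payload: dict) -> None:
        self.entries.append(
            {"actor_id": actor_id, "action": action, "decision": decision, "payload": payload}
        )


# Mapping from a missing permission to the audit action name (only this pair is from the log).
DENY_ACTIONS = {"tasks:create": "task.rejected"}


def check_permissions(user_id: str, granted: set[str], required: set[str], audit: AuditSink) -> None:
    """Record a deny audit entry first, then raise, so the rejection is never silent."""
    missing = sorted(required - granted)
    if not missing:
        return
    action = DENY_ACTIONS.get(missing[0], "permission.denied")  # fallback name is assumed
    audit.write(user_id, action, "deny", {"missing_permissions": missing})
    raise PermissionError(f"missing permissions: {', '.join(missing)}")
```

The ordering matters: if the exception were raised first, the audit write would never run and the denial would leave no trace.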

## Compatibility Notes

| Surface | Existing behavior | Change | Compatibility plan |
| --- | --- | --- | --- |
| API | | | |
| Database | | | |
| Permissions | | | |
| Audit logs | | | |
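| API | Public routes and response models already exist | Routes unchanged; added a stable error structure, run visibility, and real results | No public field is removed or renamed; only compatible fields such as `queue_health` are added. |
| Database | SQLite table structure is fixed | No new tables; continues to use runs, run_events, audit_logs | Avoids schema-migration risk; all new information is written to the existing JSON payload/result. |
| Permissions | Permission checks only at the entry point | Added permission-denial auditing, tool-level OA permission, and run/events visibility | Protects the semantics of `oa:approval:write`, `admin:read`, and the task create/run permissions. |
| Audit logs | Only some successful operations were logged | Added deny logs, tool.call, approval.draft.create | Payloads contain only redacted context, never raw tool output or sensitive fields. |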
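
The "redacted context only" rule for audit payloads can be sketched as a recursive key-based filter. This is not the project's `redact_sensitive` helper, whose implementation is not part of this diff; the function name `redact` and the choice of `SENSITIVE_KEYS` (taken from the field names that appear in the executor's error scrubbing) are assumptions.

```python
from typing import Any

# Assumed set of sensitive field names.
SENSITIVE_KEYS = {"vendor_secret", "unit_cost_usd"}


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive dict fields masked at any nesting depth."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if key in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value  # scalars pass through unchanged
```

Because the function builds new containers, the original tool output is left unmodified and can still be used in memory while only the redacted copy is persisted.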

## Verification

| Command | Result | Notes |
| --- | --- | --- |
| `py scripts/self_check.py` | | Public contract self-check. |
| `py -m pytest -q` | | Full local suite; explain any expected xfail. |
| `D:\test\.venv\Scripts\python.exe scripts\self_check.py` | Passed: 4 passed, 1 warning | Public contract self-check. Records the command actually run in the current Windows development environment. |
| `D:\test\.venv\Scripts\python.exe -m pytest -q` | Passed: 4 passed, 6 xpassed, 1 warning | Full local suite. `test_acceptance_guidance.py` still carries xfail markers, but the current implementation makes the 6 guidance cases XPASS. |

## Remaining Risks

-
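- SKU recognition relies on the common `ABC-123` form; if hidden tasks use a completely different SKU format, the parsing rules may need to be extended.
- Asynchronous execution still uses FastAPI BackgroundTasks and SQLite, which suits an assessment repository; a production environment would likely need a real task queue.
- RAG answer generation is a deterministic template summary and does not call an external LLM; it satisfies the local assessment contract but is not a full natural-language reasoning system.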
40 changes: 39 additions & 1 deletion agentops_assessment/admin/metrics.py
@@ -2,6 +2,7 @@

import sqlite3
from collections import Counter
from datetime import datetime

from agentops_assessment.backend import database

@@ -20,15 +21,52 @@ def build_dashboard(conn: sqlite3.Connection) -> dict:
    ]
    events = conn.execute("SELECT tool_name FROM run_events WHERE tool_name IS NOT NULL").fetchall()
    tool_counts = Counter(row["tool_name"] for row in events)
    run_rows = conn.execute(
        "SELECT id, status, error, started_at, finished_at, created_at FROM runs"
    ).fetchall()
    durations: list[float] = []
    recent_failures = []
    for row in run_rows:
        if row["started_at"] and row["finished_at"]:
            durations.append(
                (
                    datetime.fromisoformat(row["finished_at"])
                    - datetime.fromisoformat(row["started_at"])
                ).total_seconds()
            )
        if row["status"] == "failed":
            recent_failures.append(
                {
                    "run_id": row["id"],
                    "error": row["error"],
                    "created_at": row["created_at"],
                }
            )
    recent_failures = sorted(recent_failures, key=lambda item: item["created_at"], reverse=True)[:5]
    queued_count = conn.execute(
        "SELECT COUNT(*) AS c FROM runs WHERE status = 'queued'"
    ).fetchone()["c"]
    running_count = conn.execute(
        "SELECT COUNT(*) AS c FROM runs WHERE status = 'running'"
    ).fetchone()["c"]
    permission_denied_count = conn.execute(
        "SELECT COUNT(*) AS c FROM audit_logs WHERE decision = 'deny'"
    ).fetchone()["c"]

    # TODO(candidate/P2): Add average duration, recent failures, per-tool cost breakdown, and queue health.
    return {
        "task_count": task_count,
        "run_count": run_count,
        "completed_count": completed_count,
        "failed_count": failed_count,
        "failure_rate": failed_count / run_count if run_count else 0,
        "token_cost": token_cost,
        "average_run_seconds": sum(durations) / len(durations) if durations else 0,
        "tool_call_counts": dict(tool_counts),
        "recent_failures": recent_failures,
        "queue_health": {
            "queued_count": queued_count,
            "running_count": running_count,
        },
        "permission_denied_count": permission_denied_count,
        "generated_at": database.now_iso(),
    }
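
The `average_run_seconds` calculation above can be exercised in isolation. The following is a minimal sketch, assuming rows are plain dicts with ISO 8601 `started_at`/`finished_at` strings; the standalone function and the sample rows are illustrative and not part of the change.

```python
from datetime import datetime


def average_run_seconds(rows: list[dict]) -> float:
    """Average wall-clock duration of runs that have both timestamps; 0 if none do."""
    durations = [
        (
            datetime.fromisoformat(row["finished_at"])
            - datetime.fromisoformat(row["started_at"])
        ).total_seconds()
        for row in rows
        if row.get("started_at") and row.get("finished_at")
    ]
    return sum(durations) / len(durations) if durations else 0


sample_rows = [
    {"started_at": "2024-01-01T00:00:00+00:00", "finished_at": "2024-01-01T00:00:10+00:00"},
    {"started_at": "2024-01-01T00:00:00+00:00", "finished_at": "2024-01-01T00:00:20+00:00"},
    {"started_at": "2024-01-01T00:00:00+00:00", "finished_at": None},  # still running, skipped
]
print(average_run_seconds(sample_rows))
```

Two caveats apply to any code in this style: subtracting a timezone-aware timestamp from a naive one raises `TypeError`, and `datetime.fromisoformat` only accepts a trailing `Z` from Python 3.11 onward, so stored timestamps should use one consistent format.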
179 changes: 174 additions & 5 deletions agentops_assessment/agent/executor.py
@@ -4,7 +4,8 @@

from agentops_assessment.agent.planner import PlanStep
from agentops_assessment.agent.state import InMemoryRunStateStore, RunState, StepState
from agentops_assessment.agent.tools import ToolRegistry
from agentops_assessment.agent.tools import ToolRegistry, redact_sensitive
from agentops_assessment.backend import database


class Executor:
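@@ -24,12 +25,12 @@ def execute(
    ) -> RunState:
        """Execute the plan and persist step state.

        TODO(candidate/P0): Implement resumable multi-step execution, tool-argument rendering,
        step event persistence, error handling, and final business-result aggregation.
        The executor consumes only the deterministic plan and the tool registry; the database
        connection is passed in via context, so the public API, the background worker, and tests
        can all reuse the same path.
        """
        state = RunState(
            run_id=run_id,
            status="failed",
            status="running",
            steps=[
                StepState(
                    step_id=step.id,
@@ -40,4 +41,172 @@ def execute(
            ],
        )
        self.state_store.save(state)
        raise NotImplementedError("TODO(candidate/P0): Implement the Agent executor.")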

        conn = context.get("conn")
        outputs: dict[str, dict[str, Any]] = {}
        working = dict(context)

        for step, step_state in zip(plan, state.steps, strict=True):
            args = self._render_args(step.input_template, working)
            # Gate the OA write tool on the caller's permissions before any side effect.
            if step.tool_name == "oa.create_approval_draft" and "oa:approval:write" not in working.get(
                "user_permissions", []
            ):
                step_state.status = "skipped"
                step_state.error = "missing_permission: oa:approval:write"
                if conn is not None:
                    database.insert_run_event(
                        conn,
                        run_id,
                        "tool.skipped",
                        {
                            "input": redact_sensitive(args),
                            "missing_permissions": ["oa:approval:write"],
                        },
                        tool_name=step.tool_name,
                    )
                    database.insert_audit_log(
                        conn,
                        actor_id=working["user_id"],
                        action="approval.draft.create",
                        resource=run_id,
                        decision="deny",
                        payload={
                            "sku": working.get("sku"),
                            "missing_permissions": ["oa:approval:write"],
                        },
                    )
                continue

            step_state.status = "running"
            try:
                output = self.registry.call(step.tool_name, args)
            except Exception as exc:
                # Stop at the first tool failure and persist a scrubbed error message.
                error = self._safe_error(exc)
                step_state.status = "failed"
                step_state.error = error
                state.status = "failed"
                if conn is not None:
                    database.insert_run_event(
                        conn,
                        run_id,
                        "tool.call",
                        {
                            "input": redact_sensitive(args),
                            "error": error,
                            "attempts": self.registry.last_call_attempts.get(step.tool_name, 1),
                        },
                        tool_name=step.tool_name,
                    )
                    database.insert_audit_log(
                        conn,
                        actor_id=working["user_id"],
                        action="tool.call",
                        resource=run_id,
                        decision="deny",
                        payload={"tool_name": step.tool_name, "error": error},
                    )
                self.state_store.save(state)
                return state

            # Redact before the output is stored or merged into the working context.
            output = redact_sensitive(output)
            outputs[step.id] = output
            step_state.status = "completed"
            step_state.output = output
            self._merge_output(step.tool_name, output, working)
            if conn is not None:
                database.insert_run_event(
                    conn,
                    run_id,
                    "tool.call",
                    {
                        "input": redact_sensitive(args),
                        "output": output,
                        "attempts": self.registry.last_call_attempts.get(step.tool_name, 1),
                    },
                    tool_name=step.tool_name,
                )
                database.insert_audit_log(
                    conn,
                    actor_id=working["user_id"],
                    action="tool.call",
                    resource=run_id,
                    payload={
                        "tool_name": step.tool_name,
                        "sku": working.get("sku"),
                    },
                )
                if step.tool_name == "oa.create_approval_draft":
                    database.insert_audit_log(
                        conn,
                        actor_id=working["user_id"],
                        action="approval.draft.create",
                        resource=run_id,
                        payload={
                            "sku": working.get("sku"),
                            "approval_type": args.get("approval_type"),
                        },
                    )

        state.status = "completed"
        state.result = self._build_result(working)
        self.state_store.save(state)
        return state

    def _render_args(self, template: dict[str, Any], context: dict[str, Any]) -> dict[str, Any]:
        # A string value starting with "$" names a context key; anything else is a literal.
        rendered: dict[str, Any] = {}
        for key, value in template.items():
            if isinstance(value, str) and value.startswith("$"):
                rendered[key] = context.get(value[1:])
            else:
                rendered[key] = value
        return rendered

    def _merge_output(self, tool_name: str, output: dict[str, Any], context: dict[str, Any]) -> None:
        # Copy the fields that later steps and the final result need into the shared context.
        if tool_name == "erp.get_inventory":
            context.update(
                {
                    "sku": output.get("sku", context.get("sku")),
                    "warehouse": output.get("warehouse"),
                    "stock_gap": output.get("stock_gap"),
                    "supplier_id": output.get("supplier_id"),
                    "current_stock": output.get("current_stock"),
                    "safety_stock": output.get("safety_stock"),
                }
            )
        elif tool_name == "bi.get_sales":
            context["forecast_units_next_14d"] = output.get("forecast_units_next_14d")
            context["stockout_risk"] = output.get("stockout_risk")
        elif tool_name == "knowledge.search":
            context["citations"] = output.get("citations", [])
            context["knowledge_answer"] = output.get("answer", "")
            context["filtered_doc_ids"] = output.get("filtered_doc_ids", [])
        elif tool_name == "supplier.get_risk":
            context["supplier_risk"] = output
        elif tool_name == "oa.create_approval_draft":
            context["approval_draft_id"] = output.get("approval_draft_id")

    def _build_result(self, context: dict[str, Any]) -> dict[str, Any]:
        result: dict[str, Any] = {
            "sku": context.get("sku"),
            "warehouse": context.get("warehouse"),
            "stock_gap": context.get("stock_gap"),
            "forecast_units_next_14d": context.get("forecast_units_next_14d"),
            "supplier_risk": context.get("supplier_risk", {}),
            "citations": context.get("citations", []),
            "recommended_action": (
                "create_replenishment_approval"
                if context.get("wants_approval") and not context.get("analysis_only")
                else "replenishment_analysis"
            ),
        }
        if context.get("approval_draft_id"):
            result["approval_draft_id"] = context["approval_draft_id"]
        if context.get("filtered_doc_ids"):
            result["filtered_doc_ids"] = context["filtered_doc_ids"]
        return redact_sensitive(result)

    def _safe_error(self, exc: Exception) -> str:
        # Mask known sensitive markers and cap the length before the message is persisted.
        text = str(exc) or exc.__class__.__name__
        for prefix in ["vendor_secret", "unit_cost_usd", "Traceback"]:
            text = text.replace(prefix, "[REDACTED]")
        return text[:300]
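
The `$`-placeholder convention used by `_render_args` can be tried outside the class. The sketch below restates the same rule as a free function; the name `render_args` and the sample template are illustrative assumptions, not tool schemas from this repository.

```python
from typing import Any


def render_args(template: dict[str, Any], context: dict[str, Any]) -> dict[str, Any]:
    """Resolve "$name" string values against context; pass every other value through."""
    return {
        key: context.get(value[1:]) if isinstance(value, str) and value.startswith("$") else value
        for key, value in template.items()
    }


# Hypothetical template for an inventory lookup step.
template = {"sku": "$sku", "days": 14, "warehouse": "$warehouse"}
print(render_args(template, {"sku": "ABC-123"}))
```

A placeholder whose key is absent from the context resolves to `None` rather than raising, so a tool that requires the argument has to validate it itself.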