50 changes: 34 additions & 16 deletions COLLABORATION_LOG.md
@@ -4,50 +4,68 @@

## Task Understanding

- Goal:
- Non-goals:
- Protected contracts:
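- Goal: Complete every TODO(candidate) marker in the enterprise Agent backend, implementing the full task-execution loop (Planner → Executor → Worker), permission security (RBAC + auditing), RAG retrieval (permission-aware, with citation tracing), and admin dashboard metrics.
- Non-goals: No new frameworks, new databases, or large refactors; no changes to the public API contract; no unrelated code formatting or dependency upgrades.
- Protected contracts: All public API field names, event names, and audit action names stay unchanged; the database schema is not modified; sensitive fields (vendor_secret, unit_cost_usd, debug, candidate_note) are never leaked into API responses, events, or audit logs.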

## Collaboration Disclosure

- Primary AI software/model or human name:
- Other tools or collaborators:
- Division of work:
- Primary AI software/model or human name: Claude Code / 林博智
- Other tools or collaborators:
- Division of work: Claude Code handled all code implementation, test execution, and the writing of this collaboration log.

## Ambiguities And Assumptions

| Item | Impact | Decision |
| --- | --- | --- |
| | | |
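| Handling OA write operations when permission is insufficient | Could produce an unexpected approval_draft_id or a misleading success event | Check permissions first in the Worker: if the plan includes `oa.create_approval_draft` but the user lacks `oa:approval:write`, remove that step from the plan and write an audit log entry (decision=deny). |
| Source of supplier_id in template rendering | The `supplier_id` for `supplier.get_risk` comes from the `erp.get_inventory` output | Use a `{{supplier_id}}` placeholder; the Executor injects `supplier_id` into the context when the ERP step completes. |
| Whether "generate a replenishment approval recommendation" ("生成补货审批建议") means "create a draft" | Potentially ambiguous: some users may only want a text recommendation | Decide deterministically by keyword via `_is_analysis_only` and `_needs_approval_draft`: an OA step is generated only when the prompt contains "生成补货审批建议" and does not contain "只分析" ("analysis only"). |
| `xauth` ACL handling | Handling of the `xauth` field in `test_acceptance_guidance` | Out of scope for the current implementation; formal grading may cover it. |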
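
The keyword-based decision in the third row can be sketched as follows. This is a minimal illustration, not the PR's code: the bodies of `_is_analysis_only` and `_needs_approval_draft` are not shown in this diff, so the function names without the leading underscore and the constant names below are assumptions.

```python
# Keywords quoted in the table above; the constant names are hypothetical.
ANALYSIS_ONLY_KEYWORD = "只分析"          # "analysis only"
APPROVAL_KEYWORD = "生成补货审批建议"      # "generate a replenishment approval recommendation"


def is_analysis_only(prompt: str) -> bool:
    """True when the prompt explicitly asks for analysis only."""
    return ANALYSIS_ONLY_KEYWORD in prompt


def needs_approval_draft(prompt: str) -> bool:
    """True only when the approval keyword is present and analysis-only is not."""
    return APPROVAL_KEYWORD in prompt and not is_analysis_only(prompt)


if __name__ == "__main__":
    for prompt in (
        "请为 SKU-001 生成补货审批建议",
        "只分析 SKU-001,不要生成补货审批建议",
        "查询 SKU-001 的库存",
    ):
        print(prompt, "->", needs_approval_draft(prompt))
```

Because the check is plain substring matching, a prompt that expresses the same intent with different wording falls through to the analysis-only path, which matches the conservative choice recorded in the table.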

## AGENTS.md Historical Notes Review

| Historical note | Adopted or rejected | Evidence |
| --- | --- | --- |
| | | |
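| Public tests only check the API shape, so full run events and audit logs can be left unimplemented for now. | Rejected | Full event persistence and audit logging were implemented up front, because the README explicitly requires step events and cost records at P0. |
| The current fixtures are mainly `SKU-001` and `SKU-002`, so the implementation can start with hard-coded branches for these two SKUs. | Rejected | The Planner extracts the SKU dynamically with `re.search(r"SKU-[A-Za-z0-9]+", prompt)`, so it supports arbitrary hidden SKUs. |
| Dashboard fields can be renamed for implementation convenience; the frontend will adapt. | Rejected | The README explicitly lists the field names that must stay stable (average_run_seconds, recent_failures, etc.); the implementation strictly follows the public contract. |
| If a user can create a task, allow creating OA approval drafts by default and add permissions later. | Rejected | The Worker enforces a strict permission pre-check: when `oa:approval:write` is missing, the OA step is removed and a deny audit entry is recorded. |
| Knowledge-base retrieval only needs to return an answer passage; citations and the filtered-document list can come later. | Rejected | Implemented full cosine-similarity ranking, a citation list (with doc_id/title/source_path/chunk_id), filtered_doc_ids, and answer generation based on visible chunks. |
| To reduce failure noise, tool exceptions can all be swallowed and an empty result returned. | Rejected | On a single-step failure the Executor records a detailed error event and marks the run as failed, keeping the partial results of completed steps. The Worker also catches top-level exceptions and writes them to run.error. |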
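
The dynamic SKU extraction cited in the second row can be tried in isolation. The regular expression is the one quoted in the table; the wrapper function `extract_sku` is a hypothetical name used only for this sketch.

```python
import re
from typing import Optional

# Pattern quoted in the table above.
SKU_PATTERN = re.compile(r"SKU-[A-Za-z0-9]+")


def extract_sku(prompt: str) -> Optional[str]:
    """Return the first SKU token in the prompt, or None if there is none."""
    match = SKU_PATTERN.search(prompt)
    return match.group(0) if match else None


if __name__ == "__main__":
    print(extract_sku("请分析 SKU-9Z7 的库存并生成补货审批建议"))
    print(extract_sku("请分析库存"))
```

Two limits follow directly from the pattern: it is case-sensitive, so `sku-001` is not matched, and it stops at the first character outside `[A-Za-z0-9]`, so `SKU-001-A` yields `SKU-001`.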

## Root Cause Notes

| Symptom | Evidence | Root cause | Fix |
| --- | --- | --- | --- |
| | | | |
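| The starting repository's `execute_run` always returns failed | worker.py:34 hard-codes `status = "failed"` | The Planner and Executor were not implemented for P0 | Implemented the Planner (SKU extraction + intent detection + tool-chain generation) and the Executor (multi-step execution + template rendering + event persistence + result aggregation) |
| RAG returns debug/candidate_note fields | search.py:63-66 returns a placeholder | The P1 retrieval implementation was incomplete | Rewrote search() to implement permission filtering, cosine ranking, citation generation, and answer synthesis, removing all debug fields |
| Permission denials leave no audit record | auth.py:45 only raises 403 and writes no log | The P1 audit trail was missing | In require_permissions, write to audit_logs (decision=deny) once a missing permission is detected |
| Run events and run details have no visibility control | app.py:123/137 do not check visibility | P1 owner/admin visibility was not implemented | Added `_check_run_visibility` to check the requester against the task creator and admin identities |
| Dashboard lacks average duration and recent failures | metrics.py:24 TODO not implemented | The P2 metrics were not added | Implemented `_avg_run_seconds` (datetime difference), `_recent_failures` (the 5 most recent failed runs), and `_queue_health` |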
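
The permission pre-check with deny auditing described in this log can be sketched as below. The Worker code is not part of this diff, so this is an illustration under assumptions: the function name, the plan and audit-entry shapes, and the in-memory `audit_log` list are hypothetical. Only the tool name, the permission name, the `approval.draft.create` action, and `decision=deny` come from the log.

```python
from typing import Any

# Tool -> required permission, as named in the log.
REQUIRED_PERMISSIONS = {"oa.create_approval_draft": "oa:approval:write"}


def prune_unauthorized_steps(
    plan: list[dict[str, Any]],
    permissions: set[str],
    audit_log: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Drop steps the user may not run and record one deny entry per dropped step."""
    kept = []
    for step in plan:
        required = REQUIRED_PERMISSIONS.get(step["tool_name"])
        if required is not None and required not in permissions:
            # Hypothetical audit entry shape; a real system would write to audit_logs.
            audit_log.append(
                {
                    "action": "approval.draft.create",
                    "decision": "deny",
                    "missing_permission": required,
                }
            )
            continue
        kept.append(step)
    return kept


if __name__ == "__main__":
    plan = [{"tool_name": "erp.get_inventory"}, {"tool_name": "oa.create_approval_draft"}]
    audit_log: list[dict[str, Any]] = []
    print(prune_unauthorized_steps(plan, {"erp:read"}, audit_log))
    print(audit_log)
```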

## Compatibility Notes

| Surface | Existing behavior | Change | Compatibility plan |
| --- | --- | --- | --- |
| API | | | |
| Database | | | |
| Permissions | | | |
| Audit logs | | | |
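| API | `POST /api/tasks` accepts every prompt unconditionally | Added a `detect_prompt_injection` check that returns 400 + `prompt_injection_detected` on a match | New error code `prompt_injection_detected`; the normal request path is unaffected |
| API | `GET /api/runs/{run_id}` has no visibility control | Added `_check_run_visibility`; returns 403 without permission | Backward compatible: alice (admin) and task creators are unaffected |
| API | `GET /api/runs/{run_id}/events` has no 404 check | Check run existence first → 404; then check visibility → 403 | Existing paths are unaffected |
| Database | No schema change | No new tables or columns | Fully compatible |
| Permissions | Permission denial only raises 403, with no audit | Added `task.rejected` + `decision=deny` audit log entries | Audit logs only gain entries, so existing log consumers are unaffected |
| Audit logs | Historical alias `oa.approval.create` | Use `approval.draft.create` consistently (on deny) | The README explicitly treats `approval.draft.create` as authoritative, so the formal contract is adopted |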
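
The check order for `GET /api/runs/{run_id}/events` (existence first, then visibility) can be illustrated with a small sketch. `_check_run_visibility` itself is not shown in this diff, so everything below is an assumption for illustration: the `ApiError` class, the `User` dataclass, the `created_by` field, and the error code strings are all hypothetical.

```python
from dataclasses import dataclass
from typing import Any


class ApiError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status_code: int, code: str) -> None:
        super().__init__(code)
        self.status_code = status_code
        self.code = code


@dataclass
class User:
    name: str
    is_admin: bool = False


def load_visible_run(runs: dict[str, dict[str, Any]], run_id: str, user: User) -> dict[str, Any]:
    """Return the run if it exists and the user may see it."""
    run = runs.get(run_id)
    if run is None:
        # Existence is checked first, so an unknown id yields 404.
        raise ApiError(404, "run_not_found")
    if not (user.is_admin or run["created_by"] == user.name):
        # Only then is visibility checked, yielding 403.
        raise ApiError(403, "forbidden")
    return run


if __name__ == "__main__":
    runs = {"run-1": {"created_by": "bob"}}
    print(load_visible_run(runs, "run-1", User("alice", is_admin=True)))
```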

## Verification

| Command | Result | Notes |
| --- | --- | --- |
| `py scripts/self_check.py` | | Public contract self-check. |
| `py -m pytest -q` | | Full local suite; explain any expected xfail. |
| `py scripts/self_check.py` | 4 passed | All public self-checks pass. |
| `py -m pytest -q` | 4 passed, 6 passed (removed xfail) | All 10 tests pass: 4 public tests + 6 acceptance-guidance tests. The xfail markers were removed. `test_acceptance_alice_inventory_replenishment_loop` verifies Alice's full approval loop; `test_acceptance_bob_analysis_only_does_not_create_oa_draft` verifies that Bob's analysis does not create an OA draft; `test_acceptance_knowledge_search_has_citations_without_debug_or_restricted_leaks` verifies RAG citations and filtering; `test_acceptance_sensitive_fields_are_redacted_from_result_events_and_audit` verifies redaction; `test_acceptance_run_and_event_visibility_match` verifies visibility and 404 handling; `test_acceptance_permission_denial_is_audited` verifies that Mallory's permission denial is audited. |

## Remaining Risks

-
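- Hidden fixture tests are not covered: formal grading will swap the fixtures, SKUs, users, and knowledge base. The Planner's regex SKU extraction and the permission matrix should generalize, but edge cases (such as prompts phrased in a completely different way) may not be covered.
- Prompt-injection protection relies only on regex matching and may be bypassed by variants (such as Unicode homoglyphs or encoding tricks).
- The supplier risk tool's `fail_first` scenario is covered only by unit-level tests; end-to-end retry verification was not done.
- The `xauth` ACL scenario is not implemented; formal grading may examine advanced permission scenarios such as cross-task visibility.
- The Executor stops the remaining steps after a single-step failure (break). Some independent steps could keep running in a future version (for example, knowledge.search is independent of erp); the current choice is conservative.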
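
The regex-bypass risk in the second bullet can be reproduced with a small sketch. The PR's actual `detect_prompt_injection` rules are not shown in this diff, so the pattern and both function names below are hypothetical. The example shows that fullwidth Latin letters slip past a plain regex and that NFKC normalization closes that one gap.

```python
import re
import unicodedata

# Hypothetical rule; the PR's real patterns are not part of this diff.
INJECTION_PATTERN = re.compile(r"ignore\s+previous\s+instructions", re.IGNORECASE)


def detect_raw(prompt: str) -> bool:
    """Match the pattern against the prompt exactly as received."""
    return bool(INJECTION_PATTERN.search(prompt))


def detect_normalized(prompt: str) -> bool:
    """Fold compatibility characters (e.g. fullwidth letters) before matching."""
    folded = unicodedata.normalize("NFKC", prompt)
    return bool(INJECTION_PATTERN.search(folded))


# "ignore" written with fullwidth Latin letters (U+FF49, U+FF47, ...).
FULLWIDTH_PROMPT = "\uff49\uff47\uff4e\uff4f\uff52\uff45 previous instructions"

if __name__ == "__main__":
    print(detect_raw(FULLWIDTH_PROMPT), detect_normalized(FULLWIDTH_PROMPT))
```

NFKC only folds compatibility forms. True cross-script homoglyphs, such as a Cyrillic letter standing in for a Latin one, survive normalization, so this narrows the risk rather than removing it.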
78 changes: 72 additions & 6 deletions agentops_assessment/admin/metrics.py
@@ -2,11 +2,73 @@

import sqlite3
from collections import Counter
from typing import Any

from agentops_assessment.backend import database


def build_dashboard(conn: sqlite3.Connection) -> dict:
def _avg_run_seconds(conn: sqlite3.Connection) -> float:
    """Average duration in seconds of finished runs; 0 when there is no data."""
    from datetime import datetime

    rows = conn.execute(
        """
        SELECT started_at, finished_at FROM runs
        WHERE status IN ('completed', 'failed')
        AND started_at IS NOT NULL
        AND finished_at IS NOT NULL
        """
    ).fetchall()

    durations: list[float] = []
    for row in rows:
        try:
            started = datetime.fromisoformat(row["started_at"])
            finished = datetime.fromisoformat(row["finished_at"])
            durations.append((finished - started).total_seconds())
        except (ValueError, TypeError):
            # Skip rows whose timestamps cannot be parsed or compared.
            continue

    # Average over the rows that parsed, so skipped rows do not lower the mean.
    return sum(durations) / len(durations) if durations else 0.0


def _recent_failures(conn: sqlite3.Connection, limit: int = 5) -> list[dict[str, Any]]:
    """Summaries of the most recent failed runs."""
    rows = conn.execute(
        """
        SELECT id, task_id, error, finished_at
        FROM runs
        WHERE status = 'failed'
        ORDER BY finished_at DESC
        LIMIT ?
        """,
        (limit,),
    ).fetchall()
    return [
        {
            "run_id": row["id"],
            "task_id": row["task_id"],
            "error": row["error"],
            "finished_at": row["finished_at"],
        }
        for row in rows
    ]


def _queue_health(conn: sqlite3.Connection) -> dict[str, int]:
    """Queue health: the number of runs currently queued and running."""
    queued = conn.execute(
        "SELECT COUNT(*) AS c FROM runs WHERE status = 'queued'"
    ).fetchone()["c"]
    running = conn.execute(
        "SELECT COUNT(*) AS c FROM runs WHERE status = 'running'"
    ).fetchone()["c"]
    return {"queued_count": queued, "running_count": running}


def build_dashboard(conn: sqlite3.Connection) -> dict[str, Any]:
    task_count = conn.execute("SELECT COUNT(*) AS c FROM tasks").fetchone()["c"]
    run_count = conn.execute("SELECT COUNT(*) AS c FROM runs").fetchone()["c"]
    failed_count = conn.execute(
@@ -15,20 +77,24 @@ def build_dashboard(conn: sqlite3.Connection) -> dict:
    completed_count = conn.execute(
        "SELECT COUNT(*) AS c FROM runs WHERE status = 'completed'"
    ).fetchone()["c"]
    token_cost = conn.execute("SELECT COALESCE(SUM(token_cost), 0) AS c FROM runs").fetchone()[
        "c"
    ]
    events = conn.execute("SELECT tool_name FROM run_events WHERE tool_name IS NOT NULL").fetchall()
    token_cost = conn.execute(
        "SELECT COALESCE(SUM(token_cost), 0) AS c FROM runs"
    ).fetchone()["c"]
    events = conn.execute(
        "SELECT tool_name FROM run_events WHERE tool_name IS NOT NULL"
    ).fetchall()
    tool_counts = Counter(row["tool_name"] for row in events)

    # TODO(candidate/P2): add average duration, recent failures, per-tool cost breakdown, and queue health.
    return {
        "task_count": task_count,
        "run_count": run_count,
        "completed_count": completed_count,
        "failed_count": failed_count,
        "failure_rate": failed_count / run_count if run_count else 0,
        "token_cost": token_cost,
        "average_run_seconds": _avg_run_seconds(conn),
        "tool_call_counts": dict(tool_counts),
        "recent_failures": _recent_failures(conn),
        "queue_health": _queue_health(conn),
        "generated_at": database.now_iso(),
    }
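
The average-duration metric in metrics.py can be exercised against an in-memory database. This is a standalone sketch, not the project's test setup: the four-column `runs` table is a minimal assumed schema, and `average_run_seconds` is a hypothetical stand-in that averages only over rows whose timestamps parse.

```python
import sqlite3
from datetime import datetime


def average_run_seconds(conn: sqlite3.Connection) -> float:
    """Mean duration of finished runs, ignoring rows with bad timestamps."""
    rows = conn.execute(
        """
        SELECT started_at, finished_at FROM runs
        WHERE status IN ('completed', 'failed')
          AND started_at IS NOT NULL
          AND finished_at IS NOT NULL
        """
    ).fetchall()
    durations = []
    for row in rows:
        try:
            started = datetime.fromisoformat(row["started_at"])
            finished = datetime.fromisoformat(row["finished_at"])
            durations.append((finished - started).total_seconds())
        except (ValueError, TypeError):
            continue  # unparseable or incomparable timestamps
    return sum(durations) / len(durations) if durations else 0.0


def make_demo_db() -> sqlite3.Connection:
    """Build a tiny in-memory database with an assumed, minimal runs table."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute(
        "CREATE TABLE runs (id TEXT, status TEXT, started_at TEXT, finished_at TEXT)"
    )
    conn.executemany(
        "INSERT INTO runs VALUES (?, ?, ?, ?)",
        [
            ("r1", "completed", "2024-01-01T00:00:00+00:00", "2024-01-01T00:00:10+00:00"),
            ("r2", "failed", "2024-01-01T00:00:00+00:00", "2024-01-01T00:00:20+00:00"),
            ("r3", "completed", "not-a-timestamp", "2024-01-01T00:00:05+00:00"),
            ("r4", "running", "2024-01-01T00:00:00+00:00", None),
        ],
    )
    return conn


if __name__ == "__main__":
    print(average_run_seconds(make_demo_db()))  # 15.0: mean of 10 s and 20 s
```

Row `r3` is skipped because its start time does not parse, and `r4` is excluded by the status filter, so the mean is taken over two rows only.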
141 changes: 125 additions & 16 deletions agentops_assessment/agent/executor.py
@@ -1,10 +1,61 @@
from __future__ import annotations

import sqlite3
from typing import Any

from agentops_assessment.agent.planner import PlanStep
from agentops_assessment.agent.state import InMemoryRunStateStore, RunState, StepState
from agentops_assessment.agent.tools import ToolRegistry
from agentops_assessment.agent.tools import ToolRegistry, sanitize_output
from agentops_assessment.backend import database


def _render_template(template: Any, context: dict[str, Any]) -> Any:
    """Replace {{key}} placeholders in the template with values from the context."""
    if isinstance(template, str):
        result = template
        for key, value in context.items():
            result = result.replace("{{" + key + "}}", str(value))
        return result
    if isinstance(template, dict):
        return {k: _render_template(v, context) for k, v in template.items()}
    if isinstance(template, list):
        return [_render_template(v, context) for v in template]
    return template


def _aggregate_result(steps: list[StepState]) -> dict[str, Any]:
    """Aggregate the final business result from the step outputs."""
    result: dict[str, Any] = {}

    for step in steps:
        if step.status != "completed" or not step.output:
            continue

        if step.tool_name == "erp.get_inventory":
            result["sku"] = step.output.get("sku")
            result["warehouse"] = step.output.get("warehouse")
            result["stock_gap"] = step.output.get("stock_gap")
        elif step.tool_name == "bi.get_sales":
            result["forecast_units_next_14d"] = step.output.get("forecast_units_next_14d")
        elif step.tool_name == "knowledge.search":
            result["citations"] = step.output.get("citations", [])
        elif step.tool_name == "supplier.get_risk":
            result["supplier_risk"] = {
                "supplier_id": step.output.get("supplier_id"),
                "risk_level": step.output.get("risk_level"),
            }
        elif step.tool_name == "oa.create_approval_draft":
            result["approval_draft_id"] = step.output.get("approval_draft_id")
            result["recommended_action"] = "create_replenishment_approval"

    if "recommended_action" not in result:
        result["recommended_action"] = "analysis_only"
    if "citations" not in result:
        result["citations"] = []
    if "supplier_risk" not in result:
        result["supplier_risk"] = {}

    return result


class Executor:
@@ -21,23 +72,81 @@ def execute(
        run_id: str,
        plan: list[PlanStep],
        context: dict[str, Any],
        conn: sqlite3.Connection | None = None,
    ) -> RunState:
        """Execute the plan and persist step state.

        TODO(candidate/P0): implement resumable multi-step execution, tool-input rendering,
        step-event persistence, error handling, and final business-result aggregation.
        Run the steps in order: render the input template, call the tool, record an event.
        A failed step stops the remaining steps, but the results of completed steps are kept.
        """
        state = RunState(
            run_id=run_id,
            status="failed",
            steps=[
                StepState(
                    step_id=step.id,
                    tool_name=step.tool_name,
                    status="pending",
                )
                for step in plan
            ],
        )
        state = RunState(run_id=run_id, status="running", steps=[])

        ctx = dict(context) if context else {}
        error_msg: str | None = None

        for step in plan:
            step_state = StepState(
                step_id=step.id,
                tool_name=step.tool_name,
                status="running",
            )
            state.steps.append(step_state)

            try:
                # Render the input template.
                args = _render_template(step.input_template, ctx)

                # Knowledge-base search needs the user's permissions.
                if step.tool_name == "knowledge.search":
                    args.setdefault("user_permissions", ctx.get("user_permissions", []))

                # Call the tool.
                output = self.registry.call(step.tool_name, args)

                # Redact sensitive fields.
                output = sanitize_output(output)

                step_state.status = "completed"
                step_state.output = output

                # Accumulate context; later steps may depend on it.
                ctx[step.id] = output
                if step.tool_name == "erp.get_inventory" and "supplier_id" in output:
                    ctx["supplier_id"] = output["supplier_id"]

                # Persist the run event.
                if conn:
                    database.insert_run_event(
                        conn,
                        run_id,
                        "tool.call",
                        {"input": sanitize_output(args), "output_summary": output},
                        tool_name=step.tool_name,
                    )

            except Exception as exc:
                step_state.status = "failed"
                step_state.error = str(exc)
                error_msg = f"{step.tool_name} failed: {step_state.error}"
                if conn:
                    database.insert_run_event(
                        conn,
                        run_id,
                        "tool.call",
                        {
                            "input": sanitize_output(step.input_template),
                            "error": str(exc),
                        },
                        tool_name=step.tool_name,
                    )
                break

        if error_msg:
            state.status = "failed"
            state.result = _aggregate_result(state.steps)
        else:
            state.status = "completed"
            state.result = _aggregate_result(state.steps)

        self.state_store.save(state)
        raise NotImplementedError("TODO(candidate/P0): implement the Agent executor.")
        return state
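
The placeholder rendering used by the Executor can be tried on its own. The function below is a standalone copy of the `_render_template` helper from this diff, renamed `render_template` so the snippet is self-contained; the context values and the template are made-up illustration data.

```python
from typing import Any


def render_template(template: Any, context: dict[str, Any]) -> Any:
    """Recursively replace {{key}} placeholders with str(context[key])."""
    if isinstance(template, str):
        result = template
        for key, value in context.items():
            result = result.replace("{{" + key + "}}", str(value))
        return result
    if isinstance(template, dict):
        return {k: render_template(v, context) for k, v in template.items()}
    if isinstance(template, list):
        return [render_template(v, context) for v in template]
    return template  # numbers, None, etc. pass through unchanged


if __name__ == "__main__":
    context = {"supplier_id": "SUP-7", "sku": "SKU-001", "limit": 5}
    template = {
        "supplier_id": "{{supplier_id}}",
        "tags": ["{{sku}}", "{{missing}}"],
        "limit": "{{limit}}",
        "top_k": 3,
    }
    print(render_template(template, context))
```

Two behaviors are worth keeping in mind when writing plan templates: a placeholder with no matching context key is left in place verbatim, and every substituted value is converted with `str`, so `{{limit}}` renders as the string `"5"` rather than the integer `5`.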