49 changes: 33 additions & 16 deletions COLLABORATION_LOG.md
@@ -4,50 +4,67 @@

## Task Understanding

- Goal:
- Non-goals:
- Protected contracts:
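- Goal: Complete the backend loop of the enterprise operations Agent so that a task forms a verifiable flow from creation and run through tool calls, RAG retrieval, the OA boundary, and audit logs to Dashboard metrics.
- Non-goals: Do not introduce a new framework, database, or task queue; do not rewrite the overall architecture; do not change public API fields to accommodate the internal implementation.
- Protected contracts: the API paths listed in the README, the run result fields, the `tool.call` event structure, RAG `answer/citations/filtered_doc_ids`, the required Dashboard fields, and the standard audit action names.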

## Collaboration Disclosure

- Primary AI software/model or human name:
- Other tools or collaborators:
- Division of work:
- Primary AI software/model or human name: Codex/GPT-5 (branch name `Codex/huangwenli`).
- Other tools or collaborators: local PowerShell, Git, Python venv, pytest.
- Division of work: Codex read the README/AGENTS/tests/source, implemented the code, created an isolated environment, ran verification, and recorded process evidence.

## Ambiguities And Assumptions

| Item | Impact | Decision |
| --- | --- | --- |
| | | |
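| `AGENTS.md` historical notes conflict with the README and test goals | Adopting them directly would make hidden tests fail, e.g. missing audit logs, hard-coded SKUs, or permission overreach | Treat the README, public tests, source contracts, and actual run results as authoritative; reject outdated notes one by one |
| How to handle tasks without OA write permission | Possible outcomes are a 403 before the run, `failed`, or a completed read-only analysis | For tasks that explicitly say "analyze only / do not create", complete a read-only analysis; if the plan contains a protected OA write tool and the user lacks the permission, skip the step and record deny evidence without creating a draft |
| Python environment | The system default `python` is 3.6, which does not satisfy the project's `>=3.11` | Use `D:\python3.11\python.exe` to create `.venv`, install `.[dev]`, then run verification |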

## AGENTS.md Historical Notes Review

| Historical note | Adopted or rejected | Evidence |
| --- | --- | --- |
| | | |
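| Public tests only check the API shape, so full run events and audit logs can be left unimplemented for now. | Rejected | The README requires a complete execution trace and audit; the acceptance tests check events/audit logs. |
| The current fixtures are mainly `SKU-001` and `SKU-002`, so the implementation can start with fixed branches for these two SKUs. | Rejected | The README states explicitly that hidden tests replace the SKUs; the Planner extracts SKUs with a regular expression and does not hard-code the public SKUs. |
| Dashboard fields can be renamed for implementation convenience; the frontend will adapt. | Rejected | The README lists the Dashboard fields as a stable public contract; the implementation keeps and extends the fields. |
| If a user can create tasks, allow creating OA approval drafts by default and add permissions later. | Rejected | The README requires OA write tools to pass the permission boundary first; the Executor checks permissions per tool. |
| Knowledge base retrieval only needs to return an answer passage; citations and the filtered document list can come later. | Rejected | The README and the public contract require `citations` and `filtered_doc_ids`; RAG now returns both. |
| To reduce failure noise, tool exceptions can be swallowed uniformly and return empty results. | Rejected | The README requires real failures to end in an explainable `failed` state with the error persisted; the Executor records a redacted error summary. |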
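
The regex-based SKU extraction mentioned in the second row could look roughly like the sketch below. The Planner source is not part of this diff, so the pattern and the `extract_skus` name are assumptions for illustration; in particular, it assumes SKUs are written as `SKU-` followed by letters or digits.

```python
import re

# Hypothetical pattern: assumes SKUs look like "SKU-" plus letters/digits.
# A lookbehind is used instead of \b so that a SKU directly preceded by
# CJK text (which counts as a word character) is still matched.
SKU_PATTERN = re.compile(r"(?<![A-Za-z0-9])SKU-[A-Za-z0-9]+", re.IGNORECASE)


def extract_skus(prompt: str) -> list[str]:
    """Return the distinct SKUs mentioned in a prompt, in order of appearance."""
    seen: list[str] = []
    for match in SKU_PATTERN.finditer(prompt):
        sku = match.group(0).upper()  # normalize case so "sku-1" == "SKU-1"
        if sku not in seen:
            seen.append(sku)
    return seen
```

Because nothing in the pattern names `SKU-001` or `SKU-002`, a prompt that mentions a replaced SKU goes through the same path, and a prompt with no match yields an empty list that the caller can turn into an explainable failure.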

## Root Cause Notes

| Symptom | Evidence | Root cause | Fix |
| --- | --- | --- | --- |
| | | | |
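| Run always fails once triggered | The starting code in `agentops_assessment/backend/worker.py` writes a `TODO(candidate/P0)` failure message | The Worker was still a placeholder and did not call the Planner/Executor | The Worker now reads the task/user, calls Planner -> Executor, and writes completed/failed with result/error |
| Planner returns a placeholder LLM step | The original plan in `planner.py` contained only `llm.summarize` | Business intent and SKU parsing were not implemented | The Planner extracts the SKU and the analyze-only/approval intent from the prompt and generates a deterministic tool chain |
| RAG response has no citations and includes debug output | `rag/search.py` originally returned empty citations and `candidate_note` | Retrieval, permission filtering, and answer generation were unfinished | Filter by permission, rank by relevance, return answer/citations/filtered_doc_ids, and return no debug fields |
| Sensitive fields may leak into results and events | The ERP fixture contains `vendor_secret` and `unit_cost_usd` | Tool output was not redacted uniformly | Add a shared `redact` and apply it to tool returns, events, audit logs, and API output |
| The `py` command recommended by the README is unavailable | `py` is not recognized in the current shell | The machine has no Python launcher and the default `python` is 3.6 | Use `D:\python3.11\python.exe` to create `.venv` and verify inside the venv |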
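
To make the shared-redaction fix concrete, here is a minimal sketch of a recursive redactor. The real `redact` lives in `agentops_assessment.security` and is not shown in this diff, so the function name `redact_sketch`, the key list, and the choice to drop keys rather than mask them are all assumptions.

```python
from typing import Any

# Hypothetical key list: the two ERP fixture fields named in the table above.
SENSITIVE_KEYS = {"vendor_secret", "unit_cost_usd"}


def redact_sketch(value: Any) -> Any:
    """Return a copy of value with sensitive keys removed at any nesting depth."""
    if isinstance(value, dict):
        return {
            key: redact_sketch(item)
            for key, item in value.items()
            if key not in SENSITIVE_KEYS
        }
    if isinstance(value, (list, tuple)):
        return [redact_sketch(item) for item in value]
    # Scalars (str, int, None, ...) pass through unchanged.
    return value
```

Applying one helper at every output boundary (tool return, event payload, audit payload, API response) means a newly added sensitive field only has to be registered in one place.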

## Compatibility Notes

| Surface | Existing behavior | Change | Compatibility plan |
| --- | --- | --- | --- |
| API | | | |
| Database | | | |
| Permissions | | | |
| Audit logs | | | |
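| API | Paths and response models already exist, but some paths lack security checks | Keep paths and required fields; add prompt-injection rejection, run/event visibility, and redaction | Do not delete or rename public fields; only add compatible fields |
| Database | The existing SQLite tables cover tasks/runs/events/audit/knowledge | No new tables or migrations; existing columns store status, events, results, and audit | Keep the seed and environment-variable path behavior |
| Permissions | Entry-point permission dependencies exist, but tool-level permissions and deny auditing are insufficient | Add a tool permission matrix, the OA write boundary, and deny auditing for entry-point permissions | Missing-permission errors keep the `missing_permissions` structure |
| Audit logs | `insert_audit_log` exists, but path coverage is incomplete | Record task/run/tool/approval/dashboard/read/deny evidence with redacted payloads | Use the README's standard action names and stay compatible |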
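
The tool permission matrix and the `missing_permissions` structure from the Permissions row can be illustrated with a small check. The two matrix entries are copied from `TOOL_PERMISSIONS` in `executor.py`; the `check_tool_permission` helper and its return shape are hypothetical and only meant as a sketch.

```python
# Subset of the matrix defined in agentops_assessment/agent/executor.py.
TOOL_PERMISSIONS = {
    "erp.get_inventory": "erp:read",
    "oa.create_approval_draft": "oa:approval:write",
}


def check_tool_permission(tool_name: str, user_permissions: set[str]) -> dict:
    """Hypothetical helper: decide allow/deny for one tool call."""
    required = TOOL_PERMISSIONS.get(tool_name)
    # Tools without an entry in the matrix are treated as unrestricted here.
    if required is None or required in user_permissions:
        return {"decision": "allow", "missing_permissions": []}
    return {"decision": "deny", "missing_permissions": [required]}
```

For example, `check_tool_permission("oa.create_approval_draft", {"erp:read"})` reports a deny together with the missing `oa:approval:write` permission, which is the evidence an audit record needs.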

## Verification

| Command | Result | Notes |
| --- | --- | --- |
| `py scripts/self_check.py` | | Public contract self-check. |
| `py -m pytest -q` | | Full local suite; explain any expected xfail. |
| `py scripts/self_check.py` | Failed before environment setup | The current environment has no `py` launcher. |
| `python scripts/self_check.py` | Failed before environment setup | The default `python` is 3.6, which does not support `from __future__ import annotations`. |
| `D:\python3.11\python.exe -m venv .venv` | Passed | Creates an isolated virtual environment. |
| `.\.venv\Scripts\python.exe -m pip install -e ".[dev]"` | Passed after network approval | The first attempt failed because of sandbox network restrictions; installation succeeded after approval. |
| `.\.venv\Scripts\python.exe scripts\self_check.py` | Passed | 4 passed, 1 warning; the public self-check passes. |
| `.\.venv\Scripts\python.exe -m pytest -q` | Passed | 4 passed, 6 xpassed, 1 warning; all acceptance guidance cases XPASS, and the test file's original xfail markers are kept. |

## Remaining Risks

-
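- The Planner still identifies intent with deterministic text rules; if a hidden prompt contains no recognizable SKU at all, it is handled as an explainable failure.
- No real asynchronous queue was introduced; FastAPI BackgroundTasks and the existing SQLite state model are reused.
- RAG answers are generated as lightweight summaries without calling a real LLM; this satisfies the local knowledge base citation and permission-filtering contract.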
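
As a rough illustration of the last point, a permission-filtered retrieval that returns `answer`, `citations`, and `filtered_doc_ids` without an LLM might be sketched as follows. `rag/search.py` is not included in this diff, so the `Doc` shape, the word-overlap scoring, and the exact citation format are assumptions rather than the project's implementation.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    # Hypothetical document shape, for illustration only.
    doc_id: str
    text: str
    required_permission: str


def search_sketch(query: str, docs: list[Doc], user_permissions: set[str], top_k: int = 2) -> dict:
    """Filter by permission first, then rank visible docs by word overlap."""
    visible = [d for d in docs if d.required_permission in user_permissions]
    filtered = [d for d in docs if d.required_permission not in user_permissions]
    terms = set(query.lower().split())
    scored = [(len(terms & set(d.text.lower().split())), d) for d in visible]
    scored.sort(key=lambda pair: pair[0], reverse=True)  # stable for ties
    ranked = [d for score, d in scored if score > 0][:top_k]
    answer = " ".join(d.text for d in ranked) if ranked else "No accessible document matched the query."
    return {
        "answer": answer,
        "citations": [d.doc_id for d in ranked],
        "filtered_doc_ids": [d.doc_id for d in filtered],
    }
```

Filtering before ranking keeps restricted text out of the answer entirely; the restricted documents surface only as ids in `filtered_doc_ids`.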
45 changes: 44 additions & 1 deletion agentops_assessment/admin/metrics.py
@@ -2,8 +2,10 @@

import sqlite3
from collections import Counter
from datetime import datetime

from agentops_assessment.backend import database
from agentops_assessment.security import redact


def build_dashboard(conn: sqlite3.Connection) -> dict:
@@ -20,8 +22,37 @@ def build_dashboard(conn: sqlite3.Connection) -> dict:
    ]
    events = conn.execute("SELECT tool_name FROM run_events WHERE tool_name IS NOT NULL").fetchall()
    tool_counts = Counter(row["tool_name"] for row in events)
    ended_runs = conn.execute(
        """
        SELECT id, status, error, started_at, finished_at
        FROM runs
        WHERE started_at IS NOT NULL AND finished_at IS NOT NULL
        """
    ).fetchall()
    durations = []
    for row in ended_runs:
        try:
            started = datetime.fromisoformat(row["started_at"])
            finished = datetime.fromisoformat(row["finished_at"])
            durations.append(max(0.0, (finished - started).total_seconds()))
        except (TypeError, ValueError):
            continue
    recent_failures = conn.execute(
        """
        SELECT id, task_id, error, finished_at
        FROM runs
        WHERE status = 'failed'
        ORDER BY finished_at DESC
        LIMIT 10
        """
    ).fetchall()
    queued_count = conn.execute(
        "SELECT COUNT(*) AS c FROM runs WHERE status IN ('queued', 'running')"
    ).fetchone()["c"]
    permission_denials = conn.execute(
        "SELECT COUNT(*) AS c FROM audit_logs WHERE decision = 'deny'"
    ).fetchone()["c"]

    # TODO(candidate/P2): add average duration, recent failures, per-tool cost breakdown, and queue health.
    return {
        "task_count": task_count,
        "run_count": run_count,
@@ -30,5 +61,17 @@ def build_dashboard(conn: sqlite3.Connection) -> dict:
        "failure_rate": failed_count / run_count if run_count else 0,
        "token_cost": token_cost,
        "tool_call_counts": dict(tool_counts),
        "average_run_seconds": sum(durations) / len(durations) if durations else 0,
        "recent_failures": [
            {
                "run_id": row["id"],
                "task_id": row["task_id"],
                "error": redact(row["error"]),
                "finished_at": row["finished_at"],
            }
            for row in recent_failures
        ],
        "queue_health": {"active_runs": queued_count},
        "permission_denial_count": permission_denials,
        "generated_at": database.now_iso(),
    }
190 changes: 183 additions & 7 deletions agentops_assessment/agent/executor.py
@@ -2,9 +2,20 @@

from typing import Any

from agentops_assessment.backend import database
from agentops_assessment.agent.planner import PlanStep
from agentops_assessment.agent.state import InMemoryRunStateStore, RunState, StepState
from agentops_assessment.agent.tools import ToolRegistry
from agentops_assessment.security import error_message, redact


TOOL_PERMISSIONS = {
    "erp.get_inventory": "erp:read",
    "bi.get_sales": "bi:read",
    "knowledge.search": "knowledge:read",
    "supplier.get_risk": "supplier:read",
    "oa.create_approval_draft": "oa:approval:write",
}


class Executor:
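@@ -22,14 +33,10 @@ def execute(
        plan: list[PlanStep],
        context: dict[str, Any],
    ) -> RunState:
        """Execute the plan and persist step state.

        TODO(candidate/P0): implement resumable multi-step execution, tool input rendering,
        step event persistence, error handling, and final business result aggregation.
        """
        """Execute the plan, persist tool events, and aggregate the final business result."""
        state = RunState(
            run_id=run_id,
            status="failed",
            status="running",
            steps=[
                StepState(
                    step_id=step.id,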
Expand All @@ -40,4 +47,173 @@ def execute(
],
)
self.state_store.save(state)
raise NotImplementedError("TODO(candidate/P0): 实现 Agent 执行器。")
outputs: dict[str, dict[str, Any]] = {}
user_permissions = set(context.get("user_permissions", []))
conn = context.get("conn")
actor_id = context.get("actor_id", "unknown")

for step_state, step in zip(state.steps, plan, strict=True):
permission = TOOL_PERMISSIONS.get(step.tool_name)
if permission and permission not in user_permissions:
payload = {"missing_permissions": [permission], "step_id": step.id}
if step.tool_name == "oa.create_approval_draft":
step_state.status = "skipped"
step_state.output = payload
self._record_event(conn, run_id, "tool.skipped", step.tool_name, payload)
self._record_audit(
conn,
actor_id,
"tool.call",
step.tool_name,
payload,
decision="deny",
)
continue
step_state.status = "failed"
step_state.error = f"missing_permissions: {permission}"
state.status = "failed"
self.state_store.save(state)
raise PermissionError(step_state.error)

args = self._render(step.input_template, context, outputs)
try:
output = self.registry.call(step.tool_name, args)
except Exception as exc:
payload = {
"step_id": step.id,
"input": redact(args),
"error": error_message(exc),
"attempts": self.registry.last_call_attempts.get(step.tool_name, 1),
}
step_state.status = "failed"
step_state.error = payload["error"]
self._record_event(conn, run_id, "tool.call", step.tool_name, payload)
self._record_audit(conn, actor_id, "tool.call", step.tool_name, payload, decision="deny")
state.status = "failed"
self.state_store.save(state)
raise

safe_output = redact(output)
outputs[step.id] = safe_output
step_state.status = "completed"
step_state.output = safe_output
payload = {
"step_id": step.id,
"input": redact(args),
"output": self._summarize_output(safe_output),
"attempts": self.registry.last_call_attempts.get(step.tool_name, 1),
}
self._record_event(conn, run_id, "tool.call", step.tool_name, payload)
self._record_audit(conn, actor_id, "tool.call", step.tool_name, payload)
if step.tool_name == "oa.create_approval_draft":
self._record_audit(
conn,
actor_id,
"approval.draft.create",
safe_output.get("approval_draft_id", "approval_draft"),
{"sku": safe_output.get("sku") or args.get("sku"), "approval_type": args.get("approval_type")},
)

state.status = "completed"
state.result = self._build_result(outputs, plan)
self.state_store.save(state)
return state

    def _record_event(
        self,
        conn: Any,
        run_id: str,
        event_type: str,
        tool_name: str,
        payload: dict[str, Any],
    ) -> None:
        if conn is not None:
            database.insert_run_event(conn, run_id, event_type, redact(payload), tool_name=tool_name)

    def _record_audit(
        self,
        conn: Any,
        actor_id: str,
        action: str,
        resource: str,
        payload: dict[str, Any],
        decision: str = "allow",
    ) -> None:
        if conn is not None:
            database.insert_audit_log(
                conn,
                actor_id=actor_id,
                action=action,
                resource=resource,
                decision=decision,
                payload=redact(payload),
            )

    def _render(self, template: Any, context: dict[str, Any], outputs: dict[str, Any]) -> Any:
        if isinstance(template, dict):
            return {key: self._render(value, context, outputs) for key, value in template.items()}
        if isinstance(template, list):
            return [self._render(value, context, outputs) for value in template]
        if isinstance(template, str) and template.startswith("{") and template.endswith("}"):
            return self._lookup(template[1:-1], context, outputs)
        return template

    @staticmethod
    def _lookup(path: str, context: dict[str, Any], outputs: dict[str, Any]) -> Any:
        if path == "user_permissions":
            return context.get("user_permissions", [])
        current: Any
        parts = path.split(".")
        if parts[0] in outputs:
            current = outputs[parts[0]]
            parts = parts[1:]
        else:
            current = context
        for part in parts:
            if isinstance(current, dict):
                current = current.get(part)
            else:
                current = getattr(current, part)
        return current

    @staticmethod
    def _summarize_output(output: dict[str, Any]) -> dict[str, Any]:
        keep = [
            "sku",
            "warehouse",
            "stock_gap",
            "forecast_units_next_14d",
            "supplier_id",
            "risk_level",
            "approval_draft_id",
            "status",
            "citations",
            "filtered_doc_ids",
        ]
        return {key: output[key] for key in keep if key in output}

    @staticmethod
    def _build_result(outputs: dict[str, dict[str, Any]], plan: list[PlanStep]) -> dict[str, Any]:
        inventory = outputs.get("inventory", {})
        sales = outputs.get("sales", {})
        knowledge = outputs.get("knowledge", {})
        supplier = outputs.get("supplier_risk", {})
        approval = outputs.get("approval", {})
        result: dict[str, Any] = {
            "sku": inventory.get("sku"),
            "warehouse": inventory.get("warehouse"),
            "stock_gap": inventory.get("stock_gap"),
            "forecast_units_next_14d": sales.get("forecast_units_next_14d"),
            "supplier_risk": {
                key: supplier.get(key)
                for key in ["supplier_id", "risk_level", "lead_time_days", "recent_delay_count", "recommended_buffer_days"]
                if key in supplier
            },
            "citations": knowledge.get("citations", []),
            "recommended_action": "create_replenishment_approval"
            if any(step.tool_name == "oa.create_approval_draft" for step in plan)
            else "analyze_inventory_risk",
        }
        if approval.get("approval_draft_id"):
            result["approval_draft_id"] = approval["approval_draft_id"]
        return redact(result)