From b08fb41cbd405785392739ca36c2e4b0edabc4cd Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Tue, 20 Jan 2026 09:52:07 -0500 Subject: [PATCH] Add evals --- .../demos/procurement_agent/evals/README.md | 63 + .../demos/procurement_agent/evals/__init__.py | 0 .../demos/procurement_agent/evals/conftest.py | 227 ++++ .../evals/fixtures/__init__.py | 0 .../evals/fixtures/events.py | 108 ++ .../evals/graders/__init__.py | 0 .../evals/graders/database.py | 187 +++ .../evals/graders/tool_calls.py | 80 ++ .../demos/procurement_agent/evals/pytest.ini | 8 + .../demos/procurement_agent/evals/report.html | 1094 +++++++++++++++++ .../procurement_agent/evals/tasks/__init__.py | 0 .../evals/tasks/test_inspection_failed.py | 162 +++ .../evals/tasks/test_inspection_passed.py | 116 ++ .../evals/tasks/test_shipment_arrived.py | 119 ++ .../evals/tasks/test_shipment_departed.py | 202 +++ .../evals/tasks/test_submittal_approved.py | 87 ++ .../project/agents/procurement_agent.py | 13 + .../project/scripts/happy_path.py | 184 +++ .../project/scripts/human_in_the_loop.py | 157 +++ .../project/scripts/out_of_order.py | 180 +++ .../demos/procurement_agent/pyproject.toml | 3 +- uv.lock | 2 +- 22 files changed, 2990 insertions(+), 2 deletions(-) create mode 100644 examples/demos/procurement_agent/evals/README.md create mode 100644 examples/demos/procurement_agent/evals/__init__.py create mode 100644 examples/demos/procurement_agent/evals/conftest.py create mode 100644 examples/demos/procurement_agent/evals/fixtures/__init__.py create mode 100644 examples/demos/procurement_agent/evals/fixtures/events.py create mode 100644 examples/demos/procurement_agent/evals/graders/__init__.py create mode 100644 examples/demos/procurement_agent/evals/graders/database.py create mode 100644 examples/demos/procurement_agent/evals/graders/tool_calls.py create mode 100644 examples/demos/procurement_agent/evals/pytest.ini create mode 100644 examples/demos/procurement_agent/evals/report.html create mode 100644 examples/demos/procurement_agent/evals/tasks/__init__.py create mode 100644 examples/demos/procurement_agent/evals/tasks/test_inspection_failed.py create mode 100644 examples/demos/procurement_agent/evals/tasks/test_inspection_passed.py create mode 100644 examples/demos/procurement_agent/evals/tasks/test_shipment_arrived.py create mode 100644 examples/demos/procurement_agent/evals/tasks/test_shipment_departed.py create mode 100644 examples/demos/procurement_agent/evals/tasks/test_submittal_approved.py create mode 100644 examples/demos/procurement_agent/project/scripts/happy_path.py create mode 100644 examples/demos/procurement_agent/project/scripts/human_in_the_loop.py create mode 100644 examples/demos/procurement_agent/project/scripts/out_of_order.py diff --git a/examples/demos/procurement_agent/evals/README.md b/examples/demos/procurement_agent/evals/README.md new file mode 100644 index 000000000..ddb96573c --- /dev/null +++ b/examples/demos/procurement_agent/evals/README.md @@ -0,0 +1,63 @@ +# Procurement Agent Evals + +Integration tests for the procurement agent that verify tool calls and database state. + +## Prerequisites + +1. AgentEx backend running (`make dev` from scale-agentex) +2. Procurement agent running: + ```bash + cd examples/demos/procurement_agent + export ENVIRONMENT=development + uv run agentex agents run --manifest manifest.yaml + ``` + +## Running Tests + +From the `procurement_agent` directory: + +```bash +# Run all tests +cd evals && uv run pytest + +# Run specific test file +cd evals && uv run pytest tasks/test_shipment_departed.py -v + +# Run single test +cd evals && uv run pytest tasks/test_shipment_departed.py::test_departed_01_no_flag_5_days_early -v +``` + +## Test Structure + +| File | Event Type | Focus | +|------|------------|-------| +| `test_submittal_approved.py` | Submittal_Approved | PO issued, DB entry | +| `test_shipment_departed.py` | Shipment_Departed | **False positive detection** | +| `test_shipment_arrived.py` | Shipment_Arrived | Team notification, inspection | +| `test_inspection_failed.py` | Inspection_Failed | Human-in-the-loop | +| `test_inspection_passed.py` | Inspection_Passed | Status update | + +## Test Cases Summary + +| Event | Tests | Key Assertions | +|-------|-------|----------------| +| Submittal_Approved | 2 | `issue_purchase_order` called, DB item created | +| Shipment_Departed | 6 | Forbidden: `flag_potential_issue` when ETA < required_by | +| Shipment_Arrived | 2 | `notify_team`, `schedule_inspection` called | +| Inspection_Failed | 3 | Human-in-loop: approve, approve+extra, reject+delete | +| Inspection_Passed | 2 | Forbidden: `wait_for_human`, `flag_potential_issue` | + +## Graders + +- **tool_calls.py**: Verifies required and forbidden tool calls in transcripts +- **database.py**: Verifies database state changes + +## False Positive Detection + +The `test_shipment_departed.py` tests are specifically designed to catch the false positive issue where the agent incorrectly flags conflicts. + +**Conflict logic:** +- **Flag if** ETA >= required_by (zero/negative buffer) +- **Don't flag if** ETA < required_by (has buffer remaining) + +The tests use `assert_forbidden_tools(["flag_potential_issue"])` to catch cases where the agent incorrectly escalates. diff --git a/examples/demos/procurement_agent/evals/__init__.py b/examples/demos/procurement_agent/evals/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/demos/procurement_agent/evals/conftest.py b/examples/demos/procurement_agent/evals/conftest.py new file mode 100644 index 000000000..28e299b45 --- /dev/null +++ b/examples/demos/procurement_agent/evals/conftest.py @@ -0,0 +1,227 @@ +""" +Pytest fixtures for procurement agent evals. + +Provides workflow setup, transcript extraction, and human input simulation. +""" +from __future__ import annotations + +import os +import uuid +import asyncio +from typing import Any, AsyncGenerator +from datetime import datetime as dt + +import pytest +import pytest_asyncio +from temporalio.client import Client, WorkflowHandle + +from agentex.types.task import Task +from agentex.types.agent import Agent +from agentex.lib.types.acp import CreateTaskParams + +# Set environment variables for local development +os.environ.setdefault("AGENT_NAME", "procurement-agent") +os.environ.setdefault("ACP_URL", "http://localhost:8000") +os.environ.setdefault("WORKFLOW_NAME", "procurement-agent") +os.environ.setdefault("WORKFLOW_TASK_QUEUE", "procurement_agent_queue") +os.environ.setdefault("TEMPORAL_ADDRESS", "localhost:7233") + + +@pytest.fixture(scope="session") +def event_loop(): + """Create an event loop for the test session.""" + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close() + + +@pytest_asyncio.fixture(scope="session") +async def temporal_client() -> AsyncGenerator[Client, None]: + """Create a Temporal client for the test session.""" + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233") + ) + yield client + # Client doesn't need explicit close + + +@pytest_asyncio.fixture +async def workflow_handle(temporal_client: Client) -> AsyncGenerator[WorkflowHandle, None]: + """ + Start a fresh workflow for each test. + + Creates a unique workflow ID and starts the procurement agent workflow. + Yields the handle for sending signals and querying state. + """ + workflow_id = f"eval-{uuid.uuid4()}" + task_queue = os.environ.get("WORKFLOW_TASK_QUEUE", "procurement_agent_queue") + workflow_name = os.environ.get("WORKFLOW_NAME", "procurement-agent") + + # Create agent and task params + now = dt.now() + agent = Agent( + id="procurement-agent", + name="procurement-agent", + acp_type="agentic", + description="Procurement agent for construction delivery management", + created_at=now, + updated_at=now, + ) + task = Task(id=workflow_id) + create_task_params = CreateTaskParams(agent=agent, task=task, params=None) + + # Start the workflow + handle = await temporal_client.start_workflow( + workflow_name, + create_task_params, + id=workflow_id, + task_queue=task_queue, + ) + + # Give workflow time to initialize + await asyncio.sleep(2) + + yield handle + + # Cleanup: terminate the workflow after test + try: + await handle.terminate("Test completed") + except Exception: + pass # Workflow may have already completed + + +async def send_event(handle: WorkflowHandle, event: Any) -> None: + """ + Send an event to the workflow via signal. + + Args: + handle: The workflow handle + event: A Pydantic event model (will be serialized to JSON) + """ + event_json = event.model_dump_json() + await handle.signal("send_event", event_json) + + +async def send_human_response(handle: WorkflowHandle, response: str) -> None: + """ + Send a human response to the workflow. + + This simulates a user responding in the UI to a wait_for_human escalation. + + Args: + handle: The workflow handle + response: The human's text response + """ + # Import here to avoid circular imports + from agentex.types.task import Task + from agentex.types.agent import Agent + from agentex.types.event import Event + from agentex.lib.types.acp import SendEventParams + from agentex.types.text_content import TextContent + + now = dt.now() + agent = Agent( + id="procurement-agent", + name="procurement-agent", + acp_type="agentic", + description="Procurement agent for construction delivery management", + created_at=now, + updated_at=now, + ) + task = Task(id=handle.id) + event = Event( + id=str(uuid.uuid4()), + agent_id="procurement-agent", + task_id=handle.id, + sequence_id=1, + content=TextContent(author="user", content=response), + ) + params = SendEventParams(agent=agent, task=task, event=event) + + await handle.signal("receive_event", params) + + +async def wait_for_processing(_handle: WorkflowHandle, timeout_seconds: float = 60) -> None: + """ + Wait for the workflow to finish processing an event. + + Polls the workflow until no more activities are running. + + Args: + _handle: The workflow handle (unused, reserved for future polling) + timeout_seconds: Maximum time to wait + """ + # Simple approach: wait a fixed time for agent to process + # In production, you'd poll workflow state more intelligently + await asyncio.sleep(timeout_seconds) + + +async def get_workflow_transcript(handle: WorkflowHandle) -> list[dict[str, Any]]: + """ + Extract the conversation transcript from workflow history. + + Queries the workflow to get the internal state containing tool calls. + + Args: + handle: The workflow handle + + Returns: + List of message dicts containing tool calls and responses + """ + # Query workflow state to get the input_list (conversation history) + # This requires the workflow to expose a query handler + + # For now, we'll extract from workflow history events + # The tool calls appear in activity completions + transcript = [] + + async for event in handle.fetch_history_events(): + # Look for activity completed events + if hasattr(event, 'activity_task_completed_event_attributes'): + attrs = event.activity_task_completed_event_attributes + if attrs and hasattr(attrs, 'result'): + # Activity results contain tool execution info + transcript.append({ + "type": "activity_completed", + "result": str(attrs.result) if attrs.result else None, + }) + + # Look for activity scheduled events (contains tool name) + if hasattr(event, 'activity_task_scheduled_event_attributes'): + attrs = event.activity_task_scheduled_event_attributes + if attrs and hasattr(attrs, 'activity_type'): + activity_name = attrs.activity_type.name if attrs.activity_type else None + transcript.append({ + "type": "function_call", + "name": activity_name, + }) + + return transcript + + +async def get_transcript_event_count(handle: WorkflowHandle) -> int: + """Get the current number of events in the transcript.""" + transcript = await get_workflow_transcript(handle) + return len(transcript) + + +def get_new_tool_calls( + full_transcript: list[dict[str, Any]], + previous_count: int +) -> list[dict[str, Any]]: + """ + Get only the new tool calls since the previous checkpoint. + + Args: + full_transcript: The complete transcript from get_workflow_transcript + previous_count: The transcript length before the event was sent + + Returns: + List of new tool call entries + """ + return full_transcript[previous_count:] + + +def get_workflow_id(handle: WorkflowHandle) -> str: + """Get the workflow ID from a handle.""" + return handle.id diff --git a/examples/demos/procurement_agent/evals/fixtures/__init__.py b/examples/demos/procurement_agent/evals/fixtures/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/demos/procurement_agent/evals/fixtures/events.py b/examples/demos/procurement_agent/evals/fixtures/events.py new file mode 100644 index 000000000..31f1c919b --- /dev/null +++ b/examples/demos/procurement_agent/evals/fixtures/events.py @@ -0,0 +1,108 @@ +""" +Event fixtures for eval test cases. + +Provides factory functions to create events with configurable parameters. +""" +from typing import Optional +from datetime import datetime, timedelta + +from project.models.events import ( + EventType, + InspectionFailedEvent, + InspectionPassedEvent, + SubmitalApprovalEvent, + ShipmentArrivedSiteEvent, + ShipmentDepartedFactoryEvent, +) + + +def create_submittal_approved(item: str) -> SubmitalApprovalEvent: + """Create a Submittal_Approved event.""" + return SubmitalApprovalEvent( + event_type=EventType.SUBMITTAL_APPROVED, + item=item, + document_name=f"{item} Submittal.pdf", + document_url=f"/submittals/{item.lower().replace(' ', '_')}.pdf", + ) + + +def create_shipment_departed( + item: str, + eta: datetime, + date_departed: Optional[datetime] = None, +) -> ShipmentDepartedFactoryEvent: + """ + Create a Shipment_Departed_Factory event. + + Args: + item: The item name + eta: Estimated time of arrival (this is what gets compared to required_by) + date_departed: When shipment left factory (defaults to 7 days before ETA) + """ + if date_departed is None: + date_departed = eta - timedelta(days=7) + + return ShipmentDepartedFactoryEvent( + event_type=EventType.SHIPMENT_DEPARTED_FACTORY, + item=item, + eta=eta, + date_departed=date_departed, + location_address="218 W 18th St, New York, NY 10011", + ) + + +def create_shipment_arrived( + item: str, + date_arrived: datetime, +) -> ShipmentArrivedSiteEvent: + """Create a Shipment_Arrived_Site event.""" + return ShipmentArrivedSiteEvent( + event_type=EventType.SHIPMENT_ARRIVED_SITE, + item=item, + date_arrived=date_arrived, + location_address="650 Townsend St, San Francisco, CA 94103", + ) + + +def create_inspection_failed( + item: str, + inspection_date: Optional[datetime] = None, +) -> InspectionFailedEvent: + """Create an Inspection_Failed event.""" + if inspection_date is None: + inspection_date = datetime.now() + + return InspectionFailedEvent( + event_type=EventType.INSPECTION_FAILED, + item=item, + inspection_date=inspection_date, + document_name=f"{item} Inspection Report.pdf", + document_url=f"/inspections/{item.lower().replace(' ', '_')}_failed.pdf", + ) + + +def create_inspection_passed( + item: str, + inspection_date: Optional[datetime] = None, +) -> InspectionPassedEvent: + """Create an Inspection_Passed event.""" + if inspection_date is None: + inspection_date = datetime.now() + + return InspectionPassedEvent( + event_type=EventType.INSPECTION_PASSED, + item=item, + inspection_date=inspection_date, + document_name=f"{item} Inspection Report.pdf", + document_url=f"/inspections/{item.lower().replace(' ', '_')}_passed.pdf", + ) + + +# Default schedule reference (matches database.py DEFAULT_SCHEDULE) +SCHEDULE_REFERENCE = { + "Steel Beams": {"required_by": "2026-02-15", "buffer_days": 5}, + "HVAC Units": {"required_by": "2026-03-01", "buffer_days": 7}, + "Windows": {"required_by": "2026-03-15", "buffer_days": 10}, + "Flooring Materials": {"required_by": "2026-04-01", "buffer_days": 3}, + "Electrical Panels": {"required_by": "2026-04-15", "buffer_days": 5}, +} diff --git a/examples/demos/procurement_agent/evals/graders/__init__.py b/examples/demos/procurement_agent/evals/graders/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/demos/procurement_agent/evals/graders/database.py b/examples/demos/procurement_agent/evals/graders/database.py new file mode 100644 index 000000000..e1651662c --- /dev/null +++ b/examples/demos/procurement_agent/evals/graders/database.py @@ -0,0 +1,187 @@ +""" +Database grader - verifies database state after agent actions. +""" +from __future__ import annotations + +import json +from typing import Any, Optional +from pathlib import Path + +import aiosqlite # type: ignore[import-not-found] + +# Use the same DB path as the main application +DB_PATH = Path(__file__).parent.parent.parent / "project" / "data" / "procurement.db" + + +async def get_procurement_item(workflow_id: str, item: str) -> Optional[dict[str, Any]]: + """ + Get a procurement item from the database. + + Args: + workflow_id: The Temporal workflow ID + item: The item name + + Returns: + Dict with item fields or None if not found + """ + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + """ + SELECT workflow_id, item, status, eta, date_arrived, purchase_order_id, + created_at, updated_at + FROM procurement_items + WHERE workflow_id = ? AND item = ? + """, + (workflow_id, item) + ) as cursor: + row = await cursor.fetchone() + if row: + return dict(row) + return None + + +async def get_schedule_delivery(workflow_id: str, item: str) -> Optional[dict[str, Any]]: + """ + Get a delivery item from the master construction schedule. + + Args: + workflow_id: The Temporal workflow ID + item: The item name + + Returns: + Dict with delivery fields or None if not found + """ + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + """ + SELECT schedule_json + FROM master_construction_schedule + WHERE workflow_id = ? + """, + (workflow_id,) + ) as cursor: + row = await cursor.fetchone() + if row: + schedule = json.loads(row["schedule_json"]) + for delivery in schedule.get("deliveries", []): + if delivery.get("item") == item: + return delivery + return None + + +async def assert_procurement_item_exists( + workflow_id: str, + item: str, + expected_status: Optional[str] = None, + expected_po_id_not_null: bool = False, + expected_eta: Optional[str] = None, + expected_date_arrived: Optional[str] = None, +) -> dict[str, Any]: + """ + Assert a procurement item exists with expected fields. + + Args: + workflow_id: The Temporal workflow ID + item: The item name + expected_status: If provided, assert status matches + expected_po_id_not_null: If True, assert purchase_order_id is not null + expected_eta: If provided, assert ETA matches + expected_date_arrived: If provided, assert date_arrived matches + + Returns: + The procurement item record + + Raises: + AssertionError: If item doesn't exist or fields don't match + """ + record = await get_procurement_item(workflow_id, item) + + if record is None: + raise AssertionError( + f"Procurement item not found: workflow_id={workflow_id}, item={item}" + ) + + if expected_status is not None: + assert record["status"] == expected_status, ( + f"Expected status '{expected_status}', got '{record['status']}'" + ) + + if expected_po_id_not_null: + assert record["purchase_order_id"] is not None, ( + "Expected purchase_order_id to be set, but it was null" + ) + + if expected_eta is not None: + assert record["eta"] == expected_eta, ( + f"Expected ETA '{expected_eta}', got '{record['eta']}'" + ) + + if expected_date_arrived is not None: + assert record["date_arrived"] == expected_date_arrived, ( + f"Expected date_arrived '{expected_date_arrived}', got '{record['date_arrived']}'" + ) + + return record + + +async def assert_procurement_item_not_exists(workflow_id: str, item: str) -> None: + """ + Assert a procurement item does NOT exist (was deleted). + + Args: + workflow_id: The Temporal workflow ID + item: The item name + + Raises: + AssertionError: If item still exists + """ + record = await get_procurement_item(workflow_id, item) + if record is not None: + raise AssertionError( + f"Procurement item should not exist but was found: {record}" + ) + + +async def assert_schedule_item_not_exists(workflow_id: str, item: str) -> None: + """ + Assert an item is NOT in the master construction schedule (was removed). + + Args: + workflow_id: The Temporal workflow ID + item: The item name + + Raises: + AssertionError: If item still in schedule + """ + delivery = await get_schedule_delivery(workflow_id, item) + if delivery is not None: + raise AssertionError( + f"Schedule item should not exist but was found: {delivery}" + ) + + +async def assert_schedule_delivery_date( + workflow_id: str, + item: str, + expected_required_by: str +) -> None: + """ + Assert a delivery item has the expected required_by date. + + Args: + workflow_id: The Temporal workflow ID + item: The item name + expected_required_by: The expected date string + + Raises: + AssertionError: If date doesn't match + """ + delivery = await get_schedule_delivery(workflow_id, item) + if delivery is None: + raise AssertionError(f"Schedule delivery not found for item: {item}") + + assert delivery["required_by"] == expected_required_by, ( + f"Expected required_by '{expected_required_by}', got '{delivery['required_by']}'" + ) diff --git a/examples/demos/procurement_agent/evals/graders/tool_calls.py b/examples/demos/procurement_agent/evals/graders/tool_calls.py new file mode 100644 index 000000000..7c28a8626 --- /dev/null +++ b/examples/demos/procurement_agent/evals/graders/tool_calls.py @@ -0,0 +1,80 @@ +""" +Tool call grader - extracts and verifies tool calls from workflow transcripts. +""" +from __future__ import annotations + +from typing import Any + + +def extract_tool_calls(transcript: list[dict[str, Any]]) -> list[str]: + """ + Extract tool/function names from a workflow transcript. + + The transcript is the messages array from the agent run, containing + items with type="function_call" for tool invocations. + + Args: + transcript: List of message dicts from agent execution + + Returns: + List of tool names that were called + """ + tool_calls = [] + for item in transcript: + if isinstance(item, dict): + # Handle function_call type (from OpenAI agents format) + if item.get("type") == "function_call": + tool_name = item.get("name") + if tool_name: + tool_calls.append(tool_name) + # Handle tool_calls nested in assistant messages + if item.get("role") == "assistant" and "tool_calls" in item: + for tc in item.get("tool_calls", []): + if isinstance(tc, dict) and "function" in tc: + tool_name = tc["function"].get("name") + if tool_name: + tool_calls.append(tool_name) + return tool_calls + + +def assert_required_tools(transcript: list[dict[str, Any]], required: list[str]) -> None: + """ + Assert that all required tools were called. + + Args: + transcript: The workflow transcript + required: List of tool names that must appear + + Raises: + AssertionError: If any required tool is missing + """ + called = set(extract_tool_calls(transcript)) + missing = set(required) - called + if missing: + raise AssertionError( + f"Required tools not called: {missing}. " + f"Tools that were called: {called}" + ) + + +def assert_forbidden_tools(transcript: list[dict[str, Any]], forbidden: list[str]) -> None: + """ + Assert that forbidden tools were NOT called. + + This is critical for catching false positives (e.g., flagging conflicts + when there shouldn't be any). + + Args: + transcript: The workflow transcript + forbidden: List of tool names that must NOT appear + + Raises: + AssertionError: If any forbidden tool was called + """ + called = set(extract_tool_calls(transcript)) + violations = called & set(forbidden) + if violations: + raise AssertionError( + f"Forbidden tools were called: {violations}. " + f"These tools should NOT have been invoked in this scenario." + ) diff --git a/examples/demos/procurement_agent/evals/pytest.ini b/examples/demos/procurement_agent/evals/pytest.ini new file mode 100644 index 000000000..71d66ba7f --- /dev/null +++ b/examples/demos/procurement_agent/evals/pytest.ini @@ -0,0 +1,8 @@ +[pytest] +asyncio_mode = auto +testpaths = tasks +python_files = test_*.py +python_functions = test_* +markers = + asyncio: mark test as async +addopts = -v --tb=short diff --git a/examples/demos/procurement_agent/evals/report.html b/examples/demos/procurement_agent/evals/report.html new file mode 100644 index 000000000..8b3b6ea04 --- /dev/null +++ b/examples/demos/procurement_agent/evals/report.html @@ -0,0 +1,1094 @@ + + + + + report.html + + + + +

report.html

+

Report generated on 20-Jan-2026 at 11:45:33 by pytest-html + v4.2.0

+
+

Environment

+
+
+ + + + + +
+
+

Summary

+
+
+

15 tests took 00:23:01.

+

(Un)check the boxes to filter the results.

+
+ +
+
+
+
+ + 2 Failed, + + 13 Passed, + + 0 Skipped, + + 0 Expected failures, + + 0 Unexpected passes, + + 0 Errors, + + 0 Reruns + + 0 Retried, +
+
+  /  +
+
+
+
+
+
+
+
+ + + + + + + + + +
ResultTestDurationLinks
+ + + \ No newline at end of file diff --git a/examples/demos/procurement_agent/evals/tasks/__init__.py b/examples/demos/procurement_agent/evals/tasks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/demos/procurement_agent/evals/tasks/test_inspection_failed.py b/examples/demos/procurement_agent/evals/tasks/test_inspection_failed.py new file mode 100644 index 000000000..a975ecd5d --- /dev/null +++ b/examples/demos/procurement_agent/evals/tasks/test_inspection_failed.py @@ -0,0 +1,162 @@ +""" +Tests for Inspection_Failed event handling with human-in-the-loop. + +Verifies: +- Agent escalates to human (wait_for_human called) +- Agent responds correctly to different human inputs: + 1. "Yes" - executes recommended action + 2. "Yes, and also..." - executes action + additional request + 3. "No, delete..." - removes item from schedule +""" +from datetime import datetime + +import pytest + +from evals.conftest import ( + send_event, + get_workflow_id, + send_human_response, + wait_for_processing, + get_workflow_transcript, +) +from evals.fixtures.events import ( + create_shipment_arrived, + create_inspection_failed, + create_shipment_departed, + create_submittal_approved, +) +from evals.graders.database import ( + assert_schedule_delivery_date, + assert_procurement_item_exists, + assert_schedule_item_not_exists, + assert_procurement_item_not_exists, +) +from evals.graders.tool_calls import assert_required_tools + + +async def setup_through_arrived(workflow_handle, item: str) -> None: + """Helper to set up item through shipment arrived state.""" + eta = datetime(2026, 2, 15, 11, 0) + arrival = datetime(2026, 2, 15, 10, 30) + + await send_event(workflow_handle, create_submittal_approved(item)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + await send_event(workflow_handle, create_shipment_departed(item, eta=eta)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + await send_event(workflow_handle, create_shipment_arrived(item, date_arrived=arrival)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + +@pytest.mark.asyncio +async def test_failed_01_human_approves(workflow_handle): + """ + Test Inspection_Failed where human approves recommendation. + + Human response: "Yes" + Expected: Agent executes its recommended action + """ + item = "HVAC Units" + workflow_id = get_workflow_id(workflow_handle) + + # Setup through arrived state + await setup_through_arrived(workflow_handle, item) + + # Send inspection failed + event = create_inspection_failed(item) + await send_event(workflow_handle, event) + + # Wait for agent to escalate (call wait_for_human) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Send human approval + await send_human_response(workflow_handle, "Yes") + + # Wait for agent to process response + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Note: wait_for_human is a function_tool (not Temporal activity), + # so we verify the workflow responded correctly by checking DB state + + # DB should still have the item (agent executed recommendation) + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + ) + + +@pytest.mark.asyncio +async def test_failed_02_human_approves_with_extra_action(workflow_handle): + """ + Test Inspection_Failed where human approves + requests extra action. + + Human response: "Yes, and also update the delivery date to 2026-03-15" + Expected: Agent executes recommendation AND updates delivery date + """ + item = "HVAC Units" + workflow_id = get_workflow_id(workflow_handle) + + await setup_through_arrived(workflow_handle, item) + + event = create_inspection_failed(item) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Human approves AND requests delivery date update + await send_human_response( + workflow_handle, + "Yes, and also update the delivery date to 2026-03-15" + ) + await wait_for_processing(workflow_handle, timeout_seconds=60) # More time for extra action + + transcript = await get_workflow_transcript(workflow_handle) + # Note: wait_for_human is a function_tool (not visible in Temporal history) + # Verify the agent responded to human input by calling update_delivery_date_for_item + assert_required_tools(transcript, [ + "update_delivery_date_for_item", # Should update schedule + ]) + + # Verify schedule was updated + await assert_schedule_delivery_date( + workflow_id=workflow_id, + item=item, + expected_required_by="2026-03-15", + ) + + +@pytest.mark.asyncio +async def test_failed_03_human_rejects_delete(workflow_handle): + """ + Test Inspection_Failed where human rejects and requests deletion. + + Human response: "No, remove it from the master schedule entirely" + Expected: Item removed from schedule AND procurement items + """ + item = "HVAC Units" + workflow_id = get_workflow_id(workflow_handle) + + await setup_through_arrived(workflow_handle, item) + + event = create_inspection_failed(item) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Human rejects and requests deletion + await send_human_response( + workflow_handle, + "No, remove it from the master schedule entirely" + ) + await wait_for_processing(workflow_handle, timeout_seconds=60) + + transcript = await get_workflow_transcript(workflow_handle) + # Note: wait_for_human is a function_tool (not visible in Temporal history) + # Verify the agent responded to human input by removing/deleting items + assert_required_tools(transcript, [ + "remove_delivery_item", # Remove from schedule + "delete_procurement_item_activity", # Delete tracking record + ]) + + # Verify item was deleted from both places + await assert_procurement_item_not_exists(workflow_id, item) + await assert_schedule_item_not_exists(workflow_id, item) diff --git a/examples/demos/procurement_agent/evals/tasks/test_inspection_passed.py b/examples/demos/procurement_agent/evals/tasks/test_inspection_passed.py new file mode 100644 index 000000000..815a5ab23 --- /dev/null +++ b/examples/demos/procurement_agent/evals/tasks/test_inspection_passed.py @@ -0,0 +1,116 @@ +""" +Tests for Inspection_Passed event handling. + +Verifies: +- Procurement item status updated to passed +- No escalation to human (forbidden tools) +""" +from datetime import datetime + +import pytest + +from evals.conftest import ( + send_event, + get_workflow_id, + get_new_tool_calls, + wait_for_processing, + get_workflow_transcript, + get_transcript_event_count, +) +from evals.fixtures.events import ( + create_shipment_arrived, + create_inspection_passed, + create_shipment_departed, + create_submittal_approved, +) +from evals.graders.database import assert_procurement_item_exists +from evals.graders.tool_calls import assert_required_tools, assert_forbidden_tools + + +async def setup_through_arrived(workflow_handle, item: str, eta: datetime, arrival: datetime) -> None: + """Helper to set up item through shipment arrived state.""" + await send_event(workflow_handle, create_submittal_approved(item)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + await send_event(workflow_handle, create_shipment_departed(item, eta=eta)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + await send_event(workflow_handle, create_shipment_arrived(item, date_arrived=arrival)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + +@pytest.mark.asyncio +async def test_passed_01_steel_beams(workflow_handle): + """ + Test Inspection_Passed for Steel Beams. + + Expected: + - update_procurement_item_activity called + - NO wait_for_human (should not escalate on success) + - NO flag_potential_issue + - DB shows inspection_passed status + """ + item = "Steel Beams" + workflow_id = get_workflow_id(workflow_handle) + + eta = datetime(2026, 2, 10, 14, 30) + arrival = datetime(2026, 2, 10, 15, 45) + await setup_through_arrived(workflow_handle, item, eta, arrival) + + # Get transcript count BEFORE sending inspection_passed + previous_count = await get_transcript_event_count(workflow_handle) + + # Send inspection passed + event = create_inspection_passed(item) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Verify tool calls for THIS EVENT ONLY (not entire workflow) + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, ["update_procurement_item_activity"]) + assert_forbidden_tools(new_tool_calls, [ + "wait_for_human", # Should NOT escalate on success + "flag_potential_issue", # Should NOT flag issues + ]) + + # Verify DB state + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + expected_status="inspection_passed", + ) + + +@pytest.mark.asyncio +async def test_passed_02_windows(workflow_handle): + """ + Test Inspection_Passed for Windows. + + Same expectations as Steel Beams. + """ + item = "Windows" + workflow_id = get_workflow_id(workflow_handle) + + eta = datetime(2026, 3, 5, 16, 0) + arrival = datetime(2026, 3, 5, 16, 20) + await setup_through_arrived(workflow_handle, item, eta, arrival) + + # Get transcript count BEFORE sending inspection_passed + previous_count = await get_transcript_event_count(workflow_handle) + + event = create_inspection_passed(item) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Verify tool calls for THIS EVENT ONLY + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, ["update_procurement_item_activity"]) + assert_forbidden_tools(new_tool_calls, ["wait_for_human", "flag_potential_issue"]) + + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + expected_status="inspection_passed", + ) diff --git a/examples/demos/procurement_agent/evals/tasks/test_shipment_arrived.py b/examples/demos/procurement_agent/evals/tasks/test_shipment_arrived.py new file mode 100644 index 000000000..9d9ee293f --- /dev/null +++ b/examples/demos/procurement_agent/evals/tasks/test_shipment_arrived.py @@ -0,0 +1,119 @@ +""" +Tests for Shipment_Arrived_Site event handling. + +Verifies: +- Team notification sent +- Inspection scheduled +- Procurement item updated with arrival date +""" +from datetime import datetime + +import pytest + +from evals.conftest import ( + send_event, + get_workflow_id, + get_new_tool_calls, + wait_for_processing, + get_workflow_transcript, + get_transcript_event_count, +) +from evals.fixtures.events import ( + create_shipment_arrived, + create_shipment_departed, + create_submittal_approved, +) +from evals.graders.database import assert_procurement_item_exists +from evals.graders.tool_calls import assert_required_tools + + +async def setup_through_departed(workflow_handle, item: str, eta: datetime) -> None: + """Helper to set up item through shipment departed state.""" + # Submittal approved + await send_event(workflow_handle, create_submittal_approved(item)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Shipment departed + await send_event(workflow_handle, create_shipment_departed(item, eta=eta)) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + +@pytest.mark.asyncio +async def test_arrived_01_steel_beams(workflow_handle): + """ + Test Shipment_Arrived_Site for Steel Beams. + + Expected: + - notify_team_shipment_arrived called + - schedule_inspection called + - update_procurement_item_activity called + - DB shows shipment_arrived status with date + """ + item = "Steel Beams" + workflow_id = get_workflow_id(workflow_handle) + arrival_date = datetime(2026, 2, 10, 15, 45) + + # Setup through departed state + eta = datetime(2026, 2, 10, 14, 30) + await setup_through_departed(workflow_handle, item, eta) + + # Get transcript count BEFORE sending arrived event + previous_count = await get_transcript_event_count(workflow_handle) + + # Send arrived event + event = create_shipment_arrived(item, date_arrived=arrival_date) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Verify tool calls for THIS EVENT ONLY + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, [ + "notify_team_shipment_arrived", + "schedule_inspection", + "update_procurement_item_activity", + ]) + + # Verify DB state + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + expected_status="shipment_arrived", + ) + + +@pytest.mark.asyncio +async def test_arrived_02_windows(workflow_handle): + """ + Test Shipment_Arrived_Site for Windows. + + Same expectations as Steel Beams. + """ + item = "Windows" + workflow_id = get_workflow_id(workflow_handle) + arrival_date = datetime(2026, 3, 5, 16, 20) + + eta = datetime(2026, 3, 5, 16, 0) + await setup_through_departed(workflow_handle, item, eta) + + # Get transcript count BEFORE sending arrived event + previous_count = await get_transcript_event_count(workflow_handle) + + event = create_shipment_arrived(item, date_arrived=arrival_date) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Verify tool calls for THIS EVENT ONLY + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, [ + "notify_team_shipment_arrived", + "schedule_inspection", + "update_procurement_item_activity", + ]) + + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + expected_status="shipment_arrived", + ) diff --git a/examples/demos/procurement_agent/evals/tasks/test_shipment_departed.py b/examples/demos/procurement_agent/evals/tasks/test_shipment_departed.py new file mode 100644 index 000000000..4e64172e7 --- /dev/null +++ b/examples/demos/procurement_agent/evals/tasks/test_shipment_departed.py @@ -0,0 +1,202 @@ +""" +Tests for Shipment_Departed_Factory event handling. + +CRITICAL: These tests catch the false positive issue where the agent +incorrectly flags conflicts when ETA is before the required_by date. + +Conflict logic: +- Flag if ETA >= required_by (zero/negative buffer) +- Don't flag if ETA < required_by (has buffer remaining) +""" +from datetime import datetime + +import pytest + +from evals.conftest import ( + send_event, + get_workflow_id, + get_new_tool_calls, + wait_for_processing, + get_workflow_transcript, + get_transcript_event_count, +) +from evals.fixtures.events import ( + create_shipment_departed, + create_submittal_approved, +) +from evals.graders.database import assert_procurement_item_exists +from evals.graders.tool_calls import assert_required_tools, assert_forbidden_tools + + +async def setup_submittal_approved(workflow_handle, item: str) -> None: + """Helper to set up item through submittal approved state.""" + event = create_submittal_approved(item) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + +# ============================================================================= +# NO FLAG CASES - ETA < required_by +# ============================================================================= + +@pytest.mark.asyncio +async def test_departed_01_no_flag_5_days_early(workflow_handle): + """ + Steel Beams: ETA 2026-02-10, Required 2026-02-15 + 5 days early - well within buffer, should NOT flag. + """ + item = "Steel Beams" + workflow_id = get_workflow_id(workflow_handle) + + # Setup: submittal approved first + await setup_submittal_approved(workflow_handle, item) + + # Get transcript count BEFORE sending departed event + previous_count = await get_transcript_event_count(workflow_handle) + + # Send shipment departed with ETA 5 days early + eta = datetime(2026, 2, 10, 14, 30) # Feb 10 + event = create_shipment_departed(item, eta=eta) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Verify tool calls for THIS EVENT ONLY + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, ["update_procurement_item_activity"]) + assert_forbidden_tools(new_tool_calls, ["flag_potential_issue"]) # MUST NOT FLAG + + # Verify DB state + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + expected_status="shipment_departed", + ) + + +@pytest.mark.asyncio +async def test_departed_02_no_flag_1_day_early(workflow_handle): + """ + Steel Beams: ETA 2026-02-14, Required 2026-02-15 + 1 day early - boundary case but still OK, should NOT flag. + """ + item = "Steel Beams" + workflow_id = get_workflow_id(workflow_handle) + + await setup_submittal_approved(workflow_handle, item) + + previous_count = await get_transcript_event_count(workflow_handle) + + eta = datetime(2026, 2, 14, 14, 30) # Feb 14 - 1 day before required + event = create_shipment_departed(item, eta=eta) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, ["update_procurement_item_activity"]) + assert_forbidden_tools(new_tool_calls, ["flag_potential_issue"]) # MUST NOT FLAG + + +@pytest.mark.asyncio +async def test_departed_05_no_flag_windows_10_days_early(workflow_handle): + """ + Windows: ETA 2026-03-05, Required 2026-03-15 + 10 days early - uses buffer but still OK, should NOT flag. + """ + item = "Windows" + workflow_id = get_workflow_id(workflow_handle) + + await setup_submittal_approved(workflow_handle, item) + + previous_count = await get_transcript_event_count(workflow_handle) + + eta = datetime(2026, 3, 5, 16, 0) # Mar 5 - 10 days before required + event = create_shipment_departed(item, eta=eta) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, ["update_procurement_item_activity"]) + assert_forbidden_tools(new_tool_calls, ["flag_potential_issue"]) # MUST NOT FLAG + + +@pytest.mark.asyncio +async def test_departed_06_no_flag_hvac_1_day_early(workflow_handle): + """ + HVAC Units: ETA 2026-02-28, Required 2026-03-01 + 1 day early - tight boundary case, should NOT flag. + """ + item = "HVAC Units" + workflow_id = get_workflow_id(workflow_handle) + + await setup_submittal_approved(workflow_handle, item) + + previous_count = await get_transcript_event_count(workflow_handle) + + eta = datetime(2026, 2, 28, 11, 0) # Feb 28 - 1 day before Mar 1 + event = create_shipment_departed(item, eta=eta) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, ["update_procurement_item_activity"]) + assert_forbidden_tools(new_tool_calls, ["flag_potential_issue"]) # MUST NOT FLAG + + +# ============================================================================= +# FLAG CASES - ETA >= required_by +# ============================================================================= + +@pytest.mark.asyncio +async def test_departed_03_flag_on_deadline(workflow_handle): + """ + Steel Beams: ETA 2026-02-15, Required 2026-02-15 + Arrives ON deadline - zero buffer, SHOULD FLAG. + """ + item = "Steel Beams" + workflow_id = get_workflow_id(workflow_handle) + + await setup_submittal_approved(workflow_handle, item) + + previous_count = await get_transcript_event_count(workflow_handle) + + eta = datetime(2026, 2, 15, 14, 30) # Feb 15 = required date + event = create_shipment_departed(item, eta=eta) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, [ + "flag_potential_issue", # MUST FLAG + "update_procurement_item_activity", + ]) + + +@pytest.mark.asyncio +async def test_departed_04_flag_late(workflow_handle): + """ + Steel Beams: ETA 2026-02-20, Required 2026-02-15 + 5 days LATE - definite conflict, SHOULD FLAG. + """ + item = "Steel Beams" + workflow_id = get_workflow_id(workflow_handle) + + await setup_submittal_approved(workflow_handle, item) + + previous_count = await get_transcript_event_count(workflow_handle) + + eta = datetime(2026, 2, 20, 14, 30) # Feb 20 - 5 days after required + event = create_shipment_departed(item, eta=eta) + await send_event(workflow_handle, event) + await wait_for_processing(workflow_handle, timeout_seconds=30) + + full_transcript = await get_workflow_transcript(workflow_handle) + new_tool_calls = get_new_tool_calls(full_transcript, previous_count) + assert_required_tools(new_tool_calls, [ + "flag_potential_issue", # MUST FLAG + "update_procurement_item_activity", + ]) diff --git a/examples/demos/procurement_agent/evals/tasks/test_submittal_approved.py b/examples/demos/procurement_agent/evals/tasks/test_submittal_approved.py new file mode 100644 index 000000000..3182eab7e --- /dev/null +++ b/examples/demos/procurement_agent/evals/tasks/test_submittal_approved.py @@ -0,0 +1,87 @@ +""" +Tests for Submittal_Approved event handling. + +Verifies: +- Purchase order is issued (tool call) +- Procurement item created in DB with correct status and PO ID +""" +import pytest + +from evals.conftest import ( + send_event, + get_workflow_id, + wait_for_processing, + get_workflow_transcript, +) +from evals.fixtures.events import create_submittal_approved +from evals.graders.database import assert_procurement_item_exists +from evals.graders.tool_calls import assert_required_tools + + +@pytest.mark.asyncio +async def test_submittal_01_steel_beams(workflow_handle): + """ + Test Submittal_Approved for Steel Beams. + + Expected: + - issue_purchase_order tool called + - create_procurement_item_activity called + - DB has procurement item with status and PO ID + """ + item = "Steel Beams" + workflow_id = get_workflow_id(workflow_handle) + + # Send event + event = create_submittal_approved(item) + await send_event(workflow_handle, event) + + # Wait for processing + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Get transcript and verify tool calls + transcript = await get_workflow_transcript(workflow_handle) + assert_required_tools(transcript, [ + "issue_purchase_order", + "create_procurement_item_activity", # Activity name in Temporal + ]) + + # Verify DB state + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + expected_status="purchase_order_issued", + expected_po_id_not_null=True, + ) + + +@pytest.mark.asyncio +async def test_submittal_02_hvac_units(workflow_handle): + """ + Test Submittal_Approved for HVAC Units. + + Same expectations as Steel Beams - verifies consistency. + """ + item = "HVAC Units" + workflow_id = get_workflow_id(workflow_handle) + + # Send event + event = create_submittal_approved(item) + await send_event(workflow_handle, event) + + # Wait for processing + await wait_for_processing(workflow_handle, timeout_seconds=30) + + # Get transcript and verify tool calls + transcript = await get_workflow_transcript(workflow_handle) + assert_required_tools(transcript, [ + "issue_purchase_order", + "create_procurement_item_activity", + ]) + + # Verify DB state + await assert_procurement_item_exists( + workflow_id=workflow_id, + item=item, + expected_status="purchase_order_issued", + expected_po_id_not_null=True, + ) diff --git a/examples/demos/procurement_agent/project/agents/procurement_agent.py b/examples/demos/procurement_agent/project/agents/procurement_agent.py index 475cd83e7..45858d45e 100644 --- a/examples/demos/procurement_agent/project/agents/procurement_agent.py +++ b/examples/demos/procurement_agent/project/agents/procurement_agent.py @@ -456,6 +456,19 @@ def new_procurement_agent(master_construction_schedule: str, human_input_learnin If the user says no or has feedback, please come up with another solution and call the wait_for_human tool again (you can call it as many times as needed). +## CRITICAL: When to Flag Potential Issues (Shipment_Departed_Factory events) + +When processing a Shipment_Departed_Factory event, you MUST compare the ETA to the required_by date from the master schedule: + +- **ONLY flag_potential_issue if ETA >= required_by** (zero buffer or late - this is a problem!) +- **DO NOT flag_potential_issue if ETA < required_by** (there is still buffer remaining - no issue!) + +Example 1: Item required_by 2026-02-15, ETA is 2026-02-10 → DO NOT FLAG (5 days buffer remaining) +Example 2: Item required_by 2026-02-15, ETA is 2026-02-15 → FLAG (zero buffer - on the deadline!) +Example 3: Item required_by 2026-02-15, ETA is 2026-02-20 → FLAG (5 days late!) + +The buffer_days field in the schedule is informational only. What matters is: Does ETA arrive BEFORE the required_by date? + ## Context Master Construction Schedule: diff --git a/examples/demos/procurement_agent/project/scripts/happy_path.py b/examples/demos/procurement_agent/project/scripts/happy_path.py new file mode 100644 index 000000000..44ed6247c --- /dev/null +++ b/examples/demos/procurement_agent/project/scripts/happy_path.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python +""" +Happy path demo script - shows two items going through successfully. +Both items pass inspection and arrive within time buffers. +""" + +import os +import sys +import asyncio +from datetime import datetime + +from temporalio.client import Client + +from project.models.events import ( + EventType, + InspectionPassedEvent, + SubmitalApprovalEvent, + ShipmentArrivedSiteEvent, + ShipmentDepartedFactoryEvent, +) +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables + +# Set defaults for local development +os.environ.setdefault("AGENT_NAME", "procurement-agent") +os.environ.setdefault("ACP_URL", "http://localhost:8000") +os.environ.setdefault("WORKFLOW_NAME", "procurement-agent") +os.environ.setdefault("WORKFLOW_TASK_QUEUE", "procurement_agent_queue") +os.environ.setdefault("TEMPORAL_ADDRESS", "localhost:7233") + +logger = make_logger(__name__) +environment_variables = EnvironmentVariables.refresh() + +# Delay between events (seconds) +EVENT_DELAY = 15 + + +async def send_happy_path_events(workflow_id: str): + """Send happy path events: two items, both pass inspection.""" + + # Connect to Temporal + temporal_url = environment_variables.TEMPORAL_ADDRESS or "localhost:7233" + client = await Client.connect(temporal_url) + + # Get handle to the workflow + handle = client.get_workflow_handle(workflow_id) + + # Item 1: Steel Beams - will PASS inspection + # Required by: 2026-02-15, Buffer: 5 days + # Arriving on 2026-02-10 (5 days early - within buffer) + steel_events = [ + SubmitalApprovalEvent( + event_type=EventType.SUBMITTAL_APPROVED, + item="Steel Beams", + document_name="Steel Beams Submittal.pdf", + document_url="/submittal_approval.pdf" + ), + ShipmentDepartedFactoryEvent( + event_type=EventType.SHIPMENT_DEPARTED_FACTORY, + item="Steel Beams", + eta=datetime(2026, 2, 10, 14, 30), + date_departed=datetime(2026, 2, 3, 9, 15), + location_address="218 W 18th St, New York, NY 10011" + ), + ShipmentArrivedSiteEvent( + event_type=EventType.SHIPMENT_ARRIVED_SITE, + item="Steel Beams", + date_arrived=datetime(2026, 2, 10, 15, 45), + location_address="650 Townsend St, San Francisco, CA 94103" + ), + InspectionPassedEvent( + event_type=EventType.INSPECTION_PASSED, + item="Steel Beams", + inspection_date=datetime(2026, 2, 11, 10, 20), + document_name="Steel Beams Inspection Report.pdf", + document_url="/inspection_passed.pdf" + ) + ] + + # Item 2: Windows - will PASS inspection + # Required by: 2026-03-15, Buffer: 10 days + # Arriving on 2026-03-05 (10 days early - within buffer) + windows_events = [ + SubmitalApprovalEvent( + event_type=EventType.SUBMITTAL_APPROVED, + item="Windows", + document_name="Windows Submittal.pdf", + document_url="/submittal_approval.pdf" + ), + ShipmentDepartedFactoryEvent( + event_type=EventType.SHIPMENT_DEPARTED_FACTORY, + item="Windows", + eta=datetime(2026, 3, 5, 16, 0), + date_departed=datetime(2026, 2, 20, 8, 30), + location_address="218 W 18th St, New York, NY 10011" + ), + ShipmentArrivedSiteEvent( + event_type=EventType.SHIPMENT_ARRIVED_SITE, + item="Windows", + date_arrived=datetime(2026, 3, 5, 16, 20), + location_address="650 Townsend St, San Francisco, CA 94103" + ), + InspectionPassedEvent( + event_type=EventType.INSPECTION_PASSED, + item="Windows", + inspection_date=datetime(2026, 3, 6, 9, 45), + document_name="Windows Inspection Report.pdf", + document_url="/inspection_passed.pdf" + ) + ] + + all_events = [ + ("Steel Beams", steel_events), + ("Windows", windows_events), + ] + + print(f"Connected to workflow: {workflow_id}") + print("=" * 60) + print("HAPPY PATH DEMO: Two items, both pass inspection") + print(f"Event delay: {EVENT_DELAY}s") + print("=" * 60) + + for item_name, events in all_events: + print(f"\n{'=' * 60}") + print(f"Processing: {item_name}") + print("=" * 60) + + for i, event in enumerate(events, 1): + print(f"\n[{i}/4] Sending: {event.event_type.value}") + print(f" Item: {event.item}") + + if hasattr(event, 'eta'): + print(f" ETA: {event.eta}") + if hasattr(event, 'date_arrived'): + print(f" Date Arrived: {event.date_arrived}") + if hasattr(event, 'inspection_date'): + print(f" Inspection Date: {event.inspection_date}") + + try: + event_data = event.model_dump_json() + await handle.signal("send_event", event_data) + print(f" ✓ Sent!") + + await asyncio.sleep(EVENT_DELAY) + + except Exception as e: + print(f" ✗ Error: {e}") + logger.error(f"Failed to send event: {e}") + + print("\n" + "=" * 60) + print("Happy path demo complete! Both items passed inspection.") + print("Check the UI to see processed events.") + print("=" * 60) + + +async def main(): + """Main entry point.""" + + if len(sys.argv) > 1: + workflow_id = sys.argv[1] + else: + print("Enter Workflow ID:") + workflow_id = input("Workflow ID: ").strip() + + if not workflow_id: + print("Error: Workflow ID required!") + print("\nUsage: python happy_path.py [workflow_id]") + return + + try: + await send_happy_path_events(workflow_id) + except KeyboardInterrupt: + print("\n\nInterrupted. Goodbye!") + except Exception as e: + logger.error(f"Unexpected error: {e}") + print(f"Error: {e}") + print("\nMake sure:") + print("1. The workflow is running") + print("2. The workflow ID is correct") + print("3. Temporal is accessible at", environment_variables.TEMPORAL_ADDRESS) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/demos/procurement_agent/project/scripts/human_in_the_loop.py b/examples/demos/procurement_agent/project/scripts/human_in_the_loop.py new file mode 100644 index 000000000..c2e2ebc53 --- /dev/null +++ b/examples/demos/procurement_agent/project/scripts/human_in_the_loop.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python +""" +Human-in-the-loop demo script - shows an item that fails inspection. +Demonstrates the need for human intervention when inspection fails. +""" + +import os +import sys +import asyncio +from datetime import datetime + +from temporalio.client import Client + +from project.models.events import ( + EventType, + InspectionFailedEvent, + SubmitalApprovalEvent, + ShipmentArrivedSiteEvent, + ShipmentDepartedFactoryEvent, +) +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables + +# Set defaults for local development +os.environ.setdefault("AGENT_NAME", "procurement-agent") +os.environ.setdefault("ACP_URL", "http://localhost:8000") +os.environ.setdefault("WORKFLOW_NAME", "procurement-agent") +os.environ.setdefault("WORKFLOW_TASK_QUEUE", "procurement_agent_queue") +os.environ.setdefault("TEMPORAL_ADDRESS", "localhost:7233") + +logger = make_logger(__name__) +environment_variables = EnvironmentVariables.refresh() + +# Delay between events (seconds) +EVENT_DELAY = 3 +# Longer delay after inspection failure to observe the failure handling +POST_FAILURE_DELAY = 30 + + +async def send_human_in_the_loop_events(workflow_id: str): + """Send events for one item that fails inspection.""" + + # Connect to Temporal + temporal_url = environment_variables.TEMPORAL_ADDRESS or "localhost:7233" + client = await Client.connect(temporal_url) + + # Get handle to the workflow + handle = client.get_workflow_handle(workflow_id) + + # HVAC Units - will FAIL inspection + # Required by: 2026-03-01, Buffer: 7 days + # Arriving on 2026-02-15 (14 days early - well within buffer) + hvac_events = [ + SubmitalApprovalEvent( + event_type=EventType.SUBMITTAL_APPROVED, + item="HVAC Units", + document_name="HVAC Units Submittal.pdf", + document_url="/submittal_approval.pdf" + ), + ShipmentDepartedFactoryEvent( + event_type=EventType.SHIPMENT_DEPARTED_FACTORY, + item="HVAC Units", + eta=datetime(2026, 2, 15, 11, 0), + date_departed=datetime(2026, 2, 8, 13, 45), + location_address="218 W 18th St, New York, NY 10011" + ), + ShipmentArrivedSiteEvent( + event_type=EventType.SHIPMENT_ARRIVED_SITE, + item="HVAC Units", + date_arrived=datetime(2026, 2, 15, 10, 30), + location_address="650 Townsend St, San Francisco, CA 94103" + ), + InspectionFailedEvent( + event_type=EventType.INSPECTION_FAILED, + item="HVAC Units", + inspection_date=datetime(2026, 2, 16, 14, 15), + document_name="HVAC Units Inspection Report.pdf", + document_url="/inspection_failed.pdf" + ) + ] + + print(f"Connected to workflow: {workflow_id}") + print("=" * 60) + print("HUMAN-IN-THE-LOOP DEMO: Item fails inspection") + print(f"Event delay: {EVENT_DELAY}s") + print("=" * 60) + + print(f"\n{'=' * 60}") + print("Processing: HVAC Units (will FAIL inspection)") + print("=" * 60) + + for i, event in enumerate(hvac_events, 1): + print(f"\n[{i}/4] Sending: {event.event_type.value}") + print(f" Item: {event.item}") + + if hasattr(event, 'eta'): + print(f" ETA: {event.eta}") + if hasattr(event, 'date_arrived'): + print(f" Date Arrived: {event.date_arrived}") + if hasattr(event, 'inspection_date'): + print(f" Inspection Date: {event.inspection_date}") + + try: + event_data = event.model_dump_json() + await handle.signal("send_event", event_data) + print(f" ✓ Sent!") + + # Use longer delay after inspection failure + is_last_event = (i == len(hvac_events)) + if is_last_event: + print(f"\n ⚠️ INSPECTION FAILED!") + print(f" ⏳ Waiting {POST_FAILURE_DELAY}s to observe failure handling...") + print(f" 💡 Check the UI - agent should request human input") + await asyncio.sleep(POST_FAILURE_DELAY) + else: + await asyncio.sleep(EVENT_DELAY) + + except Exception as e: + print(f" ✗ Error: {e}") + logger.error(f"Failed to send event: {e}") + + print("\n" + "=" * 60) + print("Human-in-the-loop demo complete!") + print("The agent should now be waiting for human input to resolve") + print("the inspection failure. Check the UI to provide input.") + print("=" * 60) + + +async def main(): + """Main entry point.""" + + if len(sys.argv) > 1: + workflow_id = sys.argv[1] + else: + print("Enter Workflow ID:") + workflow_id = input("Workflow ID: ").strip() + + if not workflow_id: + print("Error: Workflow ID required!") + print("\nUsage: python human_in_the_loop.py [workflow_id]") + return + + try: + await send_human_in_the_loop_events(workflow_id) + except KeyboardInterrupt: + print("\n\nInterrupted. Goodbye!") + except Exception as e: + logger.error(f"Unexpected error: {e}") + print(f"Error: {e}") + print("\nMake sure:") + print("1. The workflow is running") + print("2. The workflow ID is correct") + print("3. Temporal is accessible at", environment_variables.TEMPORAL_ADDRESS) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/demos/procurement_agent/project/scripts/out_of_order.py b/examples/demos/procurement_agent/project/scripts/out_of_order.py new file mode 100644 index 000000000..164c4a9e5 --- /dev/null +++ b/examples/demos/procurement_agent/project/scripts/out_of_order.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python +""" +Out-of-order events demo script - tests agent's ability to handle duplicate/out-of-order signals. +Sends a submittal approval event again after shipment arrives but before inspection, +to verify the agent recognizes it already happened and ignores the duplicate. +""" + +import os +import sys +import asyncio +from datetime import datetime + +from temporalio.client import Client + +from project.models.events import ( + EventType, + InspectionPassedEvent, + SubmitalApprovalEvent, + ShipmentArrivedSiteEvent, + ShipmentDepartedFactoryEvent, +) +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables + +# Set defaults for local development +os.environ.setdefault("AGENT_NAME", "procurement-agent") +os.environ.setdefault("ACP_URL", "http://localhost:8000") +os.environ.setdefault("WORKFLOW_NAME", "procurement-agent") +os.environ.setdefault("WORKFLOW_TASK_QUEUE", "procurement_agent_queue") +os.environ.setdefault("TEMPORAL_ADDRESS", "localhost:7233") + +logger = make_logger(__name__) +environment_variables = EnvironmentVariables.refresh() + +# Delay between events (seconds) +EVENT_DELAY = 3 +# Longer delay after duplicate to observe how agent handles it +POST_DUPLICATE_DELAY = 10 + + +async def send_out_of_order_events(workflow_id: str): + """Send events with a duplicate submittal approval after shipment arrives.""" + + # Connect to Temporal + temporal_url = environment_variables.TEMPORAL_ADDRESS or "localhost:7233" + client = await Client.connect(temporal_url) + + # Get handle to the workflow + handle = client.get_workflow_handle(workflow_id) + + # Flooring Materials - will PASS inspection, but with duplicate submittal event + # Required by: 2026-04-01, Buffer: 3 days (so buffer deadline is 2026-03-29) + # Arriving on 2026-03-20 (12 days early - well within buffer, no warnings) + events = [ + # 1. Normal: Submittal approved + SubmitalApprovalEvent( + event_type=EventType.SUBMITTAL_APPROVED, + item="Flooring Materials", + document_name="Flooring Materials Submittal.pdf", + document_url="/submittal_approval.pdf" + ), + # 2. Normal: Shipment departs + ShipmentDepartedFactoryEvent( + event_type=EventType.SHIPMENT_DEPARTED_FACTORY, + item="Flooring Materials", + eta=datetime(2026, 3, 20, 13, 15), + date_departed=datetime(2026, 3, 13, 11, 30), + location_address="218 W 18th St, New York, NY 10011" + ), + # 3. Normal: Shipment arrives + ShipmentArrivedSiteEvent( + event_type=EventType.SHIPMENT_ARRIVED_SITE, + item="Flooring Materials", + date_arrived=datetime(2026, 3, 20, 12, 45), + location_address="650 Townsend St, San Francisco, CA 94103" + ), + # 4. OUT OF ORDER: Duplicate submittal approval (should be ignored) + SubmitalApprovalEvent( + event_type=EventType.SUBMITTAL_APPROVED, + item="Flooring Materials", + document_name="Flooring Materials Submittal.pdf", + document_url="/submittal_approval.pdf" + ), + # 5. Normal: Inspection passes + InspectionPassedEvent( + event_type=EventType.INSPECTION_PASSED, + item="Flooring Materials", + inspection_date=datetime(2026, 3, 21, 15, 30), + document_name="Flooring Materials Inspection Report.pdf", + document_url="/inspection_passed.pdf" + ) + ] + + event_labels = [ + "Submittal Approved (initial)", + "Shipment Departed", + "Shipment Arrived", + "Submittal Approved (DUPLICATE - should be ignored)", + "Inspection Passed" + ] + + print(f"Connected to workflow: {workflow_id}") + print("=" * 60) + print("OUT-OF-ORDER DEMO: Testing duplicate event handling") + print(f"Event delay: {EVENT_DELAY}s") + print("=" * 60) + + print(f"\n{'=' * 60}") + print("Processing: Flooring Materials (with duplicate submittal)") + print("=" * 60) + + for i, (event, label) in enumerate(zip(events, event_labels), 1): + is_duplicate = (i == 4) + + print(f"\n[{i}/5] Sending: {label}") + print(f" Event Type: {event.event_type.value}") + print(f" Item: {event.item}") + + if is_duplicate: + print(f" ⚠️ This is a DUPLICATE event - agent should recognize and ignore") + + if hasattr(event, 'eta'): + print(f" ETA: {event.eta}") + if hasattr(event, 'date_arrived'): + print(f" Date Arrived: {event.date_arrived}") + if hasattr(event, 'inspection_date'): + print(f" Inspection Date: {event.inspection_date}") + + try: + event_data = event.model_dump_json() + await handle.signal("send_event", event_data) + print(f" ✓ Sent!") + + # Use longer delay after duplicate to observe handling + if is_duplicate: + print(f" ⏳ Waiting {POST_DUPLICATE_DELAY}s to observe duplicate handling...") + await asyncio.sleep(POST_DUPLICATE_DELAY) + else: + await asyncio.sleep(EVENT_DELAY) + + except Exception as e: + print(f" ✗ Error: {e}") + logger.error(f"Failed to send event: {e}") + + print("\n" + "=" * 60) + print("Out-of-order demo complete!") + print("The agent should have recognized the duplicate submittal") + print("approval and ignored it. Check the UI to verify.") + print("=" * 60) + + +async def main(): + """Main entry point.""" + + if len(sys.argv) > 1: + workflow_id = sys.argv[1] + else: + print("Enter Workflow ID:") + workflow_id = input("Workflow ID: ").strip() + + if not workflow_id: + print("Error: Workflow ID required!") + print("\nUsage: python out_of_order.py [workflow_id]") + return + + try: + await send_out_of_order_events(workflow_id) + except KeyboardInterrupt: + print("\n\nInterrupted. Goodbye!") + except Exception as e: + logger.error(f"Unexpected error: {e}") + print(f"Error: {e}") + print("\nMake sure:") + print("1. The workflow is running") + print("2. The workflow ID is correct") + print("3. Temporal is accessible at", environment_variables.TEMPORAL_ADDRESS) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/demos/procurement_agent/pyproject.toml b/examples/demos/procurement_agent/pyproject.toml index 7ccbf80e0..555819a5d 100644 --- a/examples/demos/procurement_agent/pyproject.toml +++ b/examples/demos/procurement_agent/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "temporalio>=1.18.2", "scale-gp", "aiosqlite", + "pytest-html>=4.2.0", ] [project.optional-dependencies] @@ -33,4 +34,4 @@ target-version = ['py312'] [tool.isort] profile = "black" -line_length = 88 \ No newline at end of file +line_length = 88 diff --git a/uv.lock b/uv.lock index 391297102..0bbcf36b0 100644 --- a/uv.lock +++ b/uv.lock @@ -2675,4 +2675,4 @@ source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547, upload-time = "2025-06-08T17:06:39.4Z" } wheels = [ { url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" }, -] +] \ No newline at end of file