Skip to content

Commit 19ac6e5

Browse files
authored
Python [BREAKING]: support magentic agent tool call approvals and plan stalling HITL behavior (#2569)
* Provide way for HITL with magentic * support tool call approvals and hitl stall replan * human plan intervention sample * Clean up * Improve loging * updates
1 parent 292a207 commit 19ac6e5

File tree

11 files changed

+1154
-209
lines changed

11 files changed

+1154
-209
lines changed

python/packages/core/agent_framework/_workflows/__init__.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,15 @@
7474
ORCH_MSG_KIND_USER_TASK,
7575
MagenticBuilder,
7676
MagenticContext,
77+
MagenticHumanInputRequest,
78+
MagenticHumanInterventionDecision,
79+
MagenticHumanInterventionKind,
80+
MagenticHumanInterventionReply,
81+
MagenticHumanInterventionRequest,
7782
MagenticManagerBase,
78-
MagenticPlanReviewDecision,
79-
MagenticPlanReviewReply,
80-
MagenticPlanReviewRequest,
83+
MagenticStallInterventionDecision,
84+
MagenticStallInterventionReply,
85+
MagenticStallInterventionRequest,
8186
StandardMagenticManager,
8287
)
8388
from ._orchestration_state import OrchestrationState
@@ -144,10 +149,15 @@
144149
"InProcRunnerContext",
145150
"MagenticBuilder",
146151
"MagenticContext",
152+
"MagenticHumanInputRequest",
153+
"MagenticHumanInterventionDecision",
154+
"MagenticHumanInterventionKind",
155+
"MagenticHumanInterventionReply",
156+
"MagenticHumanInterventionRequest",
147157
"MagenticManagerBase",
148-
"MagenticPlanReviewDecision",
149-
"MagenticPlanReviewReply",
150-
"MagenticPlanReviewRequest",
158+
"MagenticStallInterventionDecision",
159+
"MagenticStallInterventionReply",
160+
"MagenticStallInterventionRequest",
151161
"ManagerDirectiveModel",
152162
"ManagerSelectionRequest",
153163
"ManagerSelectionResponse",

python/packages/core/agent_framework/_workflows/_magentic.py

Lines changed: 542 additions & 133 deletions
Large diffs are not rendered by default.

python/packages/core/tests/workflow/test_magentic.py

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,13 @@
1212
AgentRunResponseUpdate,
1313
AgentRunUpdateEvent,
1414
BaseAgent,
15-
ChatClientProtocol,
1615
ChatMessage,
17-
ChatResponse,
18-
ChatResponseUpdate,
1916
Executor,
2017
MagenticBuilder,
18+
MagenticHumanInterventionDecision,
19+
MagenticHumanInterventionReply,
20+
MagenticHumanInterventionRequest,
2121
MagenticManagerBase,
22-
MagenticPlanReviewDecision,
23-
MagenticPlanReviewReply,
24-
MagenticPlanReviewRequest,
2522
RequestInfoEvent,
2623
Role,
2724
TextContent,
@@ -59,21 +56,25 @@ def test_magentic_start_message_from_string():
5956
assert msg.task.text == "Do the thing"
6057

6158

62-
def test_plan_review_request_defaults_and_reply_variants():
63-
req = MagenticPlanReviewRequest() # defaults provided by dataclass
59+
def test_human_intervention_request_defaults_and_reply_variants():
60+
from agent_framework._workflows._magentic import MagenticHumanInterventionKind
61+
62+
req = MagenticHumanInterventionRequest(kind=MagenticHumanInterventionKind.PLAN_REVIEW)
6463
assert hasattr(req, "request_id")
6564
assert req.task_text == "" and req.facts_text == "" and req.plan_text == ""
6665
assert isinstance(req.round_index, int) and req.round_index == 0
6766

6867
# Replies: approve, revise with comments, revise with edited text
69-
approve = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)
70-
revise_comments = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.REVISE, comments="Tighten scope")
71-
revise_text = MagenticPlanReviewReply(
72-
decision=MagenticPlanReviewDecision.REVISE,
68+
approve = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)
69+
revise_comments = MagenticHumanInterventionReply(
70+
decision=MagenticHumanInterventionDecision.REVISE, comments="Tighten scope"
71+
)
72+
revise_text = MagenticHumanInterventionReply(
73+
decision=MagenticHumanInterventionDecision.REVISE,
7374
edited_plan_text="- Step 1\n- Step 2",
7475
)
7576

76-
assert approve.decision == MagenticPlanReviewDecision.APPROVE
77+
assert approve.decision == MagenticHumanInterventionDecision.APPROVE
7778
assert revise_comments.comments == "Tighten scope"
7879
assert revise_text.edited_plan_text is not None and revise_text.edited_plan_text.startswith("- Step 1")
7980

@@ -220,15 +221,14 @@ async def test_magentic_workflow_plan_review_approval_to_completion():
220221

221222
req_event: RequestInfoEvent | None = None
222223
async for ev in wf.run_stream("do work"):
223-
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest:
224+
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest:
224225
req_event = ev
225226
assert req_event is not None
226227

227228
completed = False
228229
output: list[ChatMessage] | None = None
229-
async for ev in wf.send_responses_streaming(
230-
responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)}
231-
):
230+
reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)
231+
async for ev in wf.send_responses_streaming(responses={req_event.request_id: reply}):
232232
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
233233
completed = True
234234
elif isinstance(ev, WorkflowOutputEvent):
@@ -265,7 +265,7 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ
265265
# Wait for the initial plan review request
266266
req_event: RequestInfoEvent | None = None
267267
async for ev in wf.run_stream("do work"):
268-
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest:
268+
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest:
269269
req_event = ev
270270
assert req_event is not None
271271

@@ -274,13 +274,13 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ
274274
completed = False
275275
async for ev in wf.send_responses_streaming(
276276
responses={
277-
req_event.request_id: MagenticPlanReviewReply(
278-
decision=MagenticPlanReviewDecision.APPROVE,
277+
req_event.request_id: MagenticHumanInterventionReply(
278+
decision=MagenticHumanInterventionDecision.APPROVE,
279279
comments="Looks good; consider Z",
280280
)
281281
}
282282
):
283-
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest:
283+
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest:
284284
saw_second_review = True
285285
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
286286
completed = True
@@ -338,7 +338,7 @@ async def test_magentic_checkpoint_resume_round_trip():
338338
task_text = "checkpoint task"
339339
req_event: RequestInfoEvent | None = None
340340
async for ev in wf.run_stream(task_text):
341-
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest:
341+
if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest:
342342
req_event = ev
343343
assert req_event is not None
344344

@@ -359,13 +359,13 @@ async def test_magentic_checkpoint_resume_round_trip():
359359

360360
orchestrator = next(exec for exec in wf_resume.executors.values() if isinstance(exec, MagenticOrchestratorExecutor))
361361

362-
reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)
362+
reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE)
363363
completed: WorkflowOutputEvent | None = None
364364
req_event = None
365365
async for event in wf_resume.run_stream(
366366
resume_checkpoint.checkpoint_id,
367367
):
368-
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
368+
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
369369
req_event = event
370370
assert req_event is not None
371371

@@ -430,25 +430,33 @@ async def test_magentic_agent_executor_on_checkpoint_save_and_restore_roundtrip(
430430
from agent_framework import StandardMagenticManager # noqa: E402
431431

432432

433-
class _StubChatClient(ChatClientProtocol):
434-
@property
435-
def additional_properties(self) -> dict[str, Any]:
436-
"""Get additional properties associated with the client."""
437-
return {}
438-
439-
async def get_response(self, messages, **kwargs): # type: ignore[override]
440-
return ChatResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="ok")])
441-
442-
def get_streaming_response(self, messages, **kwargs) -> AsyncIterable[ChatResponseUpdate]: # type: ignore[override]
443-
async def _gen():
444-
if False:
445-
yield ChatResponseUpdate() # pragma: no cover
433+
class _StubManagerAgent(BaseAgent):
434+
"""Stub agent for testing StandardMagenticManager."""
435+
436+
async def run(
437+
self,
438+
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
439+
*,
440+
thread: Any = None,
441+
**kwargs: Any,
442+
) -> AgentRunResponse:
443+
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="ok")])
444+
445+
def run_stream(
446+
self,
447+
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
448+
*,
449+
thread: Any = None,
450+
**kwargs: Any,
451+
) -> AsyncIterable[AgentRunResponseUpdate]:
452+
async def _gen() -> AsyncIterable[AgentRunResponseUpdate]:
453+
yield AgentRunResponseUpdate(message_deltas=[ChatMessage(role=Role.ASSISTANT, text="ok")])
446454

447455
return _gen()
448456

449457

450458
async def test_standard_manager_plan_and_replan_via_complete_monkeypatch():
451-
mgr = StandardMagenticManager(chat_client=_StubChatClient())
459+
mgr = StandardMagenticManager(agent=_StubManagerAgent())
452460

453461
async def fake_complete_plan(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage:
454462
# Return a different response depending on call order length
@@ -481,7 +489,7 @@ async def fake_complete_replan(messages: list[ChatMessage], **kwargs: Any) -> Ch
481489

482490

483491
async def test_standard_manager_progress_ledger_success_and_error():
484-
mgr = StandardMagenticManager(chat_client=_StubChatClient())
492+
mgr = StandardMagenticManager(agent=_StubManagerAgent())
485493
ctx = MagenticContext(
486494
task=ChatMessage(role=Role.USER, text="task"),
487495
participant_descriptions={"alice": "desc"},
@@ -718,7 +726,7 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames():
718726

719727
req_event: RequestInfoEvent | None = None
720728
async for event in workflow.run_stream("task"):
721-
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
729+
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
722730
req_event = event
723731

724732
assert req_event is not None

python/samples/getting_started/workflows/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ For additional observability samples in Agent Framework, see the [observability
105105
| Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` |
106106
| Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming |
107107
| Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution |
108+
| Magentic + Human Stall Intervention | [orchestration/magentic_human_replan.py](./orchestration/magentic_human_replan.py) | Human intervenes when workflow stalls with `with_human_input_on_stall()` |
109+
| Magentic + Agent Clarification | [orchestration/magentic_agent_clarification.py](./orchestration/magentic_agent_clarification.py) | Agents ask clarifying questions via `ask_user` tool with `@ai_function(approval_mode="always_require")` |
108110
| Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints |
109111
| Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context |
110112
| Sequential Orchestration (Custom Executor) | [orchestration/sequential_custom_executors.py](./orchestration/sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary |

python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,21 @@ async def main() -> None:
4848
tools=HostedCodeInterpreterTool(),
4949
)
5050

51+
# Create a manager agent for orchestration
52+
manager_agent = ChatAgent(
53+
name="MagenticManager",
54+
description="Orchestrator that coordinates the research and coding workflow",
55+
instructions="You coordinate a team to complete complex tasks efficiently.",
56+
chat_client=OpenAIChatClient(),
57+
)
58+
5159
print("\nBuilding Magentic Workflow...")
5260

5361
workflow = (
5462
MagenticBuilder()
5563
.participants(researcher=researcher_agent, coder=coder_agent)
5664
.with_standard_manager(
57-
chat_client=OpenAIChatClient(),
65+
agent=manager_agent,
5866
max_round_count=10,
5967
max_stall_count=3,
6068
max_reset_count=2,

python/samples/getting_started/workflows/orchestration/magentic.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ async def main() -> None:
6565
tools=HostedCodeInterpreterTool(),
6666
)
6767

68+
# Create a manager agent for orchestration
69+
manager_agent = ChatAgent(
70+
name="MagenticManager",
71+
description="Orchestrator that coordinates the research and coding workflow",
72+
instructions="You coordinate a team to complete complex tasks efficiently.",
73+
chat_client=OpenAIChatClient(),
74+
)
75+
6876
print("\nBuilding Magentic Workflow...")
6977

7078
# State used by on_agent_stream callback
@@ -75,7 +83,7 @@ async def main() -> None:
7583
MagenticBuilder()
7684
.participants(researcher=researcher_agent, coder=coder_agent)
7785
.with_standard_manager(
78-
chat_client=OpenAIChatClient(),
86+
agent=manager_agent,
7987
max_round_count=10,
8088
max_stall_count=3,
8189
max_reset_count=2,

0 commit comments

Comments
 (0)