-
Notifications
You must be signed in to change notification settings - Fork 878
Python: Add factory pattern to concurrent orchestration builder #2738
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Python: Add factory pattern to concurrent orchestration builder #2738
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds factory pattern support to the ConcurrentBuilder class, allowing participants and aggregators to be created through factory functions rather than requiring pre-instantiated objects. This enhancement enables better state isolation between workflow instances created from the same builder.
Key changes:
- Added
register_participants()method accepting factory functions for creating participants - Enhanced
with_aggregator()to accept executor factories in addition to instances and callbacks - Updated
build()method to handle both direct instances and factories
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
python/packages/core/agent_framework/_workflows/_concurrent.py |
Core implementation adding factory pattern support with new register_participants() method and enhanced with_aggregator() to handle executor factories |
python/packages/core/tests/workflow/test_concurrent.py |
Comprehensive test coverage for factory patterns including tests for executor instance aggregators, executor factory aggregators, and participant factories |
python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py |
New sample demonstrating concurrent orchestration using participant factories and custom aggregator with proper state isolation across workflow instances |
python/samples/getting_started/workflows/orchestration/concurrent_custom_aggregator.py |
Updated documentation comment to reflect method rename from with_custom_aggregator to with_aggregator |
| Raises: | ||
| ValueError: if `participant_factories` is empty, contains duplicates, or `.participants()` was called | ||
| TypeError: if any entry is not AgentProtocol or Executor |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring claims to raise ValueError if participant_factories "contains duplicates", but there is no duplicate validation logic in this method. Either implement duplicate checking for factory functions or remove this claim from the docstring.
Note: The participants() method validates for duplicate executor IDs and agent instances, but register_participants() does not perform any such validation.
| wf = ConcurrentBuilder().register_participants([create_researcher, create_marketer, create_legal]).build() | ||
| # Mixing agent(s) and executor(s) is supported | ||
| wf2 = ConcurrentBuilder().register_participants([create_researcher, my_custom_executor]).build() |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The example incorrectly uses my_custom_executor which appears to be an executor instance, not a factory. Since register_participants() expects factories, this should be either:
- A factory function:
lambda: MyCustomExecutor()orcreate_my_custom_executor - Or clarify that this is a callable factory
Example:
wf2 = ConcurrentBuilder().register_participants([create_researcher, lambda: MyCustomExecutor()]).build()| wf2 = ConcurrentBuilder().register_participants([create_researcher, my_custom_executor]).build() | |
| wf2 = ConcurrentBuilder().register_participants([create_researcher, lambda: MyCustomExecutor()]).build() |
| if self._participant_factories: | ||
| for factory in self._participant_factories: | ||
| p = factory() |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing duplicate validation when creating participants from factories. The participants() method validates for duplicate executor IDs and agent instances (lines 311-325), but when using register_participants(), the build() method creates participants without checking for duplicates. This could lead to workflow errors if factories return executors with duplicate IDs.
Suggested fix: Add validation after creating participants from factories:
participants: list[Executor | AgentProtocol] = []
if self._participant_factories:
seen_executor_ids: set[str] = set()
seen_agent_ids: set[int] = set()
for factory in self._participant_factories:
p = factory()
# Validate for duplicates (similar to participants() method)
if isinstance(p, Executor):
if p.id in seen_executor_ids:
raise ValueError(f"Duplicate executor participant detected: id '{p.id}'")
seen_executor_ids.add(p.id)
elif isinstance(p, AgentProtocol):
pid = id(p)
if pid in seen_agent_ids:
raise ValueError("Duplicate agent participant detected")
seen_agent_ids.add(pid)
participants.append(p)
else:
participants = self._participants| if self._participant_factories: | |
| for factory in self._participant_factories: | |
| p = factory() | |
| if self._participant_factories: | |
| seen_executor_ids: set[str] = set() | |
| seen_agent_ids: set[int] = set() | |
| for factory in self._participant_factories: | |
| p = factory() | |
| # Validate for duplicates (similar to participants() method) | |
| if isinstance(p, Executor): | |
| if p.id in seen_executor_ids: | |
| raise ValueError(f"Duplicate executor participant detected: id '{p.id}'") | |
| seen_executor_ids.add(p.id) | |
| elif isinstance(p, AgentProtocol): | |
| pid = id(p) | |
| if pid in seen_agent_ids: | |
| raise ValueError("Duplicate agent participant detected") | |
| seen_agent_ids.add(pid) |
| class SummarizationExecutor(Executor): | ||
| """Custom aggregator executor that synthesizes expert outputs into a concise summary.""" | ||
|
|
||
| def __init__(self): |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing return type annotation for __init__ method. According to C# sample code guidelines, the structure should follow patterns in the same directories. Python best practices also recommend explicit type hints.
Suggested fix:
def __init__(self) -> None:
super().__init__(id="summarization_executor")
self.chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())|
|
||
|
|
||
| async def test_concurrent_with_aggregator_executor_factory_with_default_id() -> None: | ||
| """Test with_aggregator using an Executor factory.""" |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring is not specific enough and duplicates the one from test_concurrent_with_aggregator_executor_factory (line 197). This test specifically validates that an Executor class (not a lambda factory) can be passed directly and will be instantiated with default parameters.
Suggested docstring:
"""Test with_aggregator using an Executor class directly (with default __init__ parameters)."""| """Test with_aggregator using an Executor factory.""" | |
| """Test with_aggregator using an Executor class directly (with default __init__ parameters).""" |
| async def test_concurrent_with_register_participants() -> None: | ||
| """Test workflow creation using register_participants with factories.""" | ||
|
|
||
| def create_agent1() -> Executor: | ||
| return _FakeAgentExec("agentA", "Alpha") | ||
|
|
||
| def create_agent2() -> Executor: | ||
| return _FakeAgentExec("agentB", "Beta") | ||
|
|
||
| def create_agent3() -> Executor: | ||
| return _FakeAgentExec("agentC", "Gamma") | ||
|
|
||
| wf = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build() | ||
|
|
||
| completed = False | ||
| output: list[ChatMessage] | None = None | ||
| async for ev in wf.run_stream("test prompt"): | ||
| if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: | ||
| completed = True | ||
| elif isinstance(ev, WorkflowOutputEvent): | ||
| output = cast(list[ChatMessage], ev.data) | ||
| if completed and output is not None: | ||
| break | ||
|
|
||
| assert completed | ||
| assert output is not None | ||
| messages: list[ChatMessage] = output | ||
|
|
||
| # Expect one user message + one assistant message per participant | ||
| assert len(messages) == 1 + 3 | ||
| assert messages[0].role == Role.USER | ||
| assert "test prompt" in messages[0].text | ||
|
|
||
| assistant_texts = {m.text for m in messages[1:]} | ||
| assert assistant_texts == {"Alpha", "Beta", "Gamma"} | ||
| assert all(m.role == Role.ASSISTANT for m in messages[1:]) |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing test coverage for duplicate validation when using register_participants(). While there's a test for duplicate validation with .participants() (line 48: test_concurrent_builder_rejects_duplicate_executors), there's no equivalent test for factories.
Consider adding a test that validates duplicate executor IDs are detected when factories return executors with the same ID:
def test_concurrent_builder_rejects_duplicate_executors_from_factories() -> None:
"""Test that duplicate executor IDs from factories are detected at build time."""
def create_dup1() -> Executor:
return _FakeAgentExec("dup", "A")
def create_dup2() -> Executor:
return _FakeAgentExec("dup", "B") # same executor id
builder = ConcurrentBuilder().register_participants([create_dup1, create_dup2])
with pytest.raises(ValueError, match="Duplicate executor"):
builder.build()| Raises: | ||
| ValueError: if `participant_factories` is empty, contains duplicates, or `.participants()` was called | ||
| TypeError: if any entry is not AgentProtocol or Executor |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring states "TypeError: if any entry is not AgentProtocol or Executor", but this is inaccurate. The method accepts factories (callables), not instances. The TypeError would only be raised at build() time when the factories are called and return values are validated, not during register_participants() itself.
Consider updating to:
TypeError: if any factory returns a value that is not AgentProtocol or Executor (raised at build time)| TypeError: if any entry is not AgentProtocol or Executor | |
| TypeError: if any factory returns a value that is not AgentProtocol or Executor (raised at build time) |
| Accepts AgentProtocol instances (e.g., created by a chat client) or Executor | ||
| factories. Each participant created by a factory is wired as a parallel branch | ||
| using fan-out edges from an internal dispatcher. |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring incorrectly describes the parameter as accepting "AgentProtocol instances" when it actually accepts factories (callables) that return AgentProtocol or Executor instances.
Suggested correction:
r"""Define the parallel participants for this concurrent workflow.
Accepts factories (callables) that return AgentProtocol instances (e.g., created
by a chat client) or Executor instances. Each participant created by a factory
is wired as a parallel branch using fan-out edges from an internal dispatcher.| Accepts AgentProtocol instances (e.g., created by a chat client) or Executor | |
| factories. Each participant created by a factory is wired as a parallel branch | |
| using fan-out edges from an internal dispatcher. | |
| Accepts factories (callables) that return AgentProtocol instances (e.g., created | |
| by a chat client) or Executor instances. Each participant created by a factory | |
| is wired as a parallel branch using fan-out edges from an internal dispatcher. |
Motivation and Context
Follow up for #2486
Description
This PR adds factory to the concurrent orchestration builder.
Contribution Checklist