Skip to content

Conversation

@TaoChenOSU
Copy link
Contributor

@TaoChenOSU TaoChenOSU commented Dec 10, 2025

Motivation and Context

Follow up for #2486

Description

This PR adds factory to the concurrent orchestration builder.

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

@TaoChenOSU TaoChenOSU self-assigned this Dec 10, 2025
Copilot AI review requested due to automatic review settings December 10, 2025 00:46
@TaoChenOSU TaoChenOSU added python workflows Related to Workflows in agent-framework labels Dec 10, 2025
@github-actions github-actions bot changed the title Add factory pattern to concurrent orchestration builder Python: Add factory pattern to concurrent orchestration builder Dec 10, 2025
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds factory pattern support to the ConcurrentBuilder class, allowing participants and aggregators to be created through factory functions rather than requiring pre-instantiated objects. This enhancement enables better state isolation between workflow instances created from the same builder.

Key changes:

  • Added register_participants() method accepting factory functions for creating participants
  • Enhanced with_aggregator() to accept executor factories in addition to instances and callbacks
  • Updated build() method to handle both direct instances and factories

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.

File Description
python/packages/core/agent_framework/_workflows/_concurrent.py Core implementation adding factory pattern support with new register_participants() method and enhanced with_aggregator() to handle executor factories
python/packages/core/tests/workflow/test_concurrent.py Comprehensive test coverage for factory patterns including tests for executor instance aggregators, executor factory aggregators, and participant factories
python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py New sample demonstrating concurrent orchestration using participant factories and custom aggregator with proper state isolation across workflow instances
python/samples/getting_started/workflows/orchestration/concurrent_custom_aggregator.py Updated documentation comment to reflect method rename from with_custom_aggregator to with_aggregator

Comment on lines +244 to +246
Raises:
ValueError: if `participant_factories` is empty, contains duplicates, or `.participants()` was called
TypeError: if any entry is not AgentProtocol or Executor
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring claims to raise ValueError if participant_factories "contains duplicates", but there is no duplicate validation logic in this method. Either implement duplicate checking for factory functions or remove this claim from the docstring.

Note: The participants() method validates for duplicate executor IDs and agent instances, but register_participants() does not perform any such validation.

Copilot uses AI. Check for mistakes.
wf = ConcurrentBuilder().register_participants([create_researcher, create_marketer, create_legal]).build()
# Mixing agent(s) and executor(s) is supported
wf2 = ConcurrentBuilder().register_participants([create_researcher, my_custom_executor]).build()
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example incorrectly uses my_custom_executor which appears to be an executor instance, not a factory. Since register_participants() expects factories, this should be either:

  • A factory function: lambda: MyCustomExecutor() or create_my_custom_executor
  • Or clarify that this is a callable factory

Example:

wf2 = ConcurrentBuilder().register_participants([create_researcher, lambda: MyCustomExecutor()]).build()
Suggested change
wf2 = ConcurrentBuilder().register_participants([create_researcher, my_custom_executor]).build()
wf2 = ConcurrentBuilder().register_participants([create_researcher, lambda: MyCustomExecutor()]).build()

Copilot uses AI. Check for mistakes.
Comment on lines +441 to +443
if self._participant_factories:
for factory in self._participant_factories:
p = factory()
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing duplicate validation when creating participants from factories. The participants() method validates for duplicate executor IDs and agent instances (lines 311-325), but when using register_participants(), the build() method creates participants without checking for duplicates. This could lead to workflow errors if factories return executors with duplicate IDs.

Suggested fix: Add validation after creating participants from factories:

participants: list[Executor | AgentProtocol] = []
if self._participant_factories:
    seen_executor_ids: set[str] = set()
    seen_agent_ids: set[int] = set()
    for factory in self._participant_factories:
        p = factory()
        # Validate for duplicates (similar to participants() method)
        if isinstance(p, Executor):
            if p.id in seen_executor_ids:
                raise ValueError(f"Duplicate executor participant detected: id '{p.id}'")
            seen_executor_ids.add(p.id)
        elif isinstance(p, AgentProtocol):
            pid = id(p)
            if pid in seen_agent_ids:
                raise ValueError("Duplicate agent participant detected")
            seen_agent_ids.add(pid)
        participants.append(p)
else:
    participants = self._participants
Suggested change
if self._participant_factories:
for factory in self._participant_factories:
p = factory()
if self._participant_factories:
seen_executor_ids: set[str] = set()
seen_agent_ids: set[int] = set()
for factory in self._participant_factories:
p = factory()
# Validate for duplicates (similar to participants() method)
if isinstance(p, Executor):
if p.id in seen_executor_ids:
raise ValueError(f"Duplicate executor participant detected: id '{p.id}'")
seen_executor_ids.add(p.id)
elif isinstance(p, AgentProtocol):
pid = id(p)
if pid in seen_agent_ids:
raise ValueError("Duplicate agent participant detected")
seen_agent_ids.add(pid)

Copilot uses AI. Check for mistakes.
class SummarizationExecutor(Executor):
"""Custom aggregator executor that synthesizes expert outputs into a concise summary."""

def __init__(self):
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing return type annotation for __init__ method. According to C# sample code guidelines, the structure should follow patterns in the same directories. Python best practices also recommend explicit type hints.

Suggested fix:

def __init__(self) -> None:
    super().__init__(id="summarization_executor")
    self.chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

Copilot generated this review using guidance from repository custom instructions.


async def test_concurrent_with_aggregator_executor_factory_with_default_id() -> None:
"""Test with_aggregator using an Executor factory."""
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring is not specific enough and duplicates the one from test_concurrent_with_aggregator_executor_factory (line 197). This test specifically validates that an Executor class (not a lambda factory) can be passed directly and will be instantiated with default parameters.

Suggested docstring:

"""Test with_aggregator using an Executor class directly (with default __init__ parameters)."""
Suggested change
"""Test with_aggregator using an Executor factory."""
"""Test with_aggregator using an Executor class directly (with default __init__ parameters)."""

Copilot uses AI. Check for mistakes.
Comment on lines +415 to +450
async def test_concurrent_with_register_participants() -> None:
"""Test workflow creation using register_participants with factories."""

def create_agent1() -> Executor:
return _FakeAgentExec("agentA", "Alpha")

def create_agent2() -> Executor:
return _FakeAgentExec("agentB", "Beta")

def create_agent3() -> Executor:
return _FakeAgentExec("agentC", "Gamma")

wf = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()

completed = False
output: list[ChatMessage] | None = None
async for ev in wf.run_stream("test prompt"):
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
completed = True
elif isinstance(ev, WorkflowOutputEvent):
output = cast(list[ChatMessage], ev.data)
if completed and output is not None:
break

assert completed
assert output is not None
messages: list[ChatMessage] = output

# Expect one user message + one assistant message per participant
assert len(messages) == 1 + 3
assert messages[0].role == Role.USER
assert "test prompt" in messages[0].text

assistant_texts = {m.text for m in messages[1:]}
assert assistant_texts == {"Alpha", "Beta", "Gamma"}
assert all(m.role == Role.ASSISTANT for m in messages[1:])
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for duplicate validation when using register_participants(). While there's a test for duplicate validation with .participants() (line 48: test_concurrent_builder_rejects_duplicate_executors), there's no equivalent test for factories.

Consider adding a test that validates duplicate executor IDs are detected when factories return executors with the same ID:

def test_concurrent_builder_rejects_duplicate_executors_from_factories() -> None:
    """Test that duplicate executor IDs from factories are detected at build time."""
    def create_dup1() -> Executor:
        return _FakeAgentExec("dup", "A")
    
    def create_dup2() -> Executor:
        return _FakeAgentExec("dup", "B")  # same executor id
    
    builder = ConcurrentBuilder().register_participants([create_dup1, create_dup2])
    with pytest.raises(ValueError, match="Duplicate executor"):
        builder.build()

Copilot uses AI. Check for mistakes.
Raises:
ValueError: if `participant_factories` is empty, contains duplicates, or `.participants()` was called
TypeError: if any entry is not AgentProtocol or Executor
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring states "TypeError: if any entry is not AgentProtocol or Executor", but this is inaccurate. The method accepts factories (callables), not instances. The TypeError would only be raised at build() time when the factories are called and return values are validated, not during register_participants() itself.

Consider updating to:

TypeError: if any factory returns a value that is not AgentProtocol or Executor (raised at build time)
Suggested change
TypeError: if any entry is not AgentProtocol or Executor
TypeError: if any factory returns a value that is not AgentProtocol or Executor (raised at build time)

Copilot uses AI. Check for mistakes.
Comment on lines +240 to +242
Accepts AgentProtocol instances (e.g., created by a chat client) or Executor
factories. Each participant created by a factory is wired as a parallel branch
using fan-out edges from an internal dispatcher.
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring incorrectly describes the parameter as accepting "AgentProtocol instances" when it actually accepts factories (callables) that return AgentProtocol or Executor instances.

Suggested correction:

r"""Define the parallel participants for this concurrent workflow.

Accepts factories (callables) that return AgentProtocol instances (e.g., created 
by a chat client) or Executor instances. Each participant created by a factory 
is wired as a parallel branch using fan-out edges from an internal dispatcher.
Suggested change
Accepts AgentProtocol instances (e.g., created by a chat client) or Executor
factories. Each participant created by a factory is wired as a parallel branch
using fan-out edges from an internal dispatcher.
Accepts factories (callables) that return AgentProtocol instances (e.g., created
by a chat client) or Executor instances. Each participant created by a factory
is wired as a parallel branch using fan-out edges from an internal dispatcher.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

python workflows Related to Workflows in agent-framework

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant