Parallel Streaming constructs #534

@elijahbenizzy

Description

Is your feature request related to a problem? Please describe.
Multiple users have asked to be able to stream back data from multiple simultaneous calls in parallel. This can take multiple forms:

  1. Streaming out the tokens of the final result
  2. Streaming out the tokens of all streaming intermediate results
  3. Streaming out intermediate results as they complete

Another way of putting this -- say we have some zip function that combines streams of parallel subgraphs.

We have:

  1. zip(*[app.stream_result(...) for app in apps])
  2. zip(*[app.iterate_stream(...) for app in apps])
  3. zip(*[app.iterate(...) for app in apps])

This feature focuses on (2).

This is useful in something like a deep research mode where we want to see results streaming from subgraphs as they execute.
Note we may also want to allow the user to pause, cancel, etc. as we go along, so we should think about how to support that.

This should work well with both sync and async, and results should be streamed as they arrive.

Describe the solution you'd like

Focus on the underlying capability, e.g. (2) above. Add (optional) parameters to:

  1. Not stream intermediate results
  2. Stream just results, not tokens

To enable this, we should leverage the mechanics of the StreamingResultContainer, which has two capabilities:

  1. iterate() -- returns intermediate results as an iterator
  2. get() -- returns the final result, forcing exhaustion of the iterator
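As a toy illustration of how those two capabilities compose (this is not Burr's actual implementation -- just the semantics described above, in a self-contained sketch):

from typing import Any, Iterator, Optional

class ToyStreamingResultContainer:
    # Toy stand-in for the described semantics: iterate() yields intermediate
    # results as they arrive; get() forces exhaustion and returns the final one.
    def __init__(self, stream: Iterator[Any]):
        self._stream = stream
        self._final: Optional[Any] = None

    def iterate(self) -> Iterator[Any]:
        for item in self._stream:
            self._final = item
            yield item

    def get(self) -> Any:
        for _ in self.iterate():
            pass  # drain whatever is left of the stream
        return self._final

container = ToyStreamingResultContainer(iter(["chunk-1", "chunk-2", "done"]))
print(container.get())  # exhausts the iterator internally -> "done"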

What we will have is the lower-level TaskBasedParallelAction in streaming mode -- we can consider higher-level constructs later (see the sketch after the StreamItem definition below). This will:

  1. Expose the following methods:
    • tasks() -> Generator[SubgraphTask] -- specify the streaming tasks
    • stream_process(results: Generator[StreamItem]) -> Generator[Tuple[dict, State]] -- mirror the current streaming action class
  2. Be parameterized on how to combine results, via the following parameters:
    • intermediate_stream_outputs: bool (default True)
    • intermediate_nodes: bool (default True)

The StreamItem class will effectively be an event -- encapsulating two things (the result and/or a state update), plus metadata about where it came from:

from dataclasses import dataclass
from typing import Any, Optional

from burr.core import Action, State  # existing Burr types

@dataclass
class StreamItem:
    result: Any  # from the node (typically a dict)
    state_update: Optional[State]  # set if the node produced a state update
    action: Action  # the action it came from
    task_key: str  # task key -- we need to expose this, as it's currently an integer
    application_id: str  # additionally an application ID -- TBD on its purpose versus task_key
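To make the proposal concrete, here is a hypothetical sketch of what a user-defined streaming parallel action could look like. The class/method names follow this proposal, build_subquery_app is an illustrative helper, and the exact signatures (including yielding None for intermediate state updates) are assumptions, not a final API:

from typing import Generator, Optional, Tuple

class ResearchSubqueries(TaskBasedParallelAction):
    # Hypothetical subclass following the proposed streaming-mode API.
    def tasks(self, state: State) -> Generator[SubgraphTask, None, None]:
        # Specify one streaming subgraph per sub-query.
        for i, query in enumerate(state["queries"]):
            yield SubgraphTask(
                application=build_subquery_app(query),  # hypothetical helper
                inputs={"query": query},
                task_key=f"subquery_{i}",
            )

    def stream_process(
        self, results: Generator[StreamItem, None, None]
    ) -> Generator[Tuple[dict, Optional[State]], None, None]:
        # Mirror the current streaming action class: yield intermediate
        # deltas with no state update, then a final (result, state) pair.
        collected = []
        for item in results:
            collected.append(item.result)
            yield {"delta": item.result, "task_key": item.task_key}, None
        yield {"results": collected}, State({"results": collected})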

The framework will handle:

  1. Executing
  2. Collating

all subgraphs, as that requires a bunch of parallel-programming tricks that should live in the framework.

This way, all the user writes is a generator to create the subgraphs and a processor to turn their results into one.

Merging with flexible execution

The tricky part about collating the subgraphs as they come in is that, in addition to an executor capability, we need a queuing system. This is because we have a multi-producer, single-consumer model -- we need the producers to write events out to the consumer. This queuing system is effectively coupled with the execution engine (e.g. the concurrent.futures.Executor) we pass in:

  1. MultiThreadingExecutor <-> queue.Queue
  2. MultiProcessingExecutor <-> multiprocessing.Queue
  3. RayExecutor <-> ray.util.queue.Queue
  4. asyncio.gather <-> asyncio.Queue (the only supported item -- we might not need this, as asyncio has some higher-level semantics we can use, e.g. as_completed)

We also have to account for when the producers are complete, so we know when to join.
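To make that concrete, here is a minimal self-contained asyncio sketch (names are ours, purely illustrative -- not a proposed API): each producer pumps its stream into a shared asyncio.Queue and enqueues a sentinel when it finishes; the consumer joins once it has seen one sentinel per producer.

import asyncio
from typing import Any, AsyncIterator

DONE = object()  # per-producer completion sentinel

async def produce(stream: AsyncIterator[Any], q: asyncio.Queue) -> None:
    # Pump one subgraph's stream into the shared queue, then signal completion.
    async for item in stream:
        await q.put(item)
    await q.put(DONE)

async def merge(*streams: AsyncIterator[Any]) -> AsyncIterator[Any]:
    # Multi-producer, single-consumer: yield items as they arrive; join once
    # every producer has signaled DONE.
    q: asyncio.Queue = asyncio.Queue()
    producers = [asyncio.create_task(produce(s, q)) for s in streams]
    remaining = len(producers)
    while remaining:
        item = await q.get()
        if item is DONE:
            remaining -= 1
        else:
            yield item
    await asyncio.gather(*producers)

async def fake_stream(name: str, n: int) -> AsyncIterator[str]:
    for i in range(n):
        await asyncio.sleep(0.01)
        yield f"{name}-{i}"

async def main() -> None:
    async for item in merge(fake_stream("a", 3), fake_stream("b", 2)):
        print(item)  # items interleave as they arrive

asyncio.run(main())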

To solve this, we will:

  1. Create a queuing abstraction, e.g.:
from __future__ import annotations

import abc
from typing import Generic, Optional, TypeVar

StreamedType = TypeVar("StreamedType")

class StreamingResultsQueue(abc.ABC, Generic[StreamedType]):
    @abc.abstractmethod
    def put(self, item: StreamedType) -> None:
        pass

    @abc.abstractmethod
    def get(self, block: bool = True) -> Optional[StreamedType]:
        pass

    @abc.abstractmethod
    def is_done(self) -> bool:
        pass

    @staticmethod
    def multithreaded() -> StreamingResultsQueue:
        ...  # utility factory for the multithreaded implementation; additionally one for multiprocessing, etc.
  2. Pass this in via with_parallel_executor:
app_builder = app_builder.with_parallel_executor(
    executor_factory=ThreadPoolExecutor,
    queuing_factory=StreamingResultsQueue.multithreaded
)
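For reference, a minimal sketch of what the multithreaded implementation behind StreamingResultsQueue.multithreaded() could look like -- the producer_done hook and the producer-counting scheme are assumptions, not a committed design:

import queue
import threading

class MultithreadedResultsQueue(StreamingResultsQueue):
    # Hypothetical queue.Queue-backed implementation of the abstraction above.
    def __init__(self, num_producers: int):
        self._queue: queue.Queue = queue.Queue()  # unbounded, per the note below
        self._remaining = num_producers
        self._lock = threading.Lock()

    def put(self, item) -> None:
        self._queue.put(item)

    def producer_done(self) -> None:
        # Hypothetical hook -- each producer calls this when its stream ends.
        with self._lock:
            self._remaining -= 1

    def get(self, block: bool = True):
        # Return None on empty/timeout so the consumer can re-check is_done().
        try:
            return self._queue.get(block=block, timeout=0.1 if block else None)
        except queue.Empty:
            return None

    def is_done(self) -> bool:
        with self._lock:
            return self._remaining == 0 and self._queue.empty()

# Consumer loop sketch: drain until all producers are done.
# while not q.is_done():
#     if (item := q.get()) is not None:
#         process(item)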

Next up would be to combine these into a utility that provides all the capabilities -- quite likely we will want more distributed-systems primitives, but for now we can keep them decoupled (in power-user mode).

We should also add some utility functions to the builder (e.g. with_multithreaded_executor(...)) -- this would make it much easier to use.
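E.g., something like this (hypothetical name and signature, bundling the executor and queue factories from above):

app_builder = app_builder.with_multithreaded_executor(
    max_concurrency=10,  # hypothetical parameter
)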

The queue should be effectively unbounded, so as not to block execution, and will likely not be persisted. This means it's possible to drop results if the consumer fails before the producers do. That's OK -- the general plan is to restart after the last result that finished, so the intermediate nodes can persist effectively.

Describe alternatives you've considered

This is already doable with a bunch of engineering -- e.g., all of the above can be written by hand today. We should also consider adding higher-level constructs (mapping states/actions/etc.), but I want to push those off until later -- we should be able to unify the logic with what already exists and make that easy.
