**Is your feature request related to a problem? Please describe.**
Multiple users have asked to be able to stream back data from multiple simultaneous calls in parallel. This can take a few forms:

1. Streaming out the tokens of the final result
2. Streaming out the tokens of all streaming intermediate results
3. Streaming out intermediate results as they complete
Another way of putting this -- say we have some zip function that combines streams of parallel subgraphs.
We have:
1. `zip(*[app.stream_result(...) for app in apps])`
2. `zip(*[app.iterate_stream(...) for app in apps])`
3. `zip(*[app.iterate(...) for app in apps])`
For (2) see this feature.
This is useful in something like deep research mode where we want to see results streaming from subgraphs.
Note we may also want to allow the user to pause, cancel, etc... as we go along, so we want to think about how to do that.
This should work well with both sync and async, and results should be streamed as they arrive.
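For intuition, here is a minimal, stdlib-only sketch of that "as they arrive" behavior -- merging several iterators through a shared queue. `merge_streams` is illustrative, not proposed API; note that `zip`, by contrast, advances every stream in lockstep:

```python
import queue
import threading
from typing import Any, Generator, Iterable, Iterator


def merge_streams(streams: Iterable[Iterator[Any]]) -> Generator[Any, None, None]:
    """Interleave items from several iterators as each one produces them
    (multi-producer, single-consumer)."""
    done = object()  # sentinel each producer enqueues when its stream is exhausted
    q: queue.Queue = queue.Queue()
    streams = list(streams)
    for stream in streams:
        def produce(s: Iterator[Any] = stream) -> None:  # default arg binds the loop var
            for item in s:
                q.put(item)
            q.put(done)
        threading.Thread(target=produce, daemon=True).start()
    remaining = len(streams)
    while remaining:  # exit once every producer has sent its sentinel
        item = q.get()
        if item is done:
            remaining -= 1
        else:
            yield item
```

Something like `for item in merge_streams(app.iterate(...) for app in apps)` is the shape behind all three options above.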
**Describe the solution you'd like**
Focus on the underlying capability, e.g. (2) above. Add (optional) parameters to:
- Not stream intermediate results
- Stream just results, not tokens
To enable this, we should leverage the mechanics of the StreamingResultContainer, which has two capabilities:
- `iterate()` -- returns intermediate results in an iterator
- `get()` -- returns the final result, forcing exhaustion of the iterator
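For reference, this is roughly how those two capabilities surface in the existing single-application streaming API (a sketch: the `halt_after` value is a placeholder, and today the container is iterated directly rather than via `iterate()`):

```python
# Existing single-app streaming via Burr's stream_result:
action, container = app.stream_result(halt_after=["final_action"])
for delta in container:  # intermediate results, as an iterator
    print(delta)
result, state = container.get()  # final result; forces exhaustion of the iterator
```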
What we will have is the lower-level `TaskBasedParallelAction` in streaming mode -- we can consider higher-level constructs later. This will:
- Expose the following methods:
  - `tasks() -> Generator[SubgraphTask]` -- specify streaming tasks
  - `stream_process(results: Generator[StreamItem]) -> Generator[Tuple[dict, State]]` -- to mirror the current streaming action class
- Be parameterized on how to combine results, via the following parameters (see the sketch after this list):
  - `intermediate_stream_outputs=[True, False]` (defaults to True)
  - `intermediate_nodes=[True, False]` (defaults to True)
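As a hypothetical spelling of those parameters -- the proposal names the flags but not where they live, so a constructor is assumed here, and `MyStreamingParallelAction` is illustrative:

```python
action = MyStreamingParallelAction(
    intermediate_stream_outputs=False,  # don't stream tokens of intermediate results
    intermediate_nodes=True,  # but do surface intermediate results as they complete
)
```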
The StreamItem class will encapsulate two things -- the result and its provenance -- effectively being an event:
```python
from dataclasses import dataclass
from typing import Any, Union

from burr.core import Action, State


@dataclass
class StreamItem:
    result: Union[dict, Any]  # from the node
    state_update: Union[State, None]  # set if the node is a state update
    action: Action  # the action it came from
    task_key: str  # task key -- we need to expose this, as it's currently an integer
    application_id: str  # additionally an application ID -- TBD on the purpose of this versus task_key
```
The framework will handle executing and collating all the subgraphs, as that requires a bunch of parallel-programming tricks that the framework should own. This way, all the user writes is a generator that creates the subgraphs and a processor that turns their results into one (see the sketch below).
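To make that concrete, here is a minimal sketch of what a user might write, assuming the proposed API above. `stream_process` does not exist yet, the `SubgraphTask` construction and the deep-research naming are illustrative, `StreamItem` is the class defined above, and `tasks()` mirrors the existing `burr.core.parallelism` signature:

```python
from typing import Generator, Optional, Tuple

from burr.core import ApplicationContext, State
from burr.core.parallelism import SubgraphTask, TaskBasedParallelAction


class DeepResearchAction(TaskBasedParallelAction):
    """Hypothetical: fans out one streaming subgraph per research query."""

    def tasks(
        self, state: State, context: ApplicationContext, inputs: dict
    ) -> Generator[SubgraphTask, None, None]:
        for query in state["queries"]:
            # construction of the subgraph task is elided -- see burr.core.parallelism
            yield SubgraphTask(...)

    def stream_process(
        self, results: Generator[StreamItem, None, None]
    ) -> Generator[Tuple[dict, Optional[State]], None, None]:
        # proposed method: yield each event as it arrives, tagged by task;
        # a final state update (elided here) would mirror today's streaming actions
        for item in results:
            yield {"delta": item.result, "task": item.task_key}, None
```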
**Merging with flexible execution**
The tricky part about collating the subgraphs as they come in is that, in addition to an executor capability, we need a queuing system. This is because we have a multi-producer, single-consumer model -- we need producers to write events out to the consumer. This queuing system is effectively coupled with the execution engine (e.g. the `concurrent.futures.Executor`) we pass in:
- `MultiThreadingExecutor` <-> `queue.Queue`
- `MultiProcessingExecutor` <-> `multiprocessing.Queue`
- `RayExecutor` <-> `ray.util.queue.Queue`
- `asyncio.gather` <-> `asyncio.Queue` (the only supported item -- we might not need this, as asyncio has some higher-level semantics we can use, e.g. `as_completed`; see the sketch below)
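For the asyncio case, a sketch of those higher-level semantics: `as_completed` yields results in completion order without an explicit queue (this covers per-result completion, not token-level interleaving; the helper name is illustrative):

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Iterable


async def merge_when_complete(tasks: Iterable[Awaitable[Any]]) -> AsyncGenerator[Any, None]:
    # yield each subgraph's result as soon as it finishes
    for future in asyncio.as_completed(list(tasks)):
        yield await future
```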
We also have to account for knowing when the producers are complete, so the consumer knows when to join.
To solve this, we will:

- Create a queuing abstraction, e.g.:

```python
import abc
from typing import Generic, Optional, TypeVar

StreamedType = TypeVar("StreamedType")


class StreamingResultsQueue(abc.ABC, Generic[StreamedType]):
    @abc.abstractmethod
    def put(self, item: StreamedType) -> None:
        pass

    @abc.abstractmethod
    def get(self, block: bool = True) -> Optional[StreamedType]:
        pass

    @abc.abstractmethod
    def is_done(self) -> bool:
        pass

    @staticmethod
    def multithreaded() -> "StreamingResultsQueue":
        ...  # utility for implementation; additionally one for multiprocessing...
```

- Pass this in in `with_parallel_executor`:

```python
from concurrent.futures import ThreadPoolExecutor

app_builder = app_builder.with_parallel_executor(
    executor_factory=ThreadPoolExecutor,
    queuing_factory=StreamingResultsQueue.multithreaded,
)
```

Next up would be to combine these into a utility that gives all the capabilities -- quite likely we will want more distributed-systems primitives, but for now we can decouple them (in power-user mode).
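As a minimal sketch of what `StreamingResultsQueue.multithreaded()` could return -- hypothetical: it assumes the framework tracks the producer count and signals completion via a `producer_done()` hook that is not in the abstract class above:

```python
import queue
import threading
from typing import Optional


class MultithreadedStreamingResultsQueue(StreamingResultsQueue[StreamedType]):
    def __init__(self, num_producers: int):
        self._queue: queue.Queue = queue.Queue()  # unbounded, per the note below
        self._remaining = num_producers
        self._lock = threading.Lock()

    def put(self, item: StreamedType) -> None:
        self._queue.put(item)

    def get(self, block: bool = True) -> Optional[StreamedType]:
        try:
            # short timeout lets the consumer re-check is_done() periodically
            return self._queue.get(block=block, timeout=0.1)
        except queue.Empty:
            return None

    def producer_done(self) -> None:
        # hypothetical hook: called once per producer as it finishes
        with self._lock:
            self._remaining -= 1

    def is_done(self) -> bool:
        with self._lock:
            return self._remaining == 0 and self._queue.empty()
```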
We should also add some utility functions to the builder (e.g. `with_multithreaded_executor(...)`) -- this would make using it much easier.
The queue should be effectively unbounded, so as not to block execution, and will likely not be persisted. This means it's possible to drop results if the consumer fails before the producers do. That's OK -- the general plan is to start after the last result that finished, so the intermediate nodes can persist effectively.
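Putting `get()` and `is_done()` together, the consumer side the framework runs might look like this (a sketch, assuming the queue semantics above):

```python
from typing import Generator


def consume(q: StreamingResultsQueue) -> Generator[StreamItem, None, None]:
    # drain until all producers have finished and the queue is empty;
    # get() returns None on timeout, so is_done() is re-checked regularly
    while not q.is_done():
        item = q.get(block=True)
        if item is not None:
            yield item
```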
**Describe alternatives you've considered**
This is already doable with a bunch of engineering -- e.g. everything above can be written by hand today. We should also consider adding higher-level constructs (mapping states/actions/etc.), but I want to push those off until later -- we should be able to unify the logic with what already exists and make that easy.