From 5b47e07cd88cbca8bd0ea9497c4d8dfe90de7eac Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Fri, 19 Dec 2025 10:27:09 +0530 Subject: [PATCH 1/2] Sourcer - Bug fixes Signed-off-by: Sreekanth --- packages/pynumaflow/pynumaflow/sourcer/_dtypes.py | 8 +------- .../pynumaflow/sourcer/servicer/async_servicer.py | 4 ++-- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py index faae8692..4df316bf 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py @@ -74,7 +74,7 @@ def __init__( payload: bytes, offset: Offset, event_time: datetime, - keys: list[str] = None, + keys: Optional[list[str]] = None, headers: Optional[dict[str, str]] = None, user_metadata: Optional[UserMetadata] = None, ): @@ -261,12 +261,6 @@ class Sourcer(metaclass=ABCMeta): """ - def __call__(self, *args, **kwargs): - """ - Allow to call handler function directly if class instance is sent - """ - return self.handler(*args, **kwargs) - @abstractmethod async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator): """ diff --git a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py index a9da9340..d420de10 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py @@ -209,7 +209,7 @@ async def PendingFn( at the user defined source. """ try: - count = await self.__source_pending_handler() + count = self.__source_pending_handler() except BaseException as err: _LOGGER.critical("PendingFn Error", exc_info=True) await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING) @@ -224,7 +224,7 @@ async def PartitionsFn( PartitionsFn returns the partitions of the user defined source. """ try: - partitions = await self.__source_partitions_handler() + partitions = self.__source_partitions_handler() except BaseException as err: _LOGGER.critical("PartitionsFn Error", exc_info=True) await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING) From 6b9dd50acc404eb7c11fcdba4bce240d6bd89b17 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Fri, 19 Dec 2025 11:15:18 +0530 Subject: [PATCH 2/2] convert all Sourcer methods to async Signed-off-by: Sreekanth --- packages/pynumaflow/pynumaflow/sourcer/_dtypes.py | 8 ++++---- .../pynumaflow/sourcer/servicer/async_servicer.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py index 4df316bf..86f86956 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py @@ -272,28 +272,28 @@ async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator): pass @abstractmethod - def ack_handler(self, ack_request: AckRequest): + async def ack_handler(self, ack_request: AckRequest): """ The ack handler is used to acknowledge the offsets that have been read """ pass @abstractmethod - def nack_handler(self, nack_request: NackRequest): + async def nack_handler(self, nack_request: NackRequest): """ The nack handler is used to negatively acknowledge the offsets on the source """ pass @abstractmethod - def pending_handler(self) -> PendingResponse: + async def pending_handler(self) -> PendingResponse: """ The simple source always returns zero to indicate there is no pending record. """ pass @abstractmethod - def partitions_handler(self) -> PartitionsResponse: + async def partitions_handler(self) -> PartitionsResponse: """ The simple source always returns zero to indicate there is no pending record. """ diff --git a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py index d420de10..3d34f7dd 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py @@ -209,11 +209,11 @@ async def PendingFn( at the user defined source. """ try: - count = self.__source_pending_handler() + count = await self.__source_pending_handler() except BaseException as err: _LOGGER.critical("PendingFn Error", exc_info=True) await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING) - return + raise resp = source_pb2.PendingResponse.Result(count=count.count) return source_pb2.PendingResponse(result=resp) @@ -224,11 +224,11 @@ async def PartitionsFn( PartitionsFn returns the partitions of the user defined source. """ try: - partitions = self.__source_partitions_handler() + partitions = await self.__source_partitions_handler() except BaseException as err: _LOGGER.critical("PartitionsFn Error", exc_info=True) await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING) - return + raise resp = source_pb2.PartitionsResponse.Result(partitions=partitions.partitions) return source_pb2.PartitionsResponse(result=resp)