diff --git a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py index faae8692..86f86956 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/sourcer/_dtypes.py @@ -74,7 +74,7 @@ def __init__( payload: bytes, offset: Offset, event_time: datetime, - keys: list[str] = None, + keys: Optional[list[str]] = None, headers: Optional[dict[str, str]] = None, user_metadata: Optional[UserMetadata] = None, ): @@ -261,12 +261,6 @@ class Sourcer(metaclass=ABCMeta): """ - def __call__(self, *args, **kwargs): - """ - Allow to call handler function directly if class instance is sent - """ - return self.handler(*args, **kwargs) - @abstractmethod async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator): """ @@ -278,28 +272,28 @@ async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator): pass @abstractmethod - def ack_handler(self, ack_request: AckRequest): + async def ack_handler(self, ack_request: AckRequest): """ The ack handler is used to acknowledge the offsets that have been read """ pass @abstractmethod - def nack_handler(self, nack_request: NackRequest): + async def nack_handler(self, nack_request: NackRequest): """ The nack handler is used to negatively acknowledge the offsets on the source """ pass @abstractmethod - def pending_handler(self) -> PendingResponse: + async def pending_handler(self) -> PendingResponse: """ The simple source always returns zero to indicate there is no pending record. """ pass @abstractmethod - def partitions_handler(self) -> PartitionsResponse: + async def partitions_handler(self) -> PartitionsResponse: """ The simple source always returns zero to indicate there is no pending record. """ diff --git a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py index a9da9340..3d34f7dd 100644 --- a/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/sourcer/servicer/async_servicer.py @@ -213,7 +213,7 @@ async def PendingFn( except BaseException as err: _LOGGER.critical("PendingFn Error", exc_info=True) await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING) - return + raise resp = source_pb2.PendingResponse.Result(count=count.count) return source_pb2.PendingResponse(result=resp) @@ -228,7 +228,7 @@ async def PartitionsFn( except BaseException as err: _LOGGER.critical("PartitionsFn Error", exc_info=True) await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING) - return + raise resp = source_pb2.PartitionsResponse.Result(partitions=partitions.partitions) return source_pb2.PartitionsResponse(result=resp)