Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions packages/pynumaflow/examples/sink/all_sinks/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
####################################################################################################
# Stage 1: Base Builder - installs core dependencies using poetry
####################################################################################################
FROM python:3.10-slim-bullseye AS base-builder

# All build work happens under this directory; later stages reuse the same path.
ENV PYSETUP_PATH="/opt/pysetup"
WORKDIR $PYSETUP_PATH

# Copy the package manifest, lock file and library sources before any example code,
# so that this layer only changes when the core package changes.
# NOTE: these paths are relative to the build context, which the example Makefile
# sets to the package root (it runs `cd ../../../` before invoking docker).
COPY pyproject.toml poetry.lock README.md ./
COPY pynumaflow/ ./pynumaflow/
# Install the build toolchain, clean the apt caches in the same layer, then install
# poetry and the locked dependencies. `--no-root` asks poetry not to install the
# project package itself at this point.
RUN apt-get update && apt-get install --no-install-recommends -y \
curl wget build-essential git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& pip install poetry \
&& poetry install --no-root --no-interaction

####################################################################################################
# Stage 2: UDF Builder - adds UDF code and installs UDF-specific deps
####################################################################################################
FROM base-builder AS udf-builder

# Location of this example inside the image; must match the path used by the runtime stage.
ENV EXAMPLE_PATH="/opt/pysetup/examples/sink/all_sinks"
# Ask poetry to create the virtualenv inside the project directory (as `.venv`),
# which is the location the runtime stage copies from.
ENV POETRY_VIRTUALENVS_IN_PROJECT=true

WORKDIR $EXAMPLE_PATH
# Bring in the example sources together with the example's own pyproject.toml.
COPY examples/sink/all_sinks/ ./
# Resolve and install the example's dependencies into the in-project virtualenv.
RUN poetry install --no-root --no-interaction

####################################################################################################
# Stage 3: UDF Runtime - clean container with only needed stuff
####################################################################################################
FROM python:3.10-slim-bullseye AS udf

ENV PYSETUP_PATH="/opt/pysetup"
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sink/all_sinks"
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
# Put the copied virtualenv first on PATH so that `python` resolves to the
# interpreter that has the example's dependencies installed.
ENV PATH="$VENV_PATH/bin:$PATH"

# Install dumb-init as a minimal init process for the container.
# The release asset is selected with `uname -m` (x86_64, aarch64, ...) instead of a
# hard-coded x86_64 binary, so multi-platform builds (linux/amd64 and linux/arm64)
# each get a binary they can actually execute.
RUN apt-get update && apt-get install --no-install-recommends -y wget \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    && wget -O /dumb-init "https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_$(uname -m)" \
    && chmod +x /dumb-init

WORKDIR $PYSETUP_PATH
# Only the virtualenv and the example sources are carried over from the builder.
COPY --from=udf-builder $VENV_PATH $VENV_PATH
COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH

WORKDIR $EXAMPLE_PATH

ENTRYPOINT ["/dumb-init", "--"]
# Exec-form CMD is not processed by a shell, so environment variables such as
# $EXAMPLE_PATH are NOT expanded inside the JSON array. WORKDIR is already the
# example directory, so a relative script path is both correct and sufficient.
CMD ["python", "example.py"]

EXPOSE 5000
22 changes: 22 additions & 0 deletions packages/pynumaflow/examples/sink/all_sinks/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Build helpers for the all_sinks example image.
#   TAG  - image tag to build (overridable, defaults to "stable")
#   PUSH - when "true", the `image` target also pushes the built image
TAG ?= stable
PUSH ?= false
# Fully qualified image reference (registry/repository:tag) used by every target below.
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/all-sinks:${TAG}
# Dockerfile location relative to the build context (three directories up from here).
DOCKER_FILE_PATH = examples/sink/all_sinks/Dockerfile

# Refresh the example's locked dependencies before building.
.PHONY: update
update:
poetry update -vv

# Build for linux/amd64 and linux/arm64 with buildx and push the result in one step.
.PHONY: image-push
image-push: update
cd ../../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} \
--platform linux/amd64,linux/arm64 . --push

# Build an image for the local platform; push it only when PUSH=true.
.PHONY: image
image: update
cd ../../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
74 changes: 74 additions & 0 deletions packages/pynumaflow/examples/sink/all_sinks/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from collections.abc import AsyncIterable
from pynumaflow.sinker import Datum, Responses, Response, Sinker, Message
from pynumaflow.sinker import SinkAsyncServer
import logging
import random

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class UserDefinedSink(Sinker):
    """Class-based sink handler that routes every datum to a secondary sink.

    Each incoming datum is answered with either an onSuccess response or a
    fallback response, depending on the simulated primary-sink write status.
    """

    async def handler(self, datums: AsyncIterable[Datum]) -> Responses:
        """Consume ``datums`` and return one response per datum.

        Args:
            datums: Asynchronous stream of datums delivered to the sink.

        Returns:
            The collected responses, in the order the datums were received.
        """
        results = Responses()
        async for datum in datums:
            if not primary_sink_write_status():
                # Primary write failed: hand the datum over to the fallback sink.
                logger.info(
                    "Write to User Defined Sink failed, writing %s to fallback sink",
                    datum.value.decode("utf-8"),
                )
                results.append(Response.as_fallback(datum.id))
                continue

            logger.info(
                "Write to User Defined Sink succeeded, writing %s to onSuccess sink",
                datum.value.decode("utf-8"),
            )
            # Build an explicit message for the onSuccess sink.
            # Passing `None` instead would simply forward the original message:
            # `results.append(Response.as_on_success(datum.id, None))`
            forwarded = Message(datum.value, ["on_success"], datum.user_metadata)
            results.append(Response.as_on_success(datum.id, forwarded))
        return results


async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses:
    """Function-style sink handler that routes every datum to a secondary sink.

    Args:
        datums: Asynchronous stream of datums delivered to the sink.

    Returns:
        One response per datum: an onSuccess response when the simulated
        primary write succeeds, otherwise a fallback response.
    """
    results = Responses()
    async for datum in datums:
        if not primary_sink_write_status():
            # Primary write failed: hand the datum over to the fallback sink.
            logger.info(
                "Write to User Defined Sink failed, writing %s to fallback sink",
                datum.value.decode("utf-8"),
            )
            results.append(Response.as_fallback(datum.id))
            continue

        logger.info(
            "Write to User Defined Sink succeeded, writing %s to onSuccess sink",
            datum.value.decode("utf-8"),
        )
        # Build an explicit message for the onSuccess sink using the builder API.
        # Passing `None` instead would simply forward the original message:
        # `results.append(Response.as_on_success(datum.id, None))`
        forwarded = Message(datum.value)
        forwarded = forwarded.with_keys(["on_success"])
        forwarded = forwarded.with_user_metadata(datum.user_metadata)
        results.append(Response.as_on_success(datum.id, forwarded))
    return results


def primary_sink_write_status():
    """Simulate a write to the primary sink and report whether it succeeded.

    Returns:
        ``True`` when the simulated write succeeded, ``False`` when it failed.
        The outcome is a uniform random draw between the two.
    """
    coin_flip = random.randint(0, 1)
    return coin_flip == 1


if __name__ == "__main__":
    # Imported locally so this entry-point block is self-contained.
    import os

    # INVOKE selects which handler flavour is served. The example pipeline sets
    # INVOKE=func_handler, which serves the function-style handler; any other
    # value (or no value) keeps the class-based handler that was previously
    # always used.
    invoke = os.getenv("INVOKE", "class")
    if invoke == "func_handler":
        sink_handler = udsink_handler
    else:
        sink_handler = UserDefinedSink()
    grpc_server = SinkAsyncServer(sink_handler)
    grpc_server.start()
44 changes: 44 additions & 0 deletions packages/pynumaflow/examples/sink/all_sinks/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Example pipeline wiring a user-defined sink together with a fallback sink and an
# onSuccess sink, plus a plain log sink fed from the same source.
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: all-sinks-pipeline
spec:
vertices:
# Source vertex: built-in generator producing test messages.
- name: in
source:
generator:
rpu: 1
duration: 1s
msgSize: 10
# Sink vertex: runs the all_sinks example image as the primary user-defined sink.
- name: out
sink:
udsink:
container:
# Overrides the image's default command; runs example.py from the image's working directory.
args:
- python
- example.py
image: quay.io/numaio/numaflow-python/all-sinks:stable
imagePullPolicy: IfNotPresent
env:
- name: PYTHONDEBUG
value: "true"
# presumably selects the function-style handler in example.py - confirm the script reads it
- name: INVOKE
value: "func_handler"
# Secondary sink for messages the primary sink answers with a fallback response.
fallback:
udsink:
container:
image: quay.io/numaio/numaflow-python/sink-log:stable
imagePullPolicy: IfNotPresent
# Secondary sink for messages the primary sink answers with an onSuccess response.
# NOTE(review): this uses the numaflow-rs log sink while fallback uses the
# numaflow-python one - confirm the mix is intentional.
onSuccess:
udsink:
container:
image: quay.io/numaio/numaflow-rs/sink-log:stable
imagePullPolicy: IfNotPresent
# Built-in log sink attached directly to the source.
- name: log-output
sink:
log: {}
# The source fans out to both sink vertices.
edges:
- from: in
to: out
- from: in
to: log-output
15 changes: 15 additions & 0 deletions packages/pynumaflow/examples/sink/all_sinks/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Poetry project definition for the all_sinks example.
[tool.poetry]
name = "example-sink"
version = "0.2.4"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = ">=3.10,<3.13"
# Use the pynumaflow sources from this repository (three directories up) rather
# than a published release.
pynumaflow = { path = "../../../"}

# Intentionally empty: the example has no development-only dependencies.
# NOTE(review): newer Poetry releases prefer [tool.poetry.group.dev.dependencies] - confirm
# which Poetry version this example targets before migrating.
[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
3 changes: 3 additions & 0 deletions packages/pynumaflow/pynumaflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
SOURCE_SOCK_PATH = "/var/run/numaflow/source.sock"
MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc"
FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock"
ON_SUCCESS_SINK_SOCK_PATH = "/var/run/numaflow/ons-sink.sock"
BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock"
ACCUMULATOR_SOCK_PATH = "/var/run/numaflow/accumulator.sock"

Expand All @@ -37,10 +38,12 @@
SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"
SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info"
FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"
ON_SUCCESS_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/ons-sinker-server-info"
ACCUMULATOR_SERVER_INFO_FILE_PATH = "/var/run/numaflow/accumulator-server-info"

ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
UD_CONTAINER_FALLBACK_SINK = "fb-udsink"
UD_CONTAINER_ON_SUCCESS_SINK = "ons-udsink"

# TODO: need to make sure the DATUM_KEY value is the same as
# https://github.com/numaproj/numaflow-go/blob/main/pkg/function/configs.go#L6
Expand Down
8 changes: 8 additions & 0 deletions packages/pynumaflow/pynumaflow/proto/sinker/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,28 @@ enum Status {
FAILURE = 1;
FALLBACK = 2;
SERVE = 3;
ON_SUCCESS = 4;
}

/**
* SinkResponse is the individual response of each message written to the sink.
*/
message SinkResponse {
message Result {
message Message {
bytes value = 1;
repeated string keys = 2;
common.Metadata metadata = 3;
}
// id is the ID of the message, can be used to uniquely identify the message.
string id = 1;
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, FALLBACK, SERVE, or ON_SUCCESS.
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
optional bytes serve_response = 4;
// on_success_msg is the message to be sent to on_success sink.
optional Message on_success_msg = 5;
}
repeated Result results = 1;
optional Handshake handshake = 2;
Expand Down
18 changes: 10 additions & 8 deletions packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions packages/pynumaflow/pynumaflow/proto/sinker/sink_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ class Status(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
FAILURE: _ClassVar[Status]
FALLBACK: _ClassVar[Status]
SERVE: _ClassVar[Status]
ON_SUCCESS: _ClassVar[Status]
SUCCESS: Status
FAILURE: Status
FALLBACK: Status
SERVE: Status
ON_SUCCESS: Status

class SinkRequest(_message.Message):
__slots__ = ("request", "status", "handshake")
Expand Down Expand Up @@ -78,16 +80,27 @@ class TransmissionStatus(_message.Message):
class SinkResponse(_message.Message):
__slots__ = ("results", "handshake", "status")
class Result(_message.Message):
__slots__ = ("id", "status", "err_msg", "serve_response")
__slots__ = ("id", "status", "err_msg", "serve_response", "on_success_msg")
class Message(_message.Message):
__slots__ = ("value", "keys", "metadata")
VALUE_FIELD_NUMBER: _ClassVar[int]
KEYS_FIELD_NUMBER: _ClassVar[int]
METADATA_FIELD_NUMBER: _ClassVar[int]
value: bytes
keys: _containers.RepeatedScalarFieldContainer[str]
metadata: _metadata_pb2.Metadata
def __init__(self, value: _Optional[bytes] = ..., keys: _Optional[_Iterable[str]] = ..., metadata: _Optional[_Union[_metadata_pb2.Metadata, _Mapping]] = ...) -> None: ...
ID_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
ERR_MSG_FIELD_NUMBER: _ClassVar[int]
SERVE_RESPONSE_FIELD_NUMBER: _ClassVar[int]
ON_SUCCESS_MSG_FIELD_NUMBER: _ClassVar[int]
id: str
status: Status
err_msg: str
serve_response: bytes
def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ...) -> None: ...
on_success_msg: SinkResponse.Result.Message
def __init__(self, id: _Optional[str] = ..., status: _Optional[_Union[Status, str]] = ..., err_msg: _Optional[str] = ..., serve_response: _Optional[bytes] = ..., on_success_msg: _Optional[_Union[SinkResponse.Result.Message, _Mapping]] = ...) -> None: ...
RESULTS_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
Expand Down
3 changes: 2 additions & 1 deletion packages/pynumaflow/pynumaflow/sinker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from pynumaflow.sinker.server import SinkServer

from pynumaflow._metadata import UserMetadata, SystemMetadata
from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker
from pynumaflow.sinker._dtypes import Response, Responses, Datum, Sinker, Message

__all__ = [
"Message",
"Response",
"Responses",
"Datum",
Expand Down
Loading