Skip to content

Commit 3f1d0d1

Browse files
Add worker resource and queue metrics to OTel instrumentation (#613)
1 parent 6cf0568 commit 3f1d0d1

4 files changed

Lines changed: 387 additions & 2 deletions

File tree

taskiq/middlewares/opentelemetry_middleware.py

Lines changed: 144 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import logging
2+
from collections.abc import Generator
23
from contextlib import AbstractContextManager
4+
from datetime import datetime, timezone
35
from importlib.metadata import version
46
from typing import Any, TypeVar
57

8+
import psutil
69
from packaging.version import Version, parse
710

811
try:
@@ -16,7 +19,7 @@
1619

1720
from opentelemetry import context as context_api
1821
from opentelemetry import trace
19-
from opentelemetry.metrics import Meter, MeterProvider, get_meter
22+
from opentelemetry.metrics import Meter, MeterProvider, Observation, get_meter
2023
from opentelemetry.propagate import extract, inject
2124
from opentelemetry.semconv.trace import SpanAttributes
2225
from opentelemetry.trace import Span, Tracer, TracerProvider
@@ -59,6 +62,9 @@
5962
_TASK_RETRY_REASON_KEY = "taskiq.retry.reason"
6063
_TASK_NAME_KEY = "taskiq.task_name"
6164

65+
_TASK_QUEUE_TIME_KEY = "_taskiq_queue_time"
66+
_TASK_RECEIVED_TIME_KEY = "_taskiq_broker_receive_time"
67+
6268

6369
def set_attributes_from_context(span: Span, context: dict[str, Any]) -> None:
6470
"""Helper to extract meta values from a Taskiq Context."""
@@ -170,6 +176,74 @@ def __init__(
170176
if meter is None
171177
else meter
172178
)
179+
# Create metrics
180+
# 1- Number of tasks sent. Producer (Counter)
181+
self.n_tasks_sent_counter = self._meter.create_counter(
182+
name="tasks_sent",
183+
unit="1",
184+
description="Number of tasks sent from the producer side",
185+
)
186+
# 2- Number of errors by task name. consumer (Counter)
187+
self.n_errors_counter = self._meter.create_counter(
188+
name="task_errors",
189+
unit="1",
190+
description="Number of errors raised",
191+
)
192+
# 3- Number of task successes. consumer (Counter)
193+
self.n_success_counter = self._meter.create_counter(
194+
name="task_success",
195+
unit="1",
196+
description="Number of tasks completed successfully",
197+
)
198+
# 4- Task execution time. consumer (Histogram)
199+
self.execution_time_hist = self._meter.create_histogram(
200+
"task_execution_time",
201+
unit="s",
202+
description="Time to finish executing tasks",
203+
)
204+
# 5- Task wait time. both (Histogram)
205+
self.task_wait_time = self._meter.create_histogram(
206+
"task_wait_time",
207+
unit="s",
208+
description="Time the tasks waited before executing",
209+
)
210+
# Current worker resource metrics: CPU and memory utilization
211+
self._process = psutil.Process()
212+
# 6- CPU utilization
213+
self.worker_cpu_utilization = self._meter.create_observable_gauge(
214+
"worker_cpu_utilization",
215+
callbacks=[self._observe_cpu],
216+
unit="%",
217+
description="Worker CPU utilization percentage. Only for worker processes",
218+
)
219+
# 7- Memory utilization
220+
self.worker_memory_utilization = self._meter.create_observable_gauge(
221+
"worker_memory_utilization",
222+
callbacks=[self._observe_memory],
223+
unit="By",
224+
description="Worker memory utilization in bytes. Only for worker processes",
225+
)
226+
227+
# 8- Number of tasks executing
228+
self.number_of_broker_active_tasks = self._meter.create_up_down_counter(
229+
"worker_active_tasks",
230+
unit="1",
231+
description="Number of tasks currently executing in the worker.",
232+
)
233+
# 9- Number of tasks prefetched
234+
self.number_of_broker_prefetched_tasks = self._meter.create_up_down_counter(
235+
"worker_prefetched_tasks",
236+
unit="1",
237+
description="Number of tasks currently prefetched in the worker.",
238+
)
239+
240+
def _observe_memory(self, options: Any) -> Generator[Observation, None, None]:
241+
if self.broker and self.broker.is_worker_process:
242+
yield Observation(self._process.memory_info().rss)
243+
244+
def _observe_cpu(self, options: Any) -> Generator[Observation, None, None]:
245+
if self.broker and self.broker.is_worker_process:
246+
yield Observation(self._process.cpu_percent())
173247

174248
def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
175249
"""
@@ -193,7 +267,7 @@ def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
193267
activation.__enter__()
194268
attach_context(message, span, activation, None, is_publish=True)
195269
inject(message.labels)
196-
270+
message.labels[_TASK_QUEUE_TIME_KEY] = datetime.now(timezone.utc).timestamp()
197271
return message
198272

199273
def post_send(self, message: TaskiqMessage) -> None:
@@ -214,6 +288,7 @@ def post_send(self, message: TaskiqMessage) -> None:
214288

215289
activation.__exit__(None, None, None)
216290
detach_context(message, is_publish=True)
291+
self.n_tasks_sent_counter.add(1, attributes={"task_name": message.task_name})
217292

218293
def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
219294
"""
@@ -236,6 +311,11 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
236311
activation = trace.use_span(span, end_on_exit=True)
237312
activation.__enter__() # pylint: disable=E1101
238313
attach_context(message, span, activation, token)
314+
message.labels[_TASK_RECEIVED_TIME_KEY] = datetime.now(timezone.utc).timestamp()
315+
self.number_of_broker_active_tasks.add(
316+
1,
317+
attributes={"task_name": message.task_name},
318+
)
239319
return message
240320

241321
def post_save( # pylint: disable=R6301
@@ -313,3 +393,65 @@ def on_error(
313393
}
314394
span.record_exception(exception)
315395
span.set_status(Status(**status_kwargs)) # type: ignore[arg-type]
396+
397+
def post_execute(
398+
self,
399+
message: "TaskiqMessage",
400+
result: "TaskiqResult[Any]",
401+
) -> None:
402+
"""
403+
This function tracks the number of errored and successful task executions.
404+
405+
:param message: received message.
406+
:param result: result of the execution.
407+
"""
408+
if result.is_err:
409+
retry_on_error = message.labels.get("retry_on_error")
410+
if isinstance(retry_on_error, str):
411+
retry_on_error = retry_on_error.lower() == "true"
412+
413+
if retry_on_error is None:
414+
retry_on_error = False
415+
416+
if retry_on_error:
417+
# Count errors for tasks that will be retried, flagged via the retry_error attribute
418+
self.n_errors_counter.add(
419+
1,
420+
attributes={"retry_error": True, "task_name": message.task_name},
421+
)
422+
else:
423+
self.n_errors_counter.add(
424+
1,
425+
attributes={"retry_error": False, "task_name": message.task_name},
426+
)
427+
else:
428+
self.n_success_counter.add(
429+
1,
430+
attributes={"task_name": message.task_name},
431+
)
432+
self.execution_time_hist.record(
433+
result.execution_time,
434+
attributes={
435+
"task_name": message.task_name,
436+
},
437+
)
438+
task_receive_time = message.labels.get(_TASK_RECEIVED_TIME_KEY)
439+
task_send_time = message.labels.get(_TASK_QUEUE_TIME_KEY)
440+
if task_receive_time is not None and task_send_time is not None:
441+
self.task_wait_time.record(
442+
amount=task_receive_time - task_send_time,
443+
attributes={"task_name": message.task_name},
444+
)
445+
446+
self.number_of_broker_active_tasks.add(
447+
-1,
448+
attributes={"task_name": message.task_name},
449+
)
450+
451+
def on_prefetch_queue_add(self) -> None:
452+
"""This hook is called after task is added to the worker prefetch queue."""
453+
self.number_of_broker_prefetched_tasks.add(1)
454+
455+
def on_prefetch_queue_remove(self) -> None:
456+
"""This hook is called after task is removed from the worker prefetch queue."""
457+
self.number_of_broker_prefetched_tasks.add(-1)

taskiq/receiver/receiver.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,12 @@ async def prefetcher(
425425
current_message = asyncio.create_task(iterator.__anext__()) # type: ignore
426426
fetched_tasks += 1
427427
await queue.put(message)
428+
# Custom hooks for OTel and any future instrumentations
429+
for middleware in reversed(self.broker.middlewares):
430+
if hasattr(middleware, "on_prefetch_queue_add"):
431+
await maybe_awaitable(
432+
middleware.on_prefetch_queue_add(), # type: ignore
433+
)
428434
except (asyncio.CancelledError, StopAsyncIteration):
429435
break
430436
# We don't want to fetch new messages if we are shutting down.
@@ -476,6 +482,13 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
476482
logger.info("No more tasks to wait for. Shutting down.")
477483
break
478484

485+
# Custom hooks for OTel and any future instrumentations
486+
for middleware in reversed(self.broker.middlewares):
487+
if hasattr(middleware, "on_prefetch_queue_remove"):
488+
await maybe_awaitable(
489+
middleware.on_prefetch_queue_remove(), # type: ignore
490+
)
491+
479492
task = asyncio.create_task(
480493
self.callback(message=message, raise_err=False),
481494
)

tests/opentelemetry/taskiq_test_tasks.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from typing import Any
23

34
from opentelemetry import baggage
@@ -26,3 +27,8 @@ async def task_raises() -> None:
2627
@broker.task
2728
async def task_returns_baggage() -> Any:
2829
return dict(baggage.get_all())
30+
31+
32+
@broker.task
33+
async def task_does_processing(wait_time: float) -> None:
34+
await asyncio.sleep(wait_time)

0 commit comments

Comments (0)