Skip to content

Commit 81bb278

Browse files
committed
Implement a Redis mode
Signed-off-by: Stefano Rivera <stefano@rivera.za.net>
1 parent 482656c commit 81bb278

6 files changed

Lines changed: 620 additions & 4 deletions

File tree

prometheus_client/metrics.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ def remove(self, *labelvalues: Any) -> None:
207207
warnings.warn(
208208
"Removal of labels has not been implemented in multi-process mode yet.",
209209
UserWarning)
210+
if 'PROMETHEUS_REDIS_URL' in os.environ:
211+
warnings.warn(
212+
"Removal of labels has not been implemented in redis mode yet.",
213+
UserWarning)
210214

211215
if not self._labelnames:
212216
raise ValueError('No label names were set when constructing %s' % self)
@@ -226,6 +230,10 @@ def remove_by_labels(self, labels: dict[str, str]) -> None:
226230
"Removal of labels has not been implemented in multi-process mode yet.",
227231
UserWarning
228232
)
233+
if 'PROMETHEUS_REDIS_URL' in os.environ:
234+
warnings.warn(
235+
"Removal of labels has not been implemented in redis mode yet.",
236+
UserWarning)
229237

230238
if not self._labelnames:
231239
raise ValueError('No label names were set when constructing %s' % self)
@@ -258,6 +266,10 @@ def clear(self) -> None:
258266
warnings.warn(
259267
"Clearing labels has not been implemented in multi-process mode yet",
260268
UserWarning)
269+
if 'PROMETHEUS_REDIS_URL' in os.environ:
270+
warnings.warn(
271+
"Clearing of labels has not been implemented in redis mode yet.",
272+
UserWarning)
261273
with self._lock:
262274
self._metrics = {}
263275

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
from collections.abc import Iterable
2+
import json
3+
import os
4+
from urllib.parse import urlsplit
5+
6+
from .metrics_core import Metric
7+
from .registry import Collector, CollectorRegistry
8+
from .samples import Sample
9+
10+
fake_redis_pool = {}
11+
12+
13+
def redis_client():
14+
"""
15+
Create a redis client for PROMETHEUS_REDIS_URL.
16+
17+
Configure the redis database via a URL in PROMETHEUS_REDIS_URL of the form
18+
redis://localhost:6379/0
19+
"""
20+
from redis import Redis
21+
22+
parsed_url = urlsplit(os.environ["PROMETHEUS_REDIS_URL"])
23+
assert parsed_url.path.startswith("/")
24+
assert parsed_url.path[1:].isdigit()
25+
port = parsed_url.port or 6379
26+
db = int(parsed_url.path[1:])
27+
28+
if parsed_url.scheme == "fakeredis":
29+
from fakeredis import FakeRedis
30+
31+
if db not in fake_redis_pool:
32+
fake_redis_pool[db] = FakeRedis()
33+
return fake_redis_pool[db]
34+
35+
assert parsed_url.scheme == "redis"
36+
return Redis(host=parsed_url.hostname, port=port, db=db)
37+
38+
39+
class RedisCollector(Collector):
40+
"""Collector for redis mode."""
41+
42+
def __init__(self, registry: CollectorRegistry | None) -> None:
43+
self._client = redis_client()
44+
if registry:
45+
registry.register(self)
46+
47+
def _iter_values(self) -> Iterable[tuple[bytes, str]]:
48+
cursor = 0
49+
while True:
50+
cursor, keys = self._client.scan(cursor=cursor, match="value:*")
51+
values = self._client.mget(keys)
52+
yield from zip(keys, values)
53+
if cursor == 0:
54+
break
55+
56+
def collect(self) -> Iterable[Metric]:
57+
metrics: dict[str, Metric] = {}
58+
histograms: set[str] = set()
59+
60+
for key, value_s in self._iter_values():
61+
# FIXME: Catch ValueError here, just in case?
62+
prefix_b, typ_b, mmap_key = key.split(b":", 2)
63+
assert prefix_b == b"value"
64+
typ = typ_b.decode()
65+
value = float(value_s)
66+
67+
metric_name, name, labels, help_text = json.loads(mmap_key)
68+
69+
metric = metrics.get(metric_name)
70+
if metric is None:
71+
metric = Metric(metric_name, help_text, typ)
72+
metrics[metric_name] = metric
73+
if typ in ("histogram", "gaugehistogram"):
74+
histograms.add(metric_name)
75+
76+
metric.add_sample(name, labels, value)
77+
78+
for name in histograms:
79+
self._fix_histogram(metrics[name])
80+
81+
return metrics.values()
82+
83+
def _fix_histogram(self, metric: Metric) -> None:
84+
"""
85+
Fix-up histogram samples.
86+
87+
Sort the buckets as expected by a client, and accumulate the values.
88+
The Histogram class is optimized to only increment the bucket that a
89+
value first appears in, not larger ones that would also contain it.
90+
"""
91+
by_label: dict[tuple[tuple[str, ...], str], list[Sample]] = {}
92+
93+
# Organize into lists of samples by label
94+
for sample in metric.samples:
95+
if "le" in sample.labels:
96+
labels_without_le = sample.labels.copy()
97+
labels_without_le.pop("le")
98+
key = (tuple(labels_without_le.values()), sample.name)
99+
else:
100+
key = (tuple(sample.labels.values()), sample.name)
101+
by_label.setdefault(key, []).append(sample)
102+
103+
metric.samples = []
104+
105+
for (labels, name), samples in sorted(by_label.items()):
106+
if name.endswith("_bucket"):
107+
# Sort buckets within each label
108+
samples.sort(key=lambda sample: float(sample.labels["le"]))
109+
110+
# Accumulate values into larger buckets
111+
value = 0.0
112+
for sample in samples:
113+
value += sample.value
114+
metric.samples.append(Sample(sample.name, sample.labels, value))
115+
116+
labels_without_le = sample.labels.copy()
117+
labels_without_le.pop("le")
118+
metric.samples.append(
119+
Sample(f"{metric.name}_count", labels_without_le, value)
120+
)
121+
122+
else:
123+
metric.samples.extend(samples)

prometheus_client/values.py

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,48 @@
11
import os
22
from threading import Lock
3+
from typing import Any, Protocol
34
import warnings
45

56
from .mmap_dict import mmap_key, MmapedDict
7+
from .redis_collector import redis_client
8+
from .samples import Exemplar
69

710

8-
class MutexValue:
11+
class Value(Protocol):
12+
"""Prometheus Client Metric implementation."""
13+
14+
_multiprocess: bool
15+
16+
def __init__(
17+
self,
18+
typ: str,
19+
metric_name: str,
20+
name: str,
21+
labelnames: list[str],
22+
labelvalues: list[str],
23+
help_text: str,
24+
**kwargs: Any,
25+
) -> None:
26+
"""Initialize a metric."""
27+
28+
def inc(self, amount: float) -> None:
29+
"""Increment the metric by amount."""
30+
31+
def set(self, value: float, timestamp: float | None = None) -> None:
32+
"""Set the metric to value."""
33+
34+
def get(self) -> float:
35+
"""Get the current metric value."""
36+
37+
def set_exemplar(self, exemplar: Exemplar) -> None:
38+
"""Set an exemplar value."""
39+
exemplar # For vulture
40+
41+
def get_exemplar(self) -> Exemplar | None:
42+
"""Get any set exemplar value."""
43+
44+
45+
class MutexValue(Value):
946
"""A float protected by a mutex."""
1047

1148
_multiprocess = False
@@ -52,7 +89,7 @@ def MultiProcessValue(process_identifier=os.getpid):
5289
# This avoids the need to also have mutexes in __MmapDict.
5390
lock = Lock()
5491

55-
class MmapedValue:
92+
class MmapedValue(Value):
5693
"""A float protected by a mutex backed by a per-process mmaped file."""
5794

5895
_multiprocess = True
@@ -125,12 +162,60 @@ def get_exemplar(self):
125162
return MmapedValue
126163

127164

128-
def get_value_class():
165+
class RedisValue(Value):
166+
"""
167+
A value implementation that stores data in a redis/valkey database.
168+
169+
Key scheme:
170+
* value:typ:MMAP_KEY
171+
"""
172+
173+
_multiprocess = False
174+
175+
def __init__(
176+
self,
177+
typ: str,
178+
metric_name: str,
179+
name: str,
180+
labelnames: list[str],
181+
labelvalues: list[str],
182+
help_text: str,
183+
**kwargs: Any,
184+
) -> None:
185+
key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
186+
self._key = f"value:{typ}:{key}"
187+
redis_client().setnx(self._key, 0.0)
188+
189+
def inc(self, amount: float) -> None:
190+
redis_client().incrbyfloat(self._key, amount)
191+
192+
def set(self, value: float, timestamp: float | None = None) -> None:
193+
# TODO: Implement timestamps
194+
redis_client().set(self._key, value)
195+
196+
def get(self) -> float:
197+
value = redis_client().get(self._key)
198+
if value is None:
199+
return 0.0
200+
return float(value)
201+
202+
def set_exemplar(self, exemplar: Exemplar) -> None:
203+
# TODO: Implement exemplars for redis.
204+
raise NotImplementedError("Exemplars are not implemented for Redis.")
205+
206+
def get_exemplar(self) -> Exemplar | None:
207+
# TODO: Implement exemplars for redis.
208+
return None
209+
210+
211+
def get_value_class() -> type[Value]:
129212
# Should we enable multi-process mode?
130213
# This needs to be chosen before the first metric is constructed,
131214
# and as that may be in some arbitrary library the user/admin has
132215
# no control over we use an environment variable.
133-
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
216+
if "PROMETHEUS_REDIS_URL" in os.environ:
217+
return RedisValue
218+
elif 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
134219
return MultiProcessValue()
135220
else:
136221
return MutexValue

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ aiohttp = [
5050
django = [
5151
"django",
5252
]
53+
redis = [
54+
"redis",
55+
]
5356

5457
[project.urls]
5558
Homepage = "https://github.com/prometheus/client_python"

0 commit comments

Comments
 (0)