feat: add anonymous run-level telemetry (#155)
Conversation
Emit one anonymizer_event per Anonymizer.run() / Anonymizer.preview() call, matching the schema in nemo-telemetry's anonymous_events.json. Telemetry is opt-out via NEMO_TELEMETRY_ENABLED=false, AnonymizerConfig(emit_telemetry=False), or the --no-emit-telemetry CLI flag. DD's session_ids are tagged with the anonymizer- prefix so anonymizer-driven LLM calls are filterable in dashboards. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Greptile Summary

This PR adds anonymous run-level telemetry to NeMo Anonymizer, emitting one event per `Anonymizer.run()` / `Anonymizer.preview()` call.
Confidence Score: 5/5

Safe to merge — telemetry is fully isolated from the main pipeline, all exceptions are swallowed, and CI has emission disabled. The telemetry path is additive and wrapped in best-effort guards throughout; there is no change to core anonymization logic. The two flagged items are data-quality suggestions for the emitted payload, not pipeline correctness issues or blockers. Test coverage is thorough across opt-out paths, transformation types, and HTTP send semantics. No files require special attention for merge safety.
Sequence Diagram

```mermaid
sequenceDiagram
    participant U as Caller
    participant A as Anonymizer.run()/preview()
    participant P as _run_internal()
    participant T as _maybe_emit_telemetry()
    participant TH as TelemetryHandler
    participant E as Telemetry Endpoint
    U->>A: run(config, data)
    A->>A: read_input() to input_df, t_start
    A->>P: _run_internal()
    alt Success
        P-->>A: AnonymizerResult
        A->>A: status = COMPLETED
    else Exception
        P-->>A: raises Exception
        A->>A: status = ERROR or CANCELED
    end
    A->>T: finally _maybe_emit_telemetry(status, result)
    T->>T: check config.emit_telemetry and _telemetry_enabled()
    alt Opt-out active
        T-->>A: return (no-op)
    else Telemetry enabled
        T->>T: _build_telemetry_event (tiktoken, model resolution)
        T->>TH: with TelemetryHandler as h, h.enqueue(event)
        TH->>TH: flush via asyncio.run(_flush_events())
        TH->>E: POST /v1.1/events/json
        alt 2xx or 4xx
            E-->>TH: drop, no retry
        else 5xx or timeout
            E-->>TH: _add_to_dlq (effectively dropped)
        end
        TH-->>T: __exit__
    end
    T-->>A: swallows any exception
    A-->>U: result or re-raises original exception
```
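The exactly-once emission path in the diagram can be sketched as follows. This is a simplified stand-in, not the real implementation: the class, method bodies, and status strings only mirror the names `_run_internal` / `_maybe_emit_telemetry` and the statuses from this PR.

```python
# Minimal sketch of the try/finally wrapping that guarantees one telemetry
# event per run() call, on both the success and the exception exit paths.
class SketchAnonymizer:
    def __init__(self) -> None:
        self.emitted: list[str] = []  # stand-in for the telemetry handler

    def _run_internal(self, data):
        if data is None:
            raise ValueError("bad input")
        return {"rows": len(data)}

    def _maybe_emit_telemetry(self, status: str) -> None:
        try:
            # the real code checks opt-outs, builds the event, enqueues it
            self.emitted.append(status)
        except Exception:
            pass  # telemetry is best-effort: never break the caller

    def run(self, data):
        status = "ERROR"
        try:
            result = self._run_internal(data)
            status = "COMPLETED"
            return result
        finally:
            # runs on success AND on exception: exactly one event per call
            self._maybe_emit_telemetry(status)
```

On success the `finally` block sees `COMPLETED`; on an exception it emits `ERROR` and then re-raises the original, matching the diagram's two exit paths.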
Reviews (3): Last reviewed commit: "Merge remote-tracking branch 'origin/mai..."
```python
from anonymizer.telemetry import (
    NOT_APPLICABLE,
    AnonymizerEvent,
    TaskEnum,
    TaskStatusEnum,
    TelemetryHandler,
    avg_tokens_per_record,
    classify_model_host,
    collect_model_hosts,
    sort_join_aliases,
)
```
When `NEMO_TELEMETRY_ENABLED=false` is set but `config.emit_telemetry` is still `True`, execution falls through the first guard and calls `_build_telemetry_event`, which encodes every row of `input_df[COL_TEXT]` through tiktoken before the event is silently discarded inside `enqueue()`. For large datasets this imposes O(n) tokenization overhead on every run even with the env-var opt-out active. The `_telemetry_enabled()` guard (already present in `telemetry.py`) needs to be checked here, before the event is built.
Suggested change:

```diff
 from anonymizer.telemetry import (
     NOT_APPLICABLE,
     AnonymizerEvent,
     TaskEnum,
     TaskStatusEnum,
     TelemetryHandler,
+    _telemetry_enabled,
     avg_tokens_per_record,
     classify_model_host,
     collect_model_hosts,
     sort_join_aliases,
 )
```
Good catch — fixed in a57c0c7. Added the _telemetry_enabled() short-circuit in _maybe_emit_telemetry before _build_telemetry_event so the env-var opt-out path no longer pays the tiktoken cost. Updated test_env_var_disables_emission to verify the short-circuit (no event reaches the handler at all).
```python
try:
    if not getattr(config, "emit_telemetry", True):
        return
    event = self._build_telemetry_event(
```
Second half of the fix: short-circuit before `_build_telemetry_event` when the env-var opt-out is active. Without this guard, tiktoken encodes all records and all step models are resolved even though the resulting event will be discarded by `enqueue()`.
Suggested change:

```diff
 try:
     if not getattr(config, "emit_telemetry", True):
         return
+    if not _telemetry_enabled():
+        return
     event = self._build_telemetry_event(
```
```python
with patch("httpx.AsyncClient", side_effect=RuntimeError("boom")):
    asyncio.run(handler._flush_events.__wrapped__(handler)) if False else None  # noqa
    # call _flush_events with an event present
```
This line is unreachable dead code — the `if False` condition prevents it from ever executing. It looks like a leftover from a debugging session and should be removed to keep the test body clean.
Suggested change:

```diff
 with patch("httpx.AsyncClient", side_effect=RuntimeError("boom")):
-    asyncio.run(handler._flush_events.__wrapped__(handler)) if False else None  # noqa
     # call _flush_events with an event present
```
Removed in a57c0c7. Leftover from when I was debugging unwrapped vs wrapped coroutines.
```python
failure_counts = _collect_failure_counts(failed)
hosts = _resolve_model_hosts(self._resolved_providers)

return AnonymizerEvent(
```
😅 Yeah — it's mostly a 30-kwarg AnonymizerEvent(...) constructor call. The actual computation is already extracted into helpers (_collect_step_models, _collect_failure_counts, _resolve_model_hosts, _transformation_type_string, etc.); the method body is just wiring those into the event. I tried sketching a split and didn't find a clean boundary that wasn't just hiding the constructor behind a wrapper. Open to suggestions if you see one.
nah... other than a builder/factory or passing in a state object that pops things off (like models could be passed in then extract). but meh
```python
def _step_to_field(step: str) -> str:
    """Map a FailedRecord.step (workflow_name) to a schema failure-count field key."""
    if step == "entity-detection":
```
Done in a57c0c7 — converted to match/case. The two regex-matched cases use case _ if <regex>.match(step): guards which works cleanly.
```python
"entity_validator": sort_join_aliases(det.entity_validator or []),
"entity_augmenter": det.entity_augmenter or NOT_APPLICABLE,
# latent_detector only runs in rewrite mode
"latent_detector": (det.latent_detector or NOT_APPLICABLE) if has_rewrite else NOT_APPLICABLE,
```
seems kinda weird, could make a _default_na() method for all these not applicable, but seems like there's ONE case where it's has_substitute instead of has_rewrite, so prolly not worth it
Agreed — the has_substitute vs has_rewrite asymmetry is exactly why a generic _default_na() would obscure more than it'd save. Leaving as-is.
```python
        return ModelHostEnum.NVIDIA_INTERNAL
    if "openrouter.ai" in endpoint:
        return ModelHostEnum.OPENROUTER
    if any(h in endpoint for h in ("localhost", "127.0.0.1", "0.0.0.0", "[::1]")):
```
is this going character by character? or is `endpoint` a list?
endpoint is a single URL string — the in checks are substring searches against it (e.g. "build.nvidia.com" in "https://build.nvidia.com/v1/chat" → True). Added a clarifying comment + pulled the local-host list to a named tuple in a57c0c7 so it reads more obviously.
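To make the substring semantics concrete, a standalone sketch; `LOCAL_HOSTS` and the helper name here are illustrative, only the string-`in`-string behavior is the point.

```python
# `h in endpoint` on two strings is a substring search, not per-character
# iteration or list membership. LOCAL_HOSTS mirrors the tuple in the
# snippet above.
LOCAL_HOSTS = ("localhost", "127.0.0.1", "0.0.0.0", "[::1]")

def is_local_endpoint(endpoint: str) -> bool:
    return any(h in endpoint for h in LOCAL_HOSTS)
```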
```python
    captured_events: list[AnonymizerEvent],
    stub_input: AnonymizerInput,
) -> None:
    anonymizer, _, _, _ = _make_anonymizer()
```
I can't remember, does

```diff
-anonymizer, _, _, _ = _make_anonymizer()
+anonymizer, * = _make_anonymizer()
```

work too or does it complain that `*` is unused?
Yep — anonymizer, *_ = _make_anonymizer() works (just _ alone would error since _make_anonymizer returns a 4-tuple; you need *_ to slurp the rest). Applied across all 13 sites in a57c0c7.
```markdown
## Telemetry and Privacy

NeMo Anonymizer collects anonymous run-level telemetry to help prioritize product improvements. One event is sent per `Anonymizer.run()` / `Anonymizer.preview()` call, containing only technical metadata: the replacement strategy in use, models used, model hosts (e.g. `nvidia-build`, `openrouter`, `other`), input-record counts, run duration, and failure attribution by pipeline step. **No user data, record contents, prompts, or model outputs are collected.**
```
as with NSS, should we also link to/have this info in our docs?
Done in a57c0c7 — added a "Telemetry and Privacy" section to docs/index.md mirroring NSS's pattern, and the README link now points to that anchor (#telemetry-and-privacy).
Greptile fixes:

- Short-circuit on `NEMO_TELEMETRY_ENABLED=false` before `_build_telemetry_event`, so disabling telemetry no longer pays the tiktoken cost on every record
- Remove dead `if False else None` line in `test_client_setup_failure_routes_to_dlq`
- Document on `_add_to_dlq` that DLQ retries are unreachable in fire-and-flush usage; preserved for DD shape compatibility

Matt's suggestions:

- `_step_to_field`: if/elif chain -> match/case
- `classify_model_host`: clarify that `host in endpoint` is substring search on a URL string, not iteration over characters
- Test helpers: `anonymizer, _, _, _` -> `anonymizer, *_` (13 call sites)
- Add Telemetry and Privacy section to docs/index.md (mirroring NSS); README links to that anchor

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
```
…telemetry

# Conflicts:
#	src/anonymizer/interface/anonymizer.py
```
Summary

- `src/anonymizer/telemetry.py` with `AnonymizerEvent` + `TelemetryHandler` (DD-style fire-and-flush, NSS-style env handling)
- `Anonymizer.run()` / `Anonymizer.preview()` wrapped with try/except/finally so every exit path (completed/error/canceled) emits exactly one event
- `NEMO_SESSION_PREFIX=anonymizer-` and `NEMO_DEPLOYMENT_TYPE=sdk|cli` via `os.environ.setdefault` so DataDesigner's existing per-LLM-call telemetry is filterable to anonymizer-driven runs
- Opt-out via the `NEMO_TELEMETRY_ENABLED=false` env var, `AnonymizerConfig(emit_telemetry=False)`, or the `--no-emit-telemetry` CLI flag

Event shape matches the `anonymizer_event` schema in nemo-telemetry (38 fields, raw integer counts, tiktoken `cl100k_base` for `avgTokensPerRecord` mirroring DD's column-stat approach, `modelHosts` as a sorted-deduped list).

Known limitation

`FailedRecord.step` is workflow-name granularity today, so `entityValidatorFailureCount` and `entityAugmenterFailureCount` cannot be separated — both bucket into `entityDetectionFailureCount`. Same for the rewrite pipeline.
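A sketch of the `avgTokensPerRecord` computation mentioned above. Per this PR the real helper tokenizes with tiktoken's `cl100k_base` encoding; the encoder is injected here as a parameter (my illustration, not the real signature) so the sketch runs without tiktoken installed.

```python
from typing import Callable

def avg_tokens_per_record(texts: list[str], encode: Callable[[str], list]) -> float:
    # Real usage would pass: tiktoken.get_encoding("cl100k_base").encode
    if not texts:
        return 0.0
    return sum(len(encode(t)) for t in texts) / len(texts)
```

With a whitespace tokenizer standing in for tiktoken, `avg_tokens_per_record(["a b c", "d e"], str.split)` averages 3 and 2 tokens to 2.5.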
Coordination

This PR can land in main safely — CI has `NEMO_TELEMETRY_ENABLED=false`. Alternatively, we wait until the `nemo-telemetry` MR lands, flip to true, and then merge this.

Test plan

- `uv run pytest` — 721 tests pass (including 56 new unit + 21 new integration)
- `make format-check`, `make copyright-check`, `make lock-check`
- `anonymizer validate ... --no-emit-telemetry` accepted
- `anonymizer run` emits a real event against the staging endpoint (post-merge, post-schema-deploy)