diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c3b2d7f8..2a3064ba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,10 @@ on: branches: [main] workflow_dispatch: +env: + # Telemetry is opt-out at runtime; in CI we never want to emit. + NEMO_TELEMETRY_ENABLED: "false" + jobs: test: name: Test diff --git a/README.md b/README.md index e5c453ab..1e8b9edd 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,31 @@ make install-pre-commit # Install pre-commit hooks --- +## Telemetry and Privacy + +NeMo Anonymizer collects anonymous run-level telemetry to help prioritize product improvements. One event is sent per `Anonymizer.run()` / `Anonymizer.preview()` call, containing only technical metadata: the replacement strategy in use, models used, model hosts (e.g. `nvidia-build`, `openrouter`, `other`), input-record counts, run duration, and failure attribution by pipeline step. **No user data, record contents, prompts, or model outputs are collected.** See the [Telemetry and Privacy docs](https://nvidia-nemo.github.io/Anonymizer/latest/#telemetry-and-privacy) for the full field list. + +You may opt out of telemetry at any time: + +- **For one CLI invocation**: pass `--no-emit-telemetry` + ```bash + uv run anonymizer run --source data.csv --text-column text --replace redact --no-emit-telemetry + ``` +- **In the SDK**: set `emit_telemetry=False` on `AnonymizerConfig` + ```python + config = AnonymizerConfig(replace=Redact(), emit_telemetry=False) + ``` +- **For the current shell**: set the environment variable + ```bash + export NEMO_TELEMETRY_ENABLED=false + ``` + +Aggregate usage data (such as which models are most popular) will be shared back with the community. It is not used to track any individual user behavior. 
+ +**Use of third-party endpoints, including NVIDIA Build:** Anonymizer can be configured to use various inference endpoints, including [build.nvidia.com](https://build.nvidia.com), [OpenRouter](https://openrouter.ai), or local model servers. If you choose to use a third-party endpoint, that endpoint's own terms of service and privacy practices apply independently of this library. Any opt-out you exercise within Anonymizer does not extend to data collection by your chosen endpoint. + +--- + ## License Apache License 2.0 — see [LICENSE](LICENSE) for details. diff --git a/docs/index.md b/docs/index.md index 32bdf7c6..a868c702 100644 --- a/docs/index.md +++ b/docs/index.md @@ -133,6 +133,42 @@ Access the full pipeline trace with all internal columns. ```python preview.trace_dataframe ``` +--- +## Telemetry and Privacy + +NeMo Anonymizer shares anonymous run-level telemetry with NVIDIA for product improvement. Telemetry is enabled by default and can be disabled as described below. One event is emitted per `Anonymizer.run()` / `Anonymizer.preview()` invocation and contains only technical metadata: + +- **Run outcome** — final task status (`completed` / `error` / `canceled`) and wall-clock duration +- **Pipeline configuration** — transformation type (`annotate`, `redact`, `hash`, `substitute`, `rewrite`), whether `data_summary` / `privacy_goal` / `Substitute(instructions=...)` were customized, `max_repair_iterations`, `strict_entity_protection`, and the number of repair iterations triggered +- **Models used per step** — model aliases for the detector, validator, augmenter, rewriter, etc.
(whichever steps ran in this mode) +- **Model hosts** — coarse classification of the inference endpoints used (`nvidia-build`, `nvidia-internal`, `openrouter`, `local`, `other`) +- **Aggregate counts** — number of input records, success and failure counts, average tokens per record (estimated with `tiktoken cl100k_base`), and failure attribution by pipeline workflow +- **Deployment type** — `sdk`, `cli`, or `nmp` (reported as `undefined` if the value is not recognized) + +**No user data, record contents, prompts, model outputs, or device identifiers are collected.** The only host attribute included is the CPU architecture (e.g. `x86_64`), sent alongside the Anonymizer version. Aggregate usage data (such as which models are most popular) will be shared back with the community; it is not used to track any individual user behavior. + +You may opt out of telemetry collection at any time. Opting out applies only to data collection by NeMo Anonymizer itself. + +To disable telemetry in the SDK, set `emit_telemetry=False` on `AnonymizerConfig`: + +```python +config = AnonymizerConfig(replace=Redact(), emit_telemetry=False) +``` + +To disable telemetry for one CLI invocation, pass `--no-emit-telemetry`: + +```bash +uv run anonymizer run --source data.csv --text-column text --replace redact --no-emit-telemetry +``` + +To disable telemetry for the current shell, set `NEMO_TELEMETRY_ENABLED=false` in your environment before running (matching is case-insensitive; any value other than `1`, `true`, or `yes` disables telemetry, e.g. `0` or `no`): + +```bash +export NEMO_TELEMETRY_ENABLED=false +``` + +**Use of third-party endpoints, including NVIDIA Build:** Anonymizer can be configured to use various inference endpoints, including [build.nvidia.com](https://build.nvidia.com), [OpenRouter](https://openrouter.ai), or local model servers. If you choose to use a third-party endpoint, that endpoint's own terms of service and privacy practices apply independently of this library. Any opt-out you exercise within Anonymizer does not extend to data collection by your chosen endpoint.
+ --- ## Next up diff --git a/pyproject.toml b/pyproject.toml index b723701b..debda8f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,8 @@ dependencies = [ "cyclopts>=3", "pygments>=2.20.0", "cryptography>=46.0.6", + "httpx>=0.27.0", + "tiktoken>=0.9.0", ] [project.scripts] diff --git a/src/anonymizer/config/anonymizer_config.py b/src/anonymizer/config/anonymizer_config.py index 637d579f..1f4b669e 100644 --- a/src/anonymizer/config/anonymizer_config.py +++ b/src/anonymizer/config/anonymizer_config.py @@ -176,6 +176,13 @@ class AnonymizerConfig(BaseModel): description="Replacement method (Substitute(), Redact(), Annotate(), or Hash()).", ) rewrite: Rewrite | None = Field(default=None, description="Optional rewrite-mode parameters. ") + emit_telemetry: bool = Field( + default=True, + description=( + "Whether to emit anonymous Anonymizer telemetry events. See the Telemetry section " + "in the README for what is collected and how to opt out at the environment or CLI level." + ), + ) @model_validator(mode="after") def validate_exactly_one_mode(self) -> AnonymizerConfig: diff --git a/src/anonymizer/interface/anonymizer.py b/src/anonymizer/interface/anonymizer.py index 3b0e19e9..56cc505f 100644 --- a/src/anonymizer/interface/anonymizer.py +++ b/src/anonymizer/interface/anonymizer.py @@ -4,7 +4,10 @@ from __future__ import annotations import logging +import os +import re import time +import uuid from collections import Counter from pathlib import Path from typing import TYPE_CHECKING @@ -34,7 +37,7 @@ ) from anonymizer.engine.detection.detection_workflow import EntityDetectionWorkflow from anonymizer.engine.io.reader import read_input -from anonymizer.engine.ndd.adapter import NddAdapter +from anonymizer.engine.ndd.adapter import FailedRecord, NddAdapter from anonymizer.engine.ndd.model_loader import parse_model_configs, validate_model_alias_references from anonymizer.engine.replace.llm_replace_workflow import LlmReplaceWorkflow from 
anonymizer.engine.replace.replace_runner import ReplacementWorkflow @@ -43,6 +46,18 @@ from anonymizer.interface.errors import InvalidConfigError from anonymizer.interface.results import AnonymizerResult, PreviewResult from anonymizer.logging import LOG_INDENT, configure_logging, reapply_log_levels +from anonymizer.telemetry import ( + NOT_APPLICABLE, + AnonymizerEvent, + TaskEnum, + TaskStatusEnum, + TelemetryHandler, + _telemetry_enabled, + avg_tokens_per_record, + classify_model_host, + collect_model_hosts, + sort_join_aliases, +) if TYPE_CHECKING: import pandas as pd @@ -89,10 +104,15 @@ def __init__( rewrite_runner: Custom rewrite workflow (advanced/testing). """ _initialize_logging() + # Tag DataDesigner telemetry events so they're filterable as anonymizer traffic in + # the shared NeMo dashboards. `setdefault` so users (or upstream hosts) can override. + os.environ.setdefault("NEMO_SESSION_PREFIX", "anonymizer-") + os.environ.setdefault("NEMO_DEPLOYMENT_TYPE", "sdk") resolved_artifact_path = Path(artifact_path or ".anonymizer-artifacts") parsed = parse_model_configs(model_configs) self._model_configs = parsed.model_configs self._selected_models = parsed.selected_models + self._resolved_providers: list[ModelProvider] | None = _resolve_model_providers(model_providers) logger.info("🔧 Anonymizer initialized with %d model configs", len(self._model_configs)) det = self._selected_models.detection logger.info(LOG_INDENT + "🔎 detector: %s", det.entity_detector) @@ -104,7 +124,7 @@ def __init__( else: self._data_designer = DataDesigner( artifact_path=resolved_artifact_path, - model_providers=_resolve_model_providers(model_providers), + model_providers=self._resolved_providers, ) reapply_log_levels() self._adapter = NddAdapter(data_designer=self._data_designer) @@ -128,7 +148,29 @@ def run( """ self._validate_preflight_config(config) context = read_input(data) - return self._run_internal(config=config, data=data, context=context, preview_num_records=None) + input_df = 
context.dataframe + t_start = time.perf_counter() + status = TaskStatusEnum.COMPLETED + result: AnonymizerResult | None = None + try: + result = self._run_internal(config=config, data=data, context=context, preview_num_records=None) + return result + except KeyboardInterrupt: + status = TaskStatusEnum.CANCELED + raise + except Exception: + status = TaskStatusEnum.ERROR + raise + finally: + self._maybe_emit_telemetry( + task=TaskEnum.BATCH, + status=status, + config=config, + data=data, + input_df=input_df, + result=result, + duration_sec=time.perf_counter() - t_start, + ) def preview( self, @@ -146,14 +188,35 @@ def preview( """ self._validate_preflight_config(config) context = read_input(data, nrows=num_records) - result = self._run_internal(config=config, data=data, context=context, preview_num_records=num_records) - return PreviewResult( - dataframe=result.dataframe, - trace_dataframe=result.trace_dataframe, - resolved_text_column=result.resolved_text_column, - failed_records=result.failed_records, - preview_num_records=num_records, - ) + input_df = context.dataframe + t_start = time.perf_counter() + status = TaskStatusEnum.COMPLETED + result: AnonymizerResult | None = None + try: + result = self._run_internal(config=config, data=data, context=context, preview_num_records=num_records) + return PreviewResult( + dataframe=result.dataframe, + trace_dataframe=result.trace_dataframe, + resolved_text_column=result.resolved_text_column, + failed_records=result.failed_records, + preview_num_records=num_records, + ) + except KeyboardInterrupt: + status = TaskStatusEnum.CANCELED + raise + except Exception: + status = TaskStatusEnum.ERROR + raise + finally: + self._maybe_emit_telemetry( + task=TaskEnum.PREVIEW, + status=status, + config=config, + data=data, + input_df=input_df, + result=result, + duration_sec=time.perf_counter() - t_start, + ) def validate_config(self, config: AnonymizerConfig) -> None: """Validate that the active workflow config is compatible with model 
selections.""" @@ -309,6 +372,123 @@ def _validate_preflight_config(self, config: AnonymizerConfig) -> None: except ValueError as exc: raise InvalidConfigError(str(exc)) from exc + # ------------------------------------------------------------------ telemetry + + def _maybe_emit_telemetry( + self, + *, + task: TaskEnum, + status: TaskStatusEnum, + config: AnonymizerConfig, + data: AnonymizerInput, + input_df: pd.DataFrame, + result: AnonymizerResult | None, + duration_sec: float, + ) -> None: + """Build and fire-and-flush a single telemetry event. + + Telemetry is best-effort: any exception here is swallowed so that a + broken telemetry path can never disrupt the anonymization pipeline. + """ + try: + if not getattr(config, "emit_telemetry", True): + return + # Short-circuit before _build_telemetry_event so we don't pay the + # tiktoken cost on every record when telemetry is globally disabled + # via NEMO_TELEMETRY_ENABLED=false. + if not _telemetry_enabled(): + return + event = self._build_telemetry_event( + task=task, + status=status, + config=config, + data=data, + input_df=input_df, + result=result, + duration_sec=duration_sec, + ) + from anonymizer import __version__ as _anonymizer_version + + with TelemetryHandler( + source_client_version=_anonymizer_version, + session_id=uuid.uuid4().hex, + ) as handler: + handler.enqueue(event) + except Exception: # noqa: BLE001 - best-effort + logger.debug("Failed to emit telemetry event", exc_info=True) + + def _build_telemetry_event( + self, + *, + task: TaskEnum, + status: TaskStatusEnum, + config: AnonymizerConfig, + data: AnonymizerInput, + input_df: pd.DataFrame, + result: AnonymizerResult | None, + duration_sec: float, + ) -> AnonymizerEvent: + """Construct an AnonymizerEvent from the current pipeline state.""" + total_records = int(len(input_df)) + failed = list(result.failed_records) if result is not None else [] + failure_count = len(failed) + success_count = max(total_records - failure_count, 0) + + avg_tokens = 
-1 + if total_records > 0 and COL_TEXT in input_df.columns: + avg_tokens = avg_tokens_per_record(input_df[COL_TEXT].astype(str)) + + transformation_type = _transformation_type_string(config) + rewrite = config.rewrite + substitute = config.replace if isinstance(config.replace, Substitute) else None + + models = _collect_step_models( + selected=self._selected_models, + has_substitute=substitute is not None, + has_rewrite=rewrite is not None, + ) + failure_counts = _collect_failure_counts(failed) + hosts = _resolve_model_hosts(self._resolved_providers) + + return AnonymizerEvent( + task=task, + task_status=status, + job_duration_sec=duration_sec, + num_input_records=total_records, + num_success_records=success_count, + num_failure_records=failure_count, + avg_tokens_per_record=avg_tokens, + transformation_type=transformation_type, + custom_data_summary_provided=bool(data.data_summary), + custom_privacy_goal_provided=_custom_privacy_goal_provided(rewrite), + custom_substitute_instructions_provided=bool(substitute is not None and substitute.instructions), + max_repair_iterations=(rewrite.max_repair_iterations if rewrite is not None else -1), + strict_entity_protection=(rewrite.strict_entity_protection if rewrite is not None else False), + repair_iterations_triggered=_repair_iterations_triggered(failed, rewrite is not None), + entity_detector_model=models["entity_detector"], + entity_validator_model=models["entity_validator"], + entity_augmenter_model=models["entity_augmenter"], + latent_detector_model=models["latent_detector"], + replacement_generator_model=models["replacement_generator"], + domain_classifier_model=models["domain_classifier"], + disposition_analyzer_model=models["disposition_analyzer"], + meaning_extractor_model=models["meaning_extractor"], + qa_generator_model=models["qa_generator"], + rewriter_model=models["rewriter"], + evaluator_model=models["evaluator"], + repairer_model=models["repairer"], + judge_model=models["judge"], + model_hosts=hosts, + 
entity_detection_failure_count=failure_counts["entity_detection"], + latent_detection_failure_count=failure_counts["latent_detection"], + replace_map_generation_failure_count=failure_counts["replace_map_generation"], + rewrite_pipeline_failure_count=failure_counts["rewrite_pipeline"], + rewrite_evaluate_failure_count=failure_counts["rewrite_evaluate"], + rewrite_repair_failure_count=failure_counts["rewrite_repair"], + rewrite_final_judge_failure_count=failure_counts["rewrite_final_judge"], + unknown_step_failure_count=failure_counts["unknown"], + ) + def _unwrap_entities(raw: object) -> list: if isinstance(raw, dict): @@ -418,3 +598,137 @@ def _build_user_dataframe(trace_dataframe: pd.DataFrame, *, resolved_text_column } return t[[col for col in t.columns if col in allowed]].copy() + + +# ----------------------------------------------------------------- telemetry helpers + + +_REWRITE_REPAIR_RE = re.compile(r"^rewrite-repair-(\d+)$") +_REWRITE_EVALUATE_RE = re.compile(r"^rewrite-evaluate-(\d+)$") + + +def _transformation_type_string(config: AnonymizerConfig) -> str: + """Map AnonymizerConfig to the schema's transformationType value. + + Schema accepts exactly one of: ``annotate``, ``redact``, ``hash``, + ``substitute``, ``rewrite``. AnonymizerConfig's validator enforces exactly one + of replace/rewrite, so one of these branches always fires. + """ + if config.rewrite is not None: + return "rewrite" + # The four ReplaceMethodBase subclasses (Annotate, Redact, Hash, Substitute) + # lowercase directly to their schema values. + return type(config.replace).__name__.lower() + + +def _custom_privacy_goal_provided(rewrite: object | None) -> bool: + """Detect whether the user supplied a non-default privacy_goal. + + ``Rewrite.populate_default_privacy_goal`` always populates a default if the + user passed None, so we treat the default protect/preserve text as "not custom". 
+ """ + if rewrite is None or rewrite.privacy_goal is None: # type: ignore[union-attr] + return False + from anonymizer.config.rewrite import DEFAULT_PRESERVE_TEXT, DEFAULT_PROTECT_TEXT + + goal = rewrite.privacy_goal # type: ignore[union-attr] + return goal.protect != DEFAULT_PROTECT_TEXT or goal.preserve != DEFAULT_PRESERVE_TEXT + + +def _collect_step_models( + *, + selected, # ModelSelection + has_substitute: bool, + has_rewrite: bool, +) -> dict[str, str]: + """Project the user's model selection into the schema's step-keyed shape.""" + det = selected.detection + rewrite = selected.rewrite + replace = selected.replace + return { + "entity_detector": det.entity_detector or NOT_APPLICABLE, + "entity_validator": sort_join_aliases(det.entity_validator or []), + "entity_augmenter": det.entity_augmenter or NOT_APPLICABLE, + # latent_detector only runs in rewrite mode + "latent_detector": (det.latent_detector or NOT_APPLICABLE) if has_rewrite else NOT_APPLICABLE, + # replacement_generator only runs in Substitute mode + "replacement_generator": replace.replacement_generator if has_substitute else NOT_APPLICABLE, + # All rewrite-only roles + "domain_classifier": rewrite.domain_classifier if has_rewrite else NOT_APPLICABLE, + "disposition_analyzer": rewrite.disposition_analyzer if has_rewrite else NOT_APPLICABLE, + "meaning_extractor": rewrite.meaning_extractor if has_rewrite else NOT_APPLICABLE, + "qa_generator": rewrite.qa_generator if has_rewrite else NOT_APPLICABLE, + "rewriter": rewrite.rewriter if has_rewrite else NOT_APPLICABLE, + "evaluator": rewrite.evaluator if has_rewrite else NOT_APPLICABLE, + "repairer": rewrite.repairer if has_rewrite else NOT_APPLICABLE, + "judge": rewrite.judge if has_rewrite else NOT_APPLICABLE, + } + + +def _step_to_field(step: str) -> str: + """Map a FailedRecord.step (workflow_name) to a schema failure-count field key.""" + match step: + case "entity-detection": + return "entity_detection" + case "latent-entity-detection": + return 
"latent_detection" + case "replace-map-generation": + return "replace_map_generation" + case "rewrite-pipeline": + return "rewrite_pipeline" + case "rewrite-final-judge": + return "rewrite_final_judge" + case _ if _REWRITE_EVALUATE_RE.match(step): + return "rewrite_evaluate" + case _ if _REWRITE_REPAIR_RE.match(step): + return "rewrite_repair" + case _: + return "unknown" + + +def _collect_failure_counts(failed: list[FailedRecord]) -> dict[str, int]: + """Aggregate FailedRecord.step values into per-workflow failure counts.""" + counts = { + "entity_detection": 0, + "latent_detection": 0, + "replace_map_generation": 0, + "rewrite_pipeline": 0, + "rewrite_evaluate": 0, + "rewrite_repair": 0, + "rewrite_final_judge": 0, + "unknown": 0, + } + for fr in failed: + counts[_step_to_field(fr.step)] += 1 + return counts + + +def _repair_iterations_triggered(failed: list[FailedRecord], is_rewrite: bool) -> int: + """Count distinct repair iterations observed in FailedRecord step names. + + Falls back to -1 when the run wasn't a rewrite. Returns 0 when rewrite ran + but no failures surfaced from repair iterations — note that this undercounts + repair iterations that completed without producing FailedRecord entries. A + follow-up could plumb a richer signal up from the rewrite workflow. + """ + if not is_rewrite: + return -1 + iterations: set[int] = set() + for fr in failed: + m = _REWRITE_REPAIR_RE.match(fr.step) + if m: + iterations.add(int(m.group(1))) + return len(iterations) + + +def _resolve_model_hosts(providers: list[ModelProvider] | None) -> list[str]: + """Sorted, deduplicated list of provider host classifications. + + Returns ``["nvidia-build"]`` when no custom providers are configured — + anonymizer's defaults route through build.nvidia.com. 
+ """ + if not providers: + from anonymizer.telemetry import ModelHostEnum as _MH + + return [_MH.NVIDIA_BUILD.value] + return collect_model_hosts([classify_model_host(p) for p in providers]) diff --git a/src/anonymizer/interface/cli/main.py b/src/anonymizer/interface/cli/main.py index 01f35596..d6490d2f 100644 --- a/src/anonymizer/interface/cli/main.py +++ b/src/anonymizer/interface/cli/main.py @@ -5,11 +5,17 @@ import functools import logging +import os import sys from dataclasses import dataclass, field from pathlib import Path from typing import Annotated, ClassVar, Literal +# When invoked via the CLI entry point, telemetry deployment type defaults to "cli". +# Anonymizer's SDK path sets "sdk" via os.environ.setdefault in Anonymizer.__init__, +# but the CLI is loaded first so its setdefault wins for CLI-driven runs. +os.environ.setdefault("NEMO_DEPLOYMENT_TYPE", "cli") + logger = logging.getLogger("anonymizer.cli") import cyclopts @@ -110,6 +116,18 @@ class CliOpts: # -- shared between substitute (replace) and rewrite -- instructions: Annotated[str | None, cyclopts.Parameter(help="Extra instructions for the LLM.")] = None + # -- telemetry -- + emit_telemetry: Annotated[ + bool, + cyclopts.Parameter( + help=( + "Whether to emit anonymous Anonymizer telemetry events. " + "Use --no-emit-telemetry to opt out for this invocation. " + "Also overridable globally via NEMO_TELEMETRY_ENABLED=false." 
+ ) + ), + ] = True + _REPLACE_ONLY_FLAGS: ClassVar[tuple[str, ...]] = ("format_template",) _REWRITE_ONLY_FLAGS: ClassVar[tuple[str, ...]] = ("protect", "preserve") @@ -186,10 +204,10 @@ def _build_config_and_anonymizer(opts: CliOpts) -> tuple[AnonymizerConfig, Anony """Build the shared AnonymizerConfig and Anonymizer from CLI args.""" if opts.rewrite: strategy = _build_rewrite_config(opts) - config = AnonymizerConfig(rewrite=strategy, detect=opts.detect) + config = AnonymizerConfig(rewrite=strategy, detect=opts.detect, emit_telemetry=opts.emit_telemetry) elif opts.replace: strategy = _build_replace_strategy(opts) - config = AnonymizerConfig(replace=strategy, detect=opts.detect) + config = AnonymizerConfig(replace=strategy, detect=opts.detect, emit_telemetry=opts.emit_telemetry) else: raise InvalidConfigError("Specify --replace or --rewrite.") diff --git a/src/anonymizer/telemetry.py b/src/anonymizer/telemetry.py new file mode 100644 index 00000000..31d15a1e --- /dev/null +++ b/src/anonymizer/telemetry.py @@ -0,0 +1,398 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Anonymous telemetry handler for NeMo Anonymizer. + +Emits one ``anonymizer_event`` per ``Anonymizer.run()`` / ``Anonymizer.preview()`` +invocation. Telemetry is opt-out via: + +- ``NEMO_TELEMETRY_ENABLED=false`` environment variable +- ``AnonymizerConfig(emit_telemetry=False)`` +- ``--no-emit-telemetry`` CLI flag + +Related environment variables (read at runtime, not import time): + +- ``NEMO_TELEMETRY_ENABLED``: set to ``false`` / ``0`` / ``no`` to disable. +- ``NEMO_DEPLOYMENT_TYPE``: ``cli``, ``sdk``, ``nmp``. Defaults to ``sdk``. +- ``NEMO_TELEMETRY_ENDPOINT``: override the destination URL. +- ``NEMO_SESSION_PREFIX``: prepended to session IDs. Set to ``"anonymizer-"`` + automatically by ``Anonymizer.__init__`` for dashboard filtering. 
+""" + +from __future__ import annotations + +import asyncio +import os +import platform +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum +from typing import TYPE_CHECKING, Any, ClassVar + +from pydantic import BaseModel, Field + +if TYPE_CHECKING: + import httpx + from data_designer.config.models import ModelProvider + +CLIENT_ID = "184482118588404" +NEMO_TELEMETRY_VERSION = "nemo-telemetry/1.0" +DEFAULT_ENDPOINT = "https://events.telemetry.data.nvidia.com/v1.1/events/json" +MAX_RETRIES = 3 +CPU_ARCHITECTURE = platform.uname().machine + + +class NemoSourceEnum(str, Enum): + ANONYMIZER = "anonymizer" + UNDEFINED = "undefined" + + +class TaskEnum(str, Enum): + BATCH = "batch" + PREVIEW = "preview" + + +class TaskStatusEnum(str, Enum): + COMPLETED = "completed" + ERROR = "error" + CANCELED = "canceled" + + +class DeploymentTypeEnum(str, Enum): + CLI = "cli" + SDK = "sdk" + NMP = "nmp" + UNDEFINED = "undefined" + + +class ModelHostEnum(str, Enum): + NVIDIA_BUILD = "nvidia-build" + NVIDIA_INTERNAL = "nvidia-internal" + OPENROUTER = "openrouter" + LOCAL = "local" + OTHER = "other" + + +NOT_APPLICABLE = "not_applicable" + + +def _telemetry_enabled() -> bool: + return os.getenv("NEMO_TELEMETRY_ENABLED", "true").lower() in ("1", "true", "yes") + + +def _telemetry_endpoint() -> str: + return os.getenv("NEMO_TELEMETRY_ENDPOINT", DEFAULT_ENDPOINT) + + +def _deployment_type() -> DeploymentTypeEnum: + raw = os.getenv("NEMO_DEPLOYMENT_TYPE", "sdk").lower() + try: + return DeploymentTypeEnum(raw) + except ValueError: + return DeploymentTypeEnum.UNDEFINED + + +def _session_prefix() -> str | None: + return os.getenv("NEMO_SESSION_PREFIX") + + +_tokenizer = None # lazily cached cl100k_base encoder + + +def _get_tokenizer(): + """Lazily initialize and cache the tiktoken encoder. + + Mirrors DataDesigner's approach (cl100k_base — the GPT-3.5/4 family encoder). 
+ The exact LLM anonymizer hits may tokenize differently; this is a consistent + cross-run estimate, not an exact count. + """ + global _tokenizer + if _tokenizer is None: + import tiktoken + + _tokenizer = tiktoken.get_encoding("cl100k_base") + return _tokenizer + + +def avg_tokens_per_record(texts) -> int: + """Mean tiktoken count across the input texts. + + Returns -1 on empty input or tokenizer failure. Telemetry is best-effort. + Accepts any iterable of strings (list, pd.Series). + """ + try: + tokenizer = _get_tokenizer() + counts = [len(tokenizer.encode(str(t), disallowed_special=())) for t in texts] + if not counts: + return -1 + return int(sum(counts) / len(counts)) + except Exception: + return -1 + + +def classify_model_host(provider: ModelProvider | None) -> ModelHostEnum: + """Classify a ModelProvider's endpoint URL into one of the ModelHostEnum values. + + Substring-matches known host fragments against the (lower-cased) endpoint URL. + """ + if provider is None: + return ModelHostEnum.OTHER + # ``endpoint`` is a single URL string; the ``in`` checks below are substring + # searches against that string (not iteration over characters). + endpoint = (getattr(provider, "endpoint", "") or "").lower() + if "build.nvidia.com" in endpoint or "integrate.api.nvidia.com" in endpoint: + return ModelHostEnum.NVIDIA_BUILD + if "inference-api.nvidia.com" in endpoint: + return ModelHostEnum.NVIDIA_INTERNAL + if "openrouter.ai" in endpoint: + return ModelHostEnum.OPENROUTER + local_hosts = ("localhost", "127.0.0.1", "0.0.0.0", "[::1]") + if any(host in endpoint for host in local_hosts): + return ModelHostEnum.LOCAL + return ModelHostEnum.OTHER + + +def collect_model_hosts(hosts: list[ModelHostEnum]) -> list[str]: + """Sort + dedupe per-provider host classifications into the wire-format list. + + Returns ``["other"]`` when no hosts were observed — never an empty list, so the + telemetry payload always carries at least one host string. 
+ """ + unique = sorted({h.value for h in hosts if h is not None}) + return unique or [ModelHostEnum.OTHER.value] + + +def sort_join_aliases(aliases: list[str]) -> str: + """Canonical pool serialization: sorted ascending, comma-joined, no spaces.""" + cleaned = [a.strip() for a in aliases if a and a.strip()] + if not cleaned: + return NOT_APPLICABLE + return ",".join(sorted(cleaned)) + + +class AnonymizerEvent(BaseModel): + """Pydantic model for the anonymizer_event payload. + + Field aliases match the camelCase schema in + ``aire/microservices/nemo-telemetry`` ``schemas/anonymous_events.json``. + """ + + _event_name: ClassVar[str] = "anonymizer_event" + # Matches the schemaMeta.schemaVersion of nemo-telemetry's anonymous_events.json. + _schema_version: ClassVar[str] = "1.7" + + # Identity + nemo_source: NemoSourceEnum = Field(default=NemoSourceEnum.ANONYMIZER, alias="nemoSource") + task: TaskEnum + task_status: TaskStatusEnum = Field(alias="taskStatus") + deployment_type: DeploymentTypeEnum = Field(default_factory=_deployment_type, alias="deploymentType") + + # Timing + job_duration_sec: float = Field(default=-1.0, alias="jobDurationSec") + + # Record counts (raw integers, legal-cleared) + num_input_records: int = Field(default=-1, alias="numInputRecords") + num_success_records: int = Field(default=-1, alias="numSuccessRecords") + num_failure_records: int = Field(default=-1, alias="numFailureRecords") + avg_tokens_per_record: int = Field(default=-1, alias="avgTokensPerRecord") + + # Configuration + transformation_type: str = Field(alias="transformationType") + custom_data_summary_provided: bool = Field(default=False, alias="customDataSummaryProvided") + custom_privacy_goal_provided: bool = Field(default=False, alias="customPrivacyGoalProvided") + custom_substitute_instructions_provided: bool = Field(default=False, alias="customSubstituteInstructionsProvided") + max_repair_iterations: int = Field(default=-1, alias="maxRepairIterations") + strict_entity_protection: 
bool = Field(default=False, alias="strictEntityProtection") + repair_iterations_triggered: int = Field(default=-1, alias="repairIterationsTriggered") + + # Models per step. The first three always run regardless of strategy, so they have + # no default; the rest fall back to ``NOT_APPLICABLE`` when their step doesn't run. + entity_detector_model: str = Field(alias="entityDetectorModel") + entity_validator_model: str = Field(alias="entityValidatorModel") + entity_augmenter_model: str = Field(alias="entityAugmenterModel") + latent_detector_model: str = Field(default=NOT_APPLICABLE, alias="latentDetectorModel") + replacement_generator_model: str = Field(default=NOT_APPLICABLE, alias="replacementGeneratorModel") + domain_classifier_model: str = Field(default=NOT_APPLICABLE, alias="domainClassifierModel") + disposition_analyzer_model: str = Field(default=NOT_APPLICABLE, alias="dispositionAnalyzerModel") + meaning_extractor_model: str = Field(default=NOT_APPLICABLE, alias="meaningExtractorModel") + qa_generator_model: str = Field(default=NOT_APPLICABLE, alias="qaGeneratorModel") + rewriter_model: str = Field(default=NOT_APPLICABLE, alias="rewriterModel") + evaluator_model: str = Field(default=NOT_APPLICABLE, alias="evaluatorModel") + repairer_model: str = Field(default=NOT_APPLICABLE, alias="repairerModel") + judge_model: str = Field(default=NOT_APPLICABLE, alias="judgeModel") + model_hosts: list[str] = Field(default_factory=list, alias="modelHosts") + + # Failure attribution (workflow_name granularity) + entity_detection_failure_count: int = Field(default=0, alias="entityDetectionFailureCount") + latent_detection_failure_count: int = Field(default=0, alias="latentDetectionFailureCount") + replace_map_generation_failure_count: int = Field(default=0, alias="replaceMapGenerationFailureCount") + rewrite_pipeline_failure_count: int = Field(default=0, alias="rewritePipelineFailureCount") + rewrite_evaluate_failure_count: int = Field(default=0, 
alias="rewriteEvaluateFailureCount") + rewrite_repair_failure_count: int = Field(default=0, alias="rewriteRepairFailureCount") + rewrite_final_judge_failure_count: int = Field(default=0, alias="rewriteFinalJudgeFailureCount") + unknown_step_failure_count: int = Field(default=0, alias="unknownStepFailureCount") + + model_config = {"populate_by_name": True} + + +@dataclass +class QueuedEvent: + event: AnonymizerEvent + timestamp: datetime + retry_count: int = 0 + + +def _get_iso_timestamp(dt: datetime | None = None) -> str: + if dt is None: + dt = datetime.now(timezone.utc) + return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z" + + +def build_payload( + events: list[QueuedEvent], *, source_client_version: str, session_id: str = "undefined" +) -> dict[str, Any]: + if not events: + raise ValueError("build_payload requires at least one event") + return { + "browserType": "undefined", # do not change + "clientId": CLIENT_ID, + "clientType": "Native", # do not change + "clientVariant": "Release", # do not change + "clientVer": source_client_version, + "cpuArchitecture": CPU_ARCHITECTURE, + "deviceGdprBehOptIn": "None", # do not change + "deviceGdprFuncOptIn": "None", # do not change + "deviceGdprTechOptIn": "None", # do not change + "deviceId": "undefined", # do not change + "deviceMake": "undefined", # do not change + "deviceModel": "undefined", # do not change + "deviceOS": "undefined", # do not change + "deviceOSVersion": "undefined", # do not change + "deviceType": "undefined", # do not change + "eventProtocol": "1.6", # do not change + "eventSchemaVer": events[0].event._schema_version, + "eventSysVer": NEMO_TELEMETRY_VERSION, + "externalUserId": "undefined", # do not change + "gdprBehOptIn": "None", # do not change + "gdprFuncOptIn": "None", # do not change + "gdprTechOptIn": "None", # do not change + "idpId": "undefined", # do not change + "integrationId": "undefined", # do not change + "productName": "undefined", # do not change + 
"productVersion": "undefined", # do not change + "sentTs": _get_iso_timestamp(), + "sessionId": session_id, + "userId": "undefined", # do not change + "events": [ + { + "ts": _get_iso_timestamp(q.timestamp), + "parameters": q.event.model_dump(by_alias=True, mode="json"), + "name": q.event._event_name, + } + for q in events + ], + } + + +class TelemetryHandler: + """Fire-and-flush telemetry handler for Anonymizer. + + Anonymizer runs are short, so we skip DD's background-daemon-thread mode + entirely. Usage: + + with TelemetryHandler(source_client_version=__version__, session_id=...) as h: + h.enqueue(event) + # on __exit__, queued events are flushed synchronously + + All errors are swallowed; telemetry must never disrupt the pipeline. + """ + + def __init__( + self, + *, + source_client_version: str = "undefined", + session_id: str = "undefined", + max_queue_size: int = 50, + max_retries: int = MAX_RETRIES, + ): + self._max_queue_size = max_queue_size + self._max_retries = max_retries + self._events: list[QueuedEvent] = [] + self._dlq: list[QueuedEvent] = [] + self._source_client_version = source_client_version + prefix = _session_prefix() + self._session_id = f"{prefix}{session_id}" if prefix else session_id + + def enqueue(self, event: AnonymizerEvent) -> None: + if not _telemetry_enabled(): + return + if not isinstance(event, AnonymizerEvent): + return + self._events.append(QueuedEvent(event=event, timestamp=datetime.now(timezone.utc))) + + def flush(self) -> None: + if not (self._events or self._dlq): + return + try: + asyncio.run(self._flush_events()) + except Exception: + pass # best-effort + + def __enter__(self) -> TelemetryHandler: + return self + + def __exit__(self, *_: object) -> None: + self.flush() + + async def _flush_events(self) -> None: + dlq, self._dlq = self._dlq, [] + new, self._events = self._events, [] + events = dlq + new + if not events: + return + try: + import httpx + + async with httpx.AsyncClient() as client: + await 
self._send_events_with_client(client, events) + except Exception: + self._add_to_dlq(events) + + async def _send_events_with_client(self, client: httpx.AsyncClient, events: list[QueuedEvent]) -> None: + if not events: + return + payload = build_payload( + events, + source_client_version=self._source_client_version, + session_id=self._session_id, + ) + try: + response = await client.post(_telemetry_endpoint(), json=payload) + if response.status_code in (400, 422) or response.is_success: + return + if response.status_code == 413: + if len(events) == 1: + return + mid = len(events) // 2 + await self._send_events_with_client(client, events[:mid]) + await self._send_events_with_client(client, events[mid:]) + return + if response.status_code == 408 or response.status_code >= 500: + self._add_to_dlq(events) + except Exception: + self._add_to_dlq(events) + + def _add_to_dlq(self, events: list[QueuedEvent]) -> None: + """Bookkeeping for events that hit a retryable failure. + + Note: in anonymizer's fire-and-flush usage (``with TelemetryHandler(...) as h``), + ``flush()`` runs exactly once per handler lifetime, so DLQ entries are not + actually retried — they're effectively dropped. The structure is preserved to + match DataDesigner's handler shape and to support a future long-lived / + timer-driven usage pattern without restructuring. Telemetry is best-effort. + """ + for q in events: + q.retry_count += 1 + if q.retry_count > self._max_retries: + continue + self._dlq.append(q) diff --git a/tests/conftest.py b/tests/conftest.py index 5c522b91..879cc98a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,6 +31,21 @@ def _caplog_for_anonymizer(caplog: pytest.LogCaptureFixture) -> Generator[None]: anon_logger.removeHandler(caplog.handler) +@pytest.fixture(autouse=True) +def _isolate_telemetry_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Keep telemetry quiet and deterministic in unit tests. + + - Disable emission by default. 
Tests that exercise the emit path can opt in + by setting NEMO_TELEMETRY_ENABLED=true via their own monkeypatch. + - Clear NEMO_DEPLOYMENT_TYPE, NEMO_SESSION_PREFIX, and NEMO_TELEMETRY_ENDPOINT + so tests don't inherit values from the developer's shell. + """ + monkeypatch.setenv("NEMO_TELEMETRY_ENABLED", "false") + monkeypatch.delenv("NEMO_DEPLOYMENT_TYPE", raising=False) + monkeypatch.delenv("NEMO_SESSION_PREFIX", raising=False) + monkeypatch.delenv("NEMO_TELEMETRY_ENDPOINT", raising=False) + + @pytest.fixture def stub_detector_model_configs() -> list[ModelConfig]: """Model configs with the GLiNER PII detector alias.""" diff --git a/tests/interface/test_anonymizer_telemetry.py b/tests/interface/test_anonymizer_telemetry.py new file mode 100644 index 00000000..9e4c2b05 --- /dev/null +++ b/tests/interface/test_anonymizer_telemetry.py @@ -0,0 +1,407 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Integration tests for telemetry emission from Anonymizer.run() / preview().""" + +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import Mock + +import pandas as pd +import pytest + +from anonymizer.config.anonymizer_config import AnonymizerConfig, AnonymizerInput, Rewrite +from anonymizer.config.replace_strategies import Annotate, Hash, Redact, Substitute +from anonymizer.engine.constants import COL_FINAL_ENTITIES, COL_REPLACED_TEXT, COL_REWRITTEN_TEXT, COL_TEXT +from anonymizer.engine.detection.detection_workflow import EntityDetectionResult, EntityDetectionWorkflow +from anonymizer.engine.ndd.adapter import FailedRecord +from anonymizer.engine.replace.replace_runner import ReplacementResult, ReplacementWorkflow +from anonymizer.engine.rewrite.rewrite_workflow import RewriteResult, RewriteWorkflow +from anonymizer.interface.anonymizer import Anonymizer +from anonymizer.telemetry import ( + NOT_APPLICABLE, + AnonymizerEvent, 
+ TaskEnum, + TaskStatusEnum, +) + + +@pytest.fixture +def stub_input(tmp_path: Path) -> AnonymizerInput: + csv_path = tmp_path / "input.csv" + pd.DataFrame({"text": ["Alice works at Acme"]}).to_csv(csv_path, index=False) + return AnonymizerInput(source=str(csv_path)) + + +def _make_anonymizer( + detection_return: EntityDetectionResult | None = None, + replace_return: ReplacementResult | None = None, + rewrite_return: RewriteResult | None = None, +) -> tuple[Anonymizer, Mock, Mock, Mock]: + detection_workflow = Mock(spec=EntityDetectionWorkflow) + detection_workflow.run.return_value = detection_return or EntityDetectionResult( + dataframe=pd.DataFrame({COL_TEXT: ["Alice works at Acme"], COL_FINAL_ENTITIES: [{"entities": []}]}), + failed_records=[], + ) + _replace_df = pd.DataFrame( + {COL_TEXT: ["Alice works at Acme"], COL_REPLACED_TEXT: ["[REDACTED] works at [REDACTED]"]} + ) + _replace_df.attrs["original_text_column"] = "text" + replace_runner = Mock(spec=ReplacementWorkflow) + replace_runner.run.return_value = replace_return or ReplacementResult( + dataframe=_replace_df, + failed_records=[], + ) + _rewrite_df = pd.DataFrame( + { + COL_TEXT: ["Alice works at Acme"], + COL_REWRITTEN_TEXT: ["Beth works at Globex"], + "utility_score": [0.85], + "leakage_mass": [0.3], + "weighted_leakage_rate": [0.23], + "any_high_leaked": [False], + "needs_human_review": [False], + } + ) + _rewrite_df.attrs["original_text_column"] = "text" + rewrite_runner = Mock(spec=RewriteWorkflow) + rewrite_runner.run.return_value = rewrite_return or RewriteResult( + dataframe=_rewrite_df, + failed_records=[], + ) + anonymizer = Anonymizer( + detection_workflow=detection_workflow, + replace_runner=replace_runner, + rewrite_runner=rewrite_runner, + ) + return anonymizer, detection_workflow, replace_runner, rewrite_runner + + +class _FakeHandler: + """Drop-in replacement for TelemetryHandler that records what was enqueued.""" + + enqueued: list[AnonymizerEvent] = [] + + def __init__(self, *_, 
**__): + pass + + def __enter__(self): + return self + + def __exit__(self, *_): + return None + + def enqueue(self, event: AnonymizerEvent) -> None: + _FakeHandler.enqueued.append(event) + + def flush(self) -> None: # pragma: no cover - context manager handles it + pass + + +@pytest.fixture +def captured_events(monkeypatch: pytest.MonkeyPatch) -> list[AnonymizerEvent]: + """Patch the TelemetryHandler used inside Anonymizer to capture events. + + Also re-enables NEMO_TELEMETRY_ENABLED for this test (the autouse fixture + in conftest sets it to "false"). + """ + monkeypatch.setenv("NEMO_TELEMETRY_ENABLED", "true") + _FakeHandler.enqueued = [] + monkeypatch.setattr("anonymizer.interface.anonymizer.TelemetryHandler", _FakeHandler) + return _FakeHandler.enqueued + + +# ============================================================================= +# Init-time side effects +# ============================================================================= + + +class TestInitSideEffects: + def test_session_prefix_is_set(self) -> None: + # The autouse fixture clears NEMO_SESSION_PREFIX before each test, so + # we observe Anonymizer.__init__ setting it. 
+ _make_anonymizer() + assert os.environ.get("NEMO_SESSION_PREFIX") == "anonymizer-" + + def test_session_prefix_respects_existing_value(self, monkeypatch: pytest.MonkeyPatch) -> None: + """setdefault should not override a user-provided prefix.""" + monkeypatch.setenv("NEMO_SESSION_PREFIX", "custom-") + _make_anonymizer() + assert os.environ["NEMO_SESSION_PREFIX"] == "custom-" + + def test_deployment_type_defaults_to_sdk(self) -> None: + _make_anonymizer() + assert os.environ.get("NEMO_DEPLOYMENT_TYPE") == "sdk" + + +# ============================================================================= +# Emission on the success / error / cancel paths +# ============================================================================= + + +class TestRunEmitsTelemetry: + def test_run_emits_completed_event( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, *_ = _make_anonymizer() + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + + assert len(captured_events) == 1 + event = captured_events[0] + assert event.task == TaskEnum.BATCH + assert event.task_status == TaskStatusEnum.COMPLETED + assert event.transformation_type == "redact" + assert event.job_duration_sec >= 0 + + def test_run_emits_error_event_and_reraises( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, detection_wf, _, _ = _make_anonymizer() + detection_wf.run.side_effect = RuntimeError("kaboom") + + with pytest.raises(RuntimeError, match="kaboom"): + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + + assert len(captured_events) == 1 + assert captured_events[0].task_status == TaskStatusEnum.ERROR + + def test_run_emits_canceled_event_on_keyboard_interrupt( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, detection_wf, _, _ = _make_anonymizer() + detection_wf.run.side_effect = 
KeyboardInterrupt() + + with pytest.raises(KeyboardInterrupt): + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + + assert len(captured_events) == 1 + assert captured_events[0].task_status == TaskStatusEnum.CANCELED + + +class TestPreviewEmitsTelemetry: + def test_preview_emits_task_preview( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, *_ = _make_anonymizer() + anonymizer.preview(config=AnonymizerConfig(replace=Redact()), data=stub_input, num_records=5) + + assert len(captured_events) == 1 + assert captured_events[0].task == TaskEnum.PREVIEW + assert captured_events[0].task_status == TaskStatusEnum.COMPLETED + + +# ============================================================================= +# Opt-out +# ============================================================================= + + +class TestOptOut: + def test_config_emit_telemetry_false_skips_emission( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, *_ = _make_anonymizer() + anonymizer.run( + config=AnonymizerConfig(replace=Redact(), emit_telemetry=False), + data=stub_input, + ) + assert captured_events == [] + + def test_env_var_disables_emission( + self, + monkeypatch: pytest.MonkeyPatch, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + """``NEMO_TELEMETRY_ENABLED=false`` short-circuits BEFORE event construction. + + The path must not pay the tiktoken cost or build the event when the env-var + opt-out is active — so the FakeHandler should never see anything enqueued. + """ + # captured_events fixture set this to "true"; flip back. 
+ monkeypatch.setenv("NEMO_TELEMETRY_ENABLED", "false") + anonymizer, *_ = _make_anonymizer() + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + assert captured_events == [] + + +# ============================================================================= +# Field population +# ============================================================================= + + +class TestFieldPopulation: + def test_substitute_populates_replacement_generator( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, *_ = _make_anonymizer() + anonymizer.run(config=AnonymizerConfig(replace=Substitute()), data=stub_input) + + event = captured_events[0] + assert event.transformation_type == "substitute" + assert event.replacement_generator_model != NOT_APPLICABLE + # Rewrite-only fields stay not_applicable + assert event.rewriter_model == NOT_APPLICABLE + assert event.judge_model == NOT_APPLICABLE + assert event.max_repair_iterations == -1 + + def test_rewrite_populates_rewrite_models( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, *_ = _make_anonymizer() + anonymizer.run( + config=AnonymizerConfig(rewrite=Rewrite(max_repair_iterations=2, strict_entity_protection=True)), + data=stub_input, + ) + + event = captured_events[0] + assert event.transformation_type == "rewrite" + assert event.rewriter_model != NOT_APPLICABLE + assert event.judge_model != NOT_APPLICABLE + assert event.repairer_model != NOT_APPLICABLE + assert event.max_repair_iterations == 2 + assert event.strict_entity_protection is True + # Substitute-only field stays not_applicable + assert event.replacement_generator_model == NOT_APPLICABLE + + def test_custom_data_summary_detected( + self, + captured_events: list[AnonymizerEvent], + tmp_path: Path, + ) -> None: + csv_path = tmp_path / "input.csv" + pd.DataFrame({"text": ["Alice"]}).to_csv(csv_path, index=False) + data = 
AnonymizerInput(source=str(csv_path), data_summary="medical records about clinical trials") + + anonymizer, *_ = _make_anonymizer() + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=data) + + assert captured_events[0].custom_data_summary_provided is True + + @pytest.mark.parametrize( + "strategy_factory,expected_value", + [ + (Redact, "redact"), + (Hash, "hash"), + (Substitute, "substitute"), + (Annotate, "annotate"), + ], + ) + def test_transformation_type_matches_schema_enum( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + strategy_factory, + expected_value: str, + ) -> None: + """Every replace strategy must map to one of the schema's enum values.""" + anonymizer, *_ = _make_anonymizer() + anonymizer.run(config=AnonymizerConfig(replace=strategy_factory()), data=stub_input) + + assert captured_events[0].transformation_type == expected_value + + def test_default_model_hosts_is_nvidia_build( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + """When no providers are configured, hosts list contains only 'nvidia-build'.""" + anonymizer, *_ = _make_anonymizer() + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + + assert captured_events[0].model_hosts == ["nvidia-build"] + + +# ============================================================================= +# Failure-count aggregation +# ============================================================================= + + +class TestFailureAggregation: + def test_failure_counts_grouped_by_workflow_name( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + detection_failures = [ + FailedRecord(record_id="r1", step="entity-detection", reason="x"), + FailedRecord(record_id="r2", step="entity-detection", reason="y"), + ] + detection_return = EntityDetectionResult( + dataframe=pd.DataFrame({COL_TEXT: ["a"], COL_FINAL_ENTITIES: [{"entities": []}]}), + 
failed_records=detection_failures, + ) + anonymizer, *_ = _make_anonymizer(detection_return=detection_return) + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + + event = captured_events[0] + assert event.entity_detection_failure_count == 2 + assert event.num_failure_records == 2 + + def test_repair_iteration_suffixes_aggregate( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + """rewrite-repair-1 and rewrite-repair-2 both aggregate into rewrite_repair_failure_count.""" + rewrite_failures = [ + FailedRecord(record_id="r1", step="rewrite-repair-1", reason="x"), + FailedRecord(record_id="r2", step="rewrite-repair-2", reason="y"), + FailedRecord(record_id="r3", step="rewrite-repair-2", reason="z"), + ] + _df = pd.DataFrame({COL_TEXT: ["a"], COL_REWRITTEN_TEXT: ["b"]}) + _df.attrs["original_text_column"] = "text" + rewrite_return = RewriteResult(dataframe=_df, failed_records=rewrite_failures) + + anonymizer, *_ = _make_anonymizer(rewrite_return=rewrite_return) + anonymizer.run(config=AnonymizerConfig(rewrite=Rewrite()), data=stub_input) + + event = captured_events[0] + assert event.rewrite_repair_failure_count == 3 + # Distinct iteration numbers seen + assert event.repair_iterations_triggered == 2 + + def test_unknown_step_lands_in_catch_all( + self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + unknown_failures = [FailedRecord(record_id="r1", step="some-future-step", reason="x")] + detection_return = EntityDetectionResult( + dataframe=pd.DataFrame({COL_TEXT: ["a"], COL_FINAL_ENTITIES: [{"entities": []}]}), + failed_records=unknown_failures, + ) + anonymizer, *_ = _make_anonymizer(detection_return=detection_return) + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + + event = captured_events[0] + assert event.unknown_step_failure_count == 1 + assert event.entity_detection_failure_count == 0 + + def test_no_failures_counts_zero( 
+ self, + captured_events: list[AnonymizerEvent], + stub_input: AnonymizerInput, + ) -> None: + anonymizer, *_ = _make_anonymizer() + anonymizer.run(config=AnonymizerConfig(replace=Redact()), data=stub_input) + + event = captured_events[0] + assert event.num_failure_records == 0 + assert event.entity_detection_failure_count == 0 + assert event.unknown_step_failure_count == 0 diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 00000000..a4bb9f36 --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,436 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import asyncio +import json +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from data_designer.config.models import ModelProvider + +from anonymizer.telemetry import ( + DEFAULT_ENDPOINT, + NOT_APPLICABLE, + AnonymizerEvent, + DeploymentTypeEnum, + ModelHostEnum, + NemoSourceEnum, + QueuedEvent, + TaskEnum, + TaskStatusEnum, + TelemetryHandler, + _deployment_type, + _telemetry_enabled, + _telemetry_endpoint, + avg_tokens_per_record, + build_payload, + classify_model_host, + collect_model_hosts, + sort_join_aliases, +) + +# ============================================================================= +# avg_tokens_per_record +# ============================================================================= + + +class TestAvgTokensPerRecord: + def test_empty_input_returns_negative_one(self) -> None: + assert avg_tokens_per_record([]) == -1 + + def test_single_string_returns_token_count(self) -> None: + # tiktoken cl100k_base: "hello world" tokenizes to 2 tokens + result = avg_tokens_per_record(["hello world"]) + assert result >= 1 + + def test_averages_across_records(self) -> None: + short = "hi" + long_ = "the quick brown fox jumps over the lazy dog" + avg = 
avg_tokens_per_record([short, long_]) + only_short = avg_tokens_per_record([short]) + only_long = avg_tokens_per_record([long_]) + assert only_short <= avg <= only_long + + def test_pandas_series_input(self) -> None: + import pandas as pd + + s = pd.Series(["alpha", "beta gamma delta"]) + result = avg_tokens_per_record(s) + assert result >= 1 + + +# ============================================================================= +# classify_model_host / collect_model_hosts +# ============================================================================= + + +def _provider(endpoint: str) -> ModelProvider: + return ModelProvider(name="test", endpoint=endpoint) + + +class TestClassifyModelHost: + @pytest.mark.parametrize( + "endpoint,expected", + [ + ("https://build.nvidia.com/v1/chat/completions", ModelHostEnum.NVIDIA_BUILD), + ("https://integrate.api.nvidia.com/v1", ModelHostEnum.NVIDIA_BUILD), + ("https://Build.NVIDIA.com/v1", ModelHostEnum.NVIDIA_BUILD), + ("https://inference-api.nvidia.com/v1", ModelHostEnum.NVIDIA_INTERNAL), + ("https://openrouter.ai/api/v1", ModelHostEnum.OPENROUTER), + ("http://localhost:8000/v1", ModelHostEnum.LOCAL), + ("http://127.0.0.1:11434/v1", ModelHostEnum.LOCAL), + ("http://0.0.0.0/v1", ModelHostEnum.LOCAL), + ("https://gliner-qaqtckhiy.brevlab.com/v1", ModelHostEnum.OTHER), + ("https://some-random-host.example/v1", ModelHostEnum.OTHER), + ], + ) + def test_known_hosts(self, endpoint: str, expected: ModelHostEnum) -> None: + assert classify_model_host(_provider(endpoint)) == expected + + def test_none_provider(self) -> None: + assert classify_model_host(None) == ModelHostEnum.OTHER + + +class TestCollectModelHosts: + def test_empty_returns_other(self) -> None: + assert collect_model_hosts([]) == ["other"] + + def test_single_unique_host(self) -> None: + assert collect_model_hosts([ModelHostEnum.NVIDIA_BUILD]) == ["nvidia-build"] + + def test_duplicates_collapse(self) -> None: + hosts = [ModelHostEnum.NVIDIA_BUILD, 
ModelHostEnum.NVIDIA_BUILD] + assert collect_model_hosts(hosts) == ["nvidia-build"] + + def test_mixed_hosts_sorted(self) -> None: + # Sorted alphabetically for canonical wire format + hosts = [ModelHostEnum.OPENROUTER, ModelHostEnum.NVIDIA_BUILD] + assert collect_model_hosts(hosts) == ["nvidia-build", "openrouter"] + + +# ============================================================================= +# sort_join_aliases +# ============================================================================= + + +class TestSortJoinAliases: + def test_single_alias_is_unwrapped_string(self) -> None: + assert sort_join_aliases(["alias-a"]) == "alias-a" + + def test_multiple_aliases_are_sorted_and_joined(self) -> None: + # Pool member set is canonicalized — independent of input order. + assert sort_join_aliases(["alias-b", "alias-a"]) == "alias-a,alias-b" + assert sort_join_aliases(["alias-a", "alias-b"]) == "alias-a,alias-b" + + def test_empty_list_is_not_applicable(self) -> None: + assert sort_join_aliases([]) == NOT_APPLICABLE + + def test_strips_whitespace(self) -> None: + assert sort_join_aliases([" alias-a ", "alias-b"]) == "alias-a,alias-b" + + def test_filters_empty_strings(self) -> None: + assert sort_join_aliases(["alias-a", " ", ""]) == "alias-a" + + +# ============================================================================= +# Env helpers +# ============================================================================= + + +class TestEnvHelpers: + def test_telemetry_enabled_default(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("NEMO_TELEMETRY_ENABLED", raising=False) + assert _telemetry_enabled() is True + + @pytest.mark.parametrize("value", ["false", "0", "no", "FALSE"]) + def test_telemetry_enabled_disable_values(self, monkeypatch: pytest.MonkeyPatch, value: str) -> None: + monkeypatch.setenv("NEMO_TELEMETRY_ENABLED", value) + assert _telemetry_enabled() is False + + def test_telemetry_endpoint_default(self, monkeypatch: 
pytest.MonkeyPatch) -> None: + monkeypatch.delenv("NEMO_TELEMETRY_ENDPOINT", raising=False) + assert _telemetry_endpoint() == DEFAULT_ENDPOINT + + def test_telemetry_endpoint_override_preserves_case(self, monkeypatch: pytest.MonkeyPatch) -> None: + custom = "https://Events.example.COM/v1/Events?Token=ABC" + monkeypatch.setenv("NEMO_TELEMETRY_ENDPOINT", custom) + assert _telemetry_endpoint() == custom + + def test_deployment_type_default_is_sdk(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("NEMO_DEPLOYMENT_TYPE", raising=False) + assert _deployment_type() == DeploymentTypeEnum.SDK + + def test_deployment_type_invalid_value_falls_back(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("NEMO_DEPLOYMENT_TYPE", "definitely-not-real") + # Must not raise — telemetry must never block runtime on a misconfigured env var. + assert _deployment_type() == DeploymentTypeEnum.UNDEFINED + + +# ============================================================================= +# AnonymizerEvent +# ============================================================================= + + +def _minimal_event(**overrides) -> AnonymizerEvent: + """Construct an event with the minimum required fields, allowing overrides.""" + defaults = dict( + task=TaskEnum.BATCH, + task_status=TaskStatusEnum.COMPLETED, + transformation_type="redact", + entity_detector_model="gliner-pii", + entity_validator_model="some-validator", + entity_augmenter_model="some-augmenter", + ) + defaults.update(overrides) + return AnonymizerEvent(**defaults) + + +class TestAnonymizerEvent: + def test_minimal_event_populates_required(self) -> None: + event = _minimal_event() + assert event.nemo_source == NemoSourceEnum.ANONYMIZER + assert event.deployment_type == DeploymentTypeEnum.SDK + assert event.job_duration_sec == -1.0 + assert event.transformation_type == "redact" + # model_hosts defaults to empty list when not provided + assert event.model_hosts == [] + # Counts default to -1 sentinels + 
assert event.num_input_records == -1 + assert event.num_success_records == -1 + assert event.num_failure_records == -1 + assert event.avg_tokens_per_record == -1 + # Rewrite-only model fields default to not_applicable + assert event.rewriter_model == NOT_APPLICABLE + + def test_transformation_type_is_required(self) -> None: + """transformationType has no default — every run has one.""" + import pydantic + + with pytest.raises(pydantic.ValidationError): + AnonymizerEvent( + task=TaskEnum.BATCH, + task_status=TaskStatusEnum.COMPLETED, + entity_detector_model="x", + entity_validator_model="x", + entity_augmenter_model="x", + ) + + def test_detector_validator_augmenter_required(self) -> None: + """The always-run detection roles have no defaults.""" + import pydantic + + with pytest.raises(pydantic.ValidationError): + AnonymizerEvent( + task=TaskEnum.BATCH, + task_status=TaskStatusEnum.COMPLETED, + transformation_type="redact", + ) + + def test_model_dump_uses_camelcase_aliases(self) -> None: + event = _minimal_event( + task=TaskEnum.PREVIEW, + task_status=TaskStatusEnum.ERROR, + max_repair_iterations=3, + strict_entity_protection=True, + num_input_records=42, + avg_tokens_per_record=128, + model_hosts=["nvidia-build", "openrouter"], + ) + dumped = event.model_dump(by_alias=True) + assert dumped["nemoSource"] == "anonymizer" + assert dumped["task"] == "preview" + assert dumped["taskStatus"] == "error" + assert dumped["maxRepairIterations"] == 3 + assert dumped["strictEntityProtection"] is True + assert dumped["numInputRecords"] == 42 + assert dumped["avgTokensPerRecord"] == 128 + assert dumped["modelHosts"] == ["nvidia-build", "openrouter"] + # dominantFailureStep is gone — must not appear + assert "dominantFailureStep" not in dumped + + def test_payload_is_json_serializable(self) -> None: + """Regression: enum-valued fields must encode as their string values.""" + event = _minimal_event() + dumped = event.model_dump(by_alias=True, mode="json") + json.dumps(dumped) # 
class TestBuildPayload:
    """Checks on the envelope and per-event parameters produced by ``build_payload``."""

    def _make_queued(self, *, task: TaskEnum = TaskEnum.BATCH) -> QueuedEvent:
        """Wrap a minimal event in a ``QueuedEvent`` with a fixed UTC timestamp."""
        event = _minimal_event(task=task)
        return QueuedEvent(event=event, timestamp=datetime(2026, 5, 11, 12, 0, 0, tzinfo=timezone.utc))

    def test_envelope_shape(self) -> None:
        """Top-level envelope keys and the single event's name and timestamp format."""
        payload = build_payload([self._make_queued()], source_client_version="1.2.3", session_id="anonymizer-abc")
        assert payload["clientId"] == "184482118588404"
        assert payload["clientVer"] == "1.2.3"
        assert payload["sessionId"] == "anonymizer-abc"
        assert payload["eventSchemaVer"] == "1.7"
        assert len(payload["events"]) == 1
        assert payload["events"][0]["name"] == "anonymizer_event"
        assert payload["events"][0]["ts"] == "2026-05-11T12:00:00.000Z"

    def test_event_parameters_are_camelcase_strings(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Event parameters use camelCase keys and plain string values."""
        # Pin the variable so the "sdk" assertion below does not depend on the
        # shell or CI environment the suite happens to run in.
        # NOTE(review): assumes the event resolves its deployment type from
        # NEMO_DEPLOYMENT_TYPE when it is constructed — confirm.
        monkeypatch.delenv("NEMO_DEPLOYMENT_TYPE", raising=False)
        payload = build_payload([self._make_queued()], source_client_version="1.0")
        params = payload["events"][0]["parameters"]
        assert params["nemoSource"] == "anonymizer"
        assert params["task"] == "batch"
        assert params["taskStatus"] == "completed"
        assert params["deploymentType"] == "sdk"

    def test_payload_is_json_dumpable(self) -> None:
        """The payload can be encoded with the standard ``json`` module."""
        payload = build_payload([self._make_queued()], source_client_version="1.0")
        json.dumps(payload)  # must not raise

    def test_empty_events_raises(self) -> None:
        """An empty event list is rejected with ``ValueError``."""
        with pytest.raises(ValueError):
            build_payload([], source_client_version="1.0")
class TestSessionPrefix:
    """How ``NEMO_SESSION_PREFIX`` affects the session id stored on the handler."""

    def test_no_prefix_when_env_unset(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Without the variable, the stored session id is the one passed in."""
        monkeypatch.delenv("NEMO_SESSION_PREFIX", raising=False)
        assert TelemetryHandler(session_id="abc")._session_id == "abc"

    def test_prefix_applied_when_env_set(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """With the variable set, its value is prepended to the session id."""
        monkeypatch.setenv("NEMO_SESSION_PREFIX", "anonymizer-")
        stored_id = TelemetryHandler(session_id="abc")._session_id
        assert stored_id == "anonymizer-abc"
def test_413_splits_and_retries(self) -> None:
    """A 413 on a two-event batch is followed by two further POSTs."""
    handler = TelemetryHandler(source_client_version="1.0", session_id="s1")
    batch = []
    for _ in range(2):
        batch.append(QueuedEvent(event=_minimal_event(), timestamp=datetime.now(timezone.utc)))

    rejected = MagicMock(status_code=413, is_success=False)
    accepted = MagicMock(status_code=200, is_success=True)
    client = AsyncMock()
    # First POST is rejected as too large; the two follow-up POSTs succeed.
    client.post.side_effect = [rejected, accepted, accepted]

    asyncio.run(handler._send_events_with_client(client, batch))
    assert client.post.await_count == 3  # 1 original + 2 splits
class TestContextManager:
    """Behavior of ``TelemetryHandler`` when used as a context manager."""

    def test_context_manager_flushes_on_exit(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Leaving the ``with`` block sends the queued event and empties the queue."""
        monkeypatch.setenv("NEMO_TELEMETRY_ENABLED", "true")
        sent: list[int] = []

        # The replacement is installed on the class as a mock, not a function,
        # so no ``self`` is bound: it receives the same ``(client, events)``
        # arguments that direct calls to this method pass elsewhere in this file.
        async def fake_send(client, events) -> None:
            sent.append(len(events))

        with patch.object(TelemetryHandler, "_send_events_with_client", new=AsyncMock(side_effect=fake_send)):
            with TelemetryHandler(source_client_version="1.0") as handler:
                handler.enqueue(_minimal_event())

        # The single enqueued event must have reached the send step exactly once;
        # checking only the queue would also pass if the send step had failed.
        assert sent == [1]
        assert handler._events == []

    def test_context_manager_with_no_events_is_noop(self) -> None:
        """Entering and exiting without enqueuing leaves the event queue empty."""
        with TelemetryHandler(source_client_version="1.0") as handler:
            pass
        assert handler._events == []