From 59bff4cae8e1aecf812b5794a298c394937e323f Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Tue, 27 Jan 2026 22:24:15 +0000 Subject: [PATCH 1/7] Add keyword_tagger link for automatic tagging Co-Authored-By: Claude Opus 4.5 --- server/links/keyword_tagger/__init__.py | 211 ++++++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 server/links/keyword_tagger/__init__.py diff --git a/server/links/keyword_tagger/__init__.py b/server/links/keyword_tagger/__init__.py new file mode 100644 index 0000000..a7e8ccf --- /dev/null +++ b/server/links/keyword_tagger/__init__.py @@ -0,0 +1,211 @@ +"""Keyword Tagger Link + +This link searches vCon transcriptions for specific keywords and phrases, +then adds corresponding tags based on matches. + +Categories: +- Compliance & Regulatory (do_not_call, take_me_off, stop_calling, fcc, etc.) +- Voicemail Service Detection (youmail variants) +- Recording/Transcription mentions +- Greeting Patterns +- Legal/Professional +- Other Content + +Configuration options: + categories: List of category names to enable (default: all) + custom_keywords: Dict of additional tag -> keywords mappings + case_sensitive: Whether to match case-sensitively (default: false) +""" + +import re +from typing import Any, Dict, List, Optional, Set + +from server.lib.vcon_redis import VconRedis +from lib.logging_utils import init_logger + +logger = init_logger(__name__) + +# Keyword definitions organized by category +# Format: tag_name -> list of keywords/phrases to search for +KEYWORD_RULES = { + # Compliance & Regulatory + "compliance": { + "do_not_call": ["do not call", "don't call", "dont call", "do-not-call"], + "take_me_off": ["take me off", "take us off", "remove me", "remove us"], + "stop_calling": ["stop calling", "quit calling", "stop call"], + "mentions_fcc": ["fcc", "federal communications"], + "mentions_report": ["report you", "report this", "reporting", "file a complaint"], + "mentions_enforcement": ["enforc", "attorney general", 
"lawsuit", "sue you", "legal action"], + "mentions_scam": ["scam", "scammer", "scamming", "fraudulent", "fraud"], + "mentions_spam": ["spam", "spammer", "spamming", "junk call"], + "mentions_robocall": ["robo", "robocall", "auto-dial", "autodial", "autodialer"], + "mentions_protection": ["protect", "protection", "tcpa", "consumer protection"], + }, + + # Voicemail Service Detection (YouMail variants) + "voicemail_service": { + "youmail_detected": [ + "youmail", "youmale", "you mail", "you male", + "umale", "umail", "u mail", "u-mail", + ], + }, + + # Recording/Transcription mentions + "recording": { + "mentions_transcribe": ["transcribe", "transcription", "transcribing"], + "mentions_recording": ["being recorded", "this call is recorded", "call is being recorded", + "may be recorded", "will be recorded", "recording this call"], + "mentions_email": ["email", "e-mail"], # Note: may want to exclude "voicemail" + }, + + # Greeting Patterns + "greetings": { + "greeting_yea_hello": ["yea hello", "yeah hello", "ya hello"], + "greeting_hi_informal": ["hi j", "hi g", "hey j", "hey g"], + }, + + # Legal/Professional + "legal": { + "mentions_law": ["law office", "lawoffice", "law firm", "lawfirm", + "attorney", "lawyer", "legal department"], + }, + + # Other Content + "other": { + "profanity": ["fuck", "shit", "damn", "ass"], + "mentions_torn": ["torn"], + "mentions_push": ["push"], + "mentions_pitch": ["pitch", "sales pitch"], + "mentions_bank": ["bank", "banking", "banker"], + "mentions_county": ["county"], + "mentions_general": ["general"], + "mentions_subscribe": ["subscribe", "subscription"], + "why_calling": ["why are you calling", "why you calling", "why do you keep calling"], + }, +} + +default_options = { + "categories": None, # None means all categories + "custom_keywords": {}, # Additional tag -> keywords mappings + "case_sensitive": False, + "min_confidence": 0.0, # Minimum transcription confidence to process +} + + +def get_transcription_text(vcon: Any) -> 
Optional[str]: + """Extract transcription text from vCon analysis entries.""" + texts = [] + + for analysis in vcon.analysis: + analysis_type = analysis.get("type", "") + + # Handle WTF transcription format + if analysis_type == "wtf_transcription": + body = analysis.get("body", {}) + if isinstance(body, dict): + transcript = body.get("transcript", {}) + if isinstance(transcript, dict): + text = transcript.get("text", "") + if text: + texts.append(text) + # Also check segments for text + segments = body.get("segments", []) + for seg in segments: + seg_text = seg.get("text", "") + if seg_text: + texts.append(seg_text) + + # Handle standard transcription format + elif analysis_type == "transcription": + body = analysis.get("body", "") + if isinstance(body, str): + texts.append(body) + elif isinstance(body, dict): + text = body.get("text", body.get("transcript", "")) + if text: + texts.append(text) + + if texts: + return " ".join(texts) + return None + + +def find_keywords(text: str, keywords: List[str], case_sensitive: bool = False) -> Set[str]: + """Find which keywords are present in the text.""" + if not case_sensitive: + text = text.lower() + + found = set() + for keyword in keywords: + search_keyword = keyword if case_sensitive else keyword.lower() + if search_keyword in text: + found.add(keyword) + + return found + + +def run( + vcon_uuid: str, + link_name: str, + opts: Dict[str, Any] = None, +) -> Optional[str]: + """Process a vCon and add keyword-based tags.""" + merged_opts = default_options.copy() + if opts: + merged_opts.update(opts) + opts = merged_opts + + logger.info(f"Starting keyword_tagger for vCon: {vcon_uuid}") + + vcon_redis = VconRedis() + vcon = vcon_redis.get_vcon(vcon_uuid) + + if not vcon: + logger.error(f"keyword_tagger: vCon {vcon_uuid} not found") + return vcon_uuid + + # Get transcription text + text = get_transcription_text(vcon) + + if not text: + logger.debug(f"No transcription found for vCon {vcon_uuid}") + return vcon_uuid + + 
logger.debug(f"Analyzing transcription ({len(text)} chars) for vCon {vcon_uuid}") + + case_sensitive = opts.get("case_sensitive", False) + enabled_categories = opts.get("categories") # None = all + custom_keywords = opts.get("custom_keywords", {}) + + tags_added = [] + + # Process built-in keyword rules + for category, rules in KEYWORD_RULES.items(): + # Skip if category filtering is enabled and this category is not in the list + if enabled_categories is not None and category not in enabled_categories: + continue + + for tag_name, keywords in rules.items(): + found = find_keywords(text, keywords, case_sensitive) + if found: + vcon.add_tag(tag_name=tag_name, tag_value=",".join(sorted(found))) + tags_added.append(tag_name) + logger.debug(f"Added tag '{tag_name}' (matched: {found})") + + # Process custom keywords + for tag_name, keywords in custom_keywords.items(): + if isinstance(keywords, str): + keywords = [keywords] + found = find_keywords(text, keywords, case_sensitive) + if found: + vcon.add_tag(tag_name=tag_name, tag_value=",".join(sorted(found))) + tags_added.append(tag_name) + logger.debug(f"Added custom tag '{tag_name}' (matched: {found})") + + if tags_added: + vcon_redis.store_vcon(vcon) + logger.info(f"Added {len(tags_added)} tags to vCon {vcon_uuid}: {tags_added}") + else: + logger.debug(f"No keyword matches for vCon {vcon_uuid}") + + return vcon_uuid From 950bc3c35412d2c4dd256ef5f08fccd1734ed8dc Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Tue, 27 Jan 2026 22:24:27 +0000 Subject: [PATCH 2/7] Add wtf_transcribe link for WTF transcription Co-Authored-By: Claude Opus 4.5 --- server/links/wtf_transcribe/README.md | 140 ++++++++++ server/links/wtf_transcribe/__init__.py | 340 ++++++++++++++++++++++++ 2 files changed, 480 insertions(+) create mode 100644 server/links/wtf_transcribe/README.md create mode 100644 server/links/wtf_transcribe/__init__.py diff --git a/server/links/wtf_transcribe/README.md b/server/links/wtf_transcribe/README.md new file mode 
100644 index 0000000..6b22fdc --- /dev/null +++ b/server/links/wtf_transcribe/README.md @@ -0,0 +1,140 @@ +# WTF Transcription Link (vfun Integration) + +A link that sends vCon audio dialogs to a vfun transcription server and adds the results as WTF (World Transcription Format) analysis entries. + +## Overview + +This link integrates with the vfun transcription server to provide: +- Multi-language speech recognition (English + auto-detect) +- Speaker diarization (who spoke when) +- GPU-accelerated processing with CUDA +- WTF-compliant output format per IETF draft-howe-vcon-wtf-extension-01 + +## Configuration + +```yaml +wtf_transcribe: + module: links.wtf_transcribe + options: + # Required: URL of the vfun transcription server + vfun-server-url: http://localhost:8443/transcribe + + # Optional: Enable speaker diarization (default: true) + diarize: true + + # Optional: Request timeout in seconds (default: 300) + timeout: 300 + + # Optional: Minimum dialog duration to transcribe in seconds (default: 5) + min-duration: 5 + + # Optional: API key for vfun server authentication + api-key: your-api-key-here +``` + +## How It Works + +1. **Extract Audio**: Reads audio from vCon dialog (supports `body` with base64/base64url encoding, or `url` with file:// or http:// references) +2. **Send to vfun**: POSTs audio file to vfun's `/transcribe` endpoint +3. **Create WTF Analysis**: Formats the transcription result as a WTF analysis entry +4. 
**Update vCon**: Adds the WTF analysis to the vCon and stores it back to Redis + +## Output Format + +The link adds analysis entries with the WTF format: + +```json +{ + "type": "wtf_transcription", + "dialog": 0, + "mediatype": "application/json", + "vendor": "vfun", + "product": "parakeet-tdt-110m", + "schema": "wtf-1.0", + "encoding": "json", + "body": { + "transcript": { + "text": "Hello, how can I help you today?", + "language": "en-US", + "duration": 30.0, + "confidence": 0.95 + }, + "segments": [ + { + "id": 0, + "start": 0.0, + "end": 3.5, + "text": "Hello, how can I help you today?", + "confidence": 0.95, + "speaker": 0 + } + ], + "metadata": { + "created_at": "2024-01-15T10:30:00Z", + "processed_at": "2024-01-15T10:30:05Z", + "provider": "vfun", + "model": "parakeet-tdt-110m" + }, + "speakers": { + "0": { + "id": 0, + "label": "Speaker 0", + "segments": [0], + "total_time": 15.2 + } + }, + "quality": { + "average_confidence": 0.95, + "multiple_speakers": true, + "low_confidence_words": 0 + } + } +} +``` + +## Behavior + +- **Skips non-recording dialogs**: Only processes dialogs with `type: "recording"` +- **Skips already transcribed**: Dialogs with existing WTF transcriptions are skipped +- **Duration filtering**: Dialogs shorter than `min-duration` are skipped +- **File URL support**: Can read audio from local `file://` URLs directly + +## Example Chain Configuration + +```yaml +chains: + transcription_chain: + links: + - tag + - wtf_transcribe + - supabase_webhook + ingress_lists: + - transcribe + egress_lists: + - transcribed + enabled: 1 +``` + +## vfun Server + +The vfun server provides GPU-accelerated transcription: + +```bash +# Start vfun server +cd /path/to/vfun +./vfun server + +# Test health +curl http://localhost:8443/ping + +# Manual transcription test +curl -X POST http://localhost:8443/transcribe \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -F "file=@audio.wav" \ + -F "diarize=true" +``` + +## Related + +- 
[vfun](https://github.com/strolid/vfun) - GPU-accelerated transcription server +- [draft-howe-vcon-wtf-extension](https://datatracker.ietf.org/doc/html/draft-howe-vcon-wtf-extension) - IETF WTF specification diff --git a/server/links/wtf_transcribe/__init__.py b/server/links/wtf_transcribe/__init__.py new file mode 100644 index 0000000..7c07407 --- /dev/null +++ b/server/links/wtf_transcribe/__init__.py @@ -0,0 +1,340 @@ +"""WTF Transcription Link (vfun integration) + +This link sends vCon audio dialogs to a vfun transcription server and adds +the results as WTF (World Transcription Format) analysis entries. + +The vfun server provides: +- Multi-language speech recognition (English + auto-detect) +- Speaker diarization (who spoke when) +- GPU-accelerated processing with CUDA + +Configuration options: + vfun-server-url: URL of the vfun transcription server (required) + diarize: Enable speaker diarization (default: true) + timeout: Request timeout in seconds (default: 300) + min-duration: Minimum dialog duration to transcribe in seconds (default: 5) + api-key: Optional API key for vfun server authentication + +Example configuration in config.yml: + wtf_transcribe: + module: links.wtf_transcribe + options: + vfun-server-url: http://localhost:8443/transcribe + diarize: true + timeout: 300 + min-duration: 5 + api-key: your-api-key-here +""" + +import base64 +import json +import logging +import os +import tempfile +import requests +from datetime import datetime, timezone +from typing import Optional, Dict, Any, List + +from server.lib.vcon_redis import VconRedis +from lib.logging_utils import init_logger +from lib.error_tracking import init_error_tracker + +init_error_tracker() +logger = init_logger(__name__) + +default_options = { + "vfun-server-url": None, + "diarize": True, + "timeout": 300, + "min-duration": 5, + "api-key": None, +} + + +def has_wtf_transcription(vcon: Any, dialog_index: int) -> bool: + """Check if a dialog already has a WTF transcription.""" + for 
analysis in vcon.analysis: + if (analysis.get("type") == "wtf_transcription" and + analysis.get("dialog") == dialog_index): + return True + return False + + +def should_transcribe_dialog(dialog: Dict[str, Any], min_duration: float) -> bool: + """Check if a dialog should be transcribed.""" + if dialog.get("type") != "recording": + return False + if not dialog.get("body") and not dialog.get("url"): + return False + duration = dialog.get("duration") + if duration is not None and float(duration) < min_duration: + return False + return True + + +def get_audio_content(dialog: Dict[str, Any]) -> Optional[bytes]: + """Extract audio content from dialog body or URL.""" + if dialog.get("body"): + encoding = dialog.get("encoding", "base64") + if encoding == "base64url": + return base64.urlsafe_b64decode(dialog["body"]) + elif encoding == "base64": + return base64.b64decode(dialog["body"]) + else: + return dialog["body"].encode() if isinstance(dialog["body"], str) else dialog["body"] + + if dialog.get("url"): + url = dialog["url"] + if url.startswith("file://"): + filepath = url[7:] + try: + with open(filepath, "rb") as f: + return f.read() + except Exception as e: + logger.error(f"Failed to read file {filepath}: {e}") + return None + else: + try: + resp = requests.get(url, timeout=60) + resp.raise_for_status() + return resp.content + except Exception as e: + logger.error(f"Failed to fetch URL {url}: {e}") + return None + return None + + +def create_wtf_analysis( + dialog_index: int, + vfun_response: Dict[str, Any], + duration: float, +) -> Dict[str, Any]: + """Create a WTF analysis entry from vfun response.""" + now = datetime.now(timezone.utc).isoformat() + + # Extract text and segments from vfun response + # vfun returns: analysis[].body with transcription data + analysis_entries = vfun_response.get("analysis", []) + + full_text = "" + segments = [] + language = "en-US" + + for entry in analysis_entries: + if entry.get("type") in ("transcription", "wtf_transcription"): + 
body = entry.get("body", {}) + + # Handle different response formats + if isinstance(body, dict): + # WTF format from vfun + transcript = body.get("transcript", {}) + full_text = transcript.get("text", body.get("text", "")) + language = transcript.get("language", body.get("language", "en-US")) + segments = body.get("segments", []) + elif isinstance(body, str): + full_text = body + break + + # If no analysis found, check for direct text field + if not full_text: + full_text = vfun_response.get("text", "") + segments = vfun_response.get("segments", []) + + # Calculate confidence + if segments: + confidences = [s.get("confidence", 0.9) for s in segments] + avg_confidence = sum(confidences) / len(confidences) + else: + avg_confidence = 0.9 + + # Build WTF segments + wtf_segments = [] + for i, seg in enumerate(segments): + wtf_seg = { + "id": seg.get("id", i), + "start": float(seg.get("start", seg.get("start_time", 0.0))), + "end": float(seg.get("end", seg.get("end_time", 0.0))), + "text": seg.get("text", seg.get("transcription", "")), + "confidence": float(seg.get("confidence", 0.9)), + } + if "speaker" in seg: + wtf_seg["speaker"] = seg["speaker"] + wtf_segments.append(wtf_seg) + + # Build speakers section + speakers = {} + for seg in wtf_segments: + speaker = seg.get("speaker") + if speaker is not None: + speaker_key = str(speaker) + if speaker_key not in speakers: + speakers[speaker_key] = { + "id": speaker, + "label": f"Speaker {speaker}", + "segments": [], + "total_time": 0.0, + } + speakers[speaker_key]["segments"].append(seg["id"]) + speakers[speaker_key]["total_time"] += seg["end"] - seg["start"] + + # Build WTF body + wtf_body = { + "transcript": { + "text": full_text, + "language": language, + "duration": float(duration), + "confidence": float(avg_confidence), + }, + "segments": wtf_segments, + "metadata": { + "created_at": now, + "processed_at": now, + "provider": "vfun", + "model": "parakeet-tdt-110m", + "audio": { + "duration": float(duration), + }, + }, + 
"quality": { + "average_confidence": float(avg_confidence), + "multiple_speakers": len(speakers) > 1, + "low_confidence_words": sum(1 for s in wtf_segments if s.get("confidence", 1.0) < 0.5), + }, + } + + if speakers: + wtf_body["speakers"] = speakers + + return { + "type": "wtf_transcription", + "dialog": dialog_index, + "mediatype": "application/json", + "vendor": "vfun", + "product": "parakeet-tdt-110m", + "schema": "wtf-1.0", + # Note: encoding omitted since body is a direct object, not a JSON string + "body": wtf_body, + } + + +def run( + vcon_uuid: str, + link_name: str, + opts: Dict[str, Any] = None, +) -> Optional[str]: + """Process a vCon through the vfun transcription service.""" + merged_opts = default_options.copy() + if opts: + merged_opts.update(opts) + opts = merged_opts + + logger.info(f"Starting wtf_transcribe link for vCon: {vcon_uuid}") + + vfun_server_url = opts.get("vfun-server-url") + if not vfun_server_url: + logger.error("wtf_transcribe: vfun-server-url is required") + return vcon_uuid + + vcon_redis = VconRedis() + vcon = vcon_redis.get_vcon(vcon_uuid) + + if not vcon: + logger.error(f"wtf_transcribe: vCon {vcon_uuid} not found") + return vcon_uuid + + # Find dialogs to transcribe + dialogs_processed = 0 + dialogs_skipped = 0 + + for i, dialog in enumerate(vcon.dialog): + if not should_transcribe_dialog(dialog, opts.get("min-duration", 5)): + logger.debug(f"Skipping dialog {i} (not eligible)") + dialogs_skipped += 1 + continue + + if has_wtf_transcription(vcon, i): + logger.debug(f"Skipping dialog {i} (already transcribed)") + dialogs_skipped += 1 + continue + + # Get audio content + audio_content = get_audio_content(dialog) + if not audio_content: + logger.warning(f"Could not extract audio from dialog {i}") + dialogs_skipped += 1 + continue + + logger.info(f"Transcribing dialog {i} for vCon {vcon_uuid}") + + try: + # Build request to vfun server + headers = {} + api_key = opts.get("api-key") + if api_key: + headers["Authorization"] = 
f"Bearer {api_key}" + + # Get filename from dialog or generate one + filename = dialog.get("filename", f"audio_{i}.wav") + mimetype = dialog.get("mimetype", "audio/wav") + + # Send audio to vfun server + files = {"file": (filename, audio_content, mimetype)} + data = { + "diarize": str(opts.get("diarize", True)), + "block": "true", + } + + response = requests.post( + vfun_server_url, + files=files, + data=data, + headers=headers, + timeout=opts.get("timeout", 300), + ) + + if response.status_code in (200, 302): + vfun_response = response.json() + # Handle double-encoded JSON (vfun sometimes returns JSON string) + if isinstance(vfun_response, str): + vfun_response = json.loads(vfun_response) + + duration = dialog.get("duration", 30.0) + wtf_analysis = create_wtf_analysis(i, vfun_response, float(duration)) + + # Add analysis to vCon + vcon.add_analysis( + type=wtf_analysis["type"], + dialog=wtf_analysis["dialog"], + vendor=wtf_analysis.get("vendor"), + body=wtf_analysis["body"], + extra={ + "mediatype": wtf_analysis.get("mediatype"), + "product": wtf_analysis.get("product"), + "schema": wtf_analysis.get("schema"), + }, + ) + + dialogs_processed += 1 + logger.info(f"Added WTF transcription for dialog {i}") + + else: + logger.error( + f"vfun transcription failed for dialog {i}: " + f"status={response.status_code}, response={response.text[:200]}" + ) + + except requests.exceptions.Timeout: + logger.error(f"vfun transcription timed out for dialog {i}") + except Exception as e: + logger.error(f"Error transcribing dialog {i}: {e}", exc_info=True) + + if dialogs_processed > 0: + vcon_redis.store_vcon(vcon) + logger.info( + f"Updated vCon {vcon_uuid}: processed={dialogs_processed}, " + f"skipped={dialogs_skipped}" + ) + else: + logger.info(f"No dialogs transcribed for vCon {vcon_uuid}") + + return vcon_uuid From 7f74f867f81f935d7bc7d461b0d5ea80daa7e339 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Tue, 27 Jan 2026 22:24:43 +0000 Subject: [PATCH 3/7] Ensure vcon version 
0.3.0 for webhook compatibility Updates webhook link to set vcon version to 0.3.0 for compatibility with vcon-mcp REST API. Co-Authored-By: Claude Opus 4.5 --- server/links/webhook/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/links/webhook/__init__.py b/server/links/webhook/__init__.py index 16b433c..28c8afd 100644 --- a/server/links/webhook/__init__.py +++ b/server/links/webhook/__init__.py @@ -33,6 +33,10 @@ def run( # The webhook needs a stringified JSON version. json_dict = vCon.to_dict() + # Ensure vcon version is 0.3.0 for compatibility with vcon-mcp REST API + if json_dict.get("vcon") == "0.0.1" or "vcon" not in json_dict: + json_dict["vcon"] = "0.3.0" + # Build headers from configuration headers = opts.get("headers", {}) From 161b623f7333bf7f8d1364b4b5999826d52c1e95 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Tue, 27 Jan 2026 22:27:42 +0000 Subject: [PATCH 4/7] Use HTTPS for apt sources in Dockerfile Configure apt to use HTTPS sources for environments where HTTP port 80 is blocked. Co-Authored-By: Claude Opus 4.5 --- docker/Dockerfile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docker/Dockerfile b/docker/Dockerfile index f1c781d..13091e2 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -10,6 +10,9 @@ ENV VCON_SERVER_VERSION=${VCON_SERVER_VERSION} ENV VCON_SERVER_GIT_COMMIT=${VCON_SERVER_GIT_COMMIT} ENV VCON_SERVER_BUILD_TIME=${VCON_SERVER_BUILD_TIME} +# Configure apt to use HTTPS sources (required when HTTP port 80 is blocked) +RUN sed -i 's|http://deb.debian.org|https://deb.debian.org|g' /etc/apt/sources.list.d/debian.sources + RUN apt-get update && \ apt-get install -y libavdevice-dev ffmpeg From 74e747b7ce91e19457b3ac96d6ad4c04022584a2 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Tue, 27 Jan 2026 22:27:19 +0000 Subject: [PATCH 5/7] Add SigNoz observability configuration Includes docker-compose and config files for SigNoz observability stack with OpenTelemetry collector. 
Co-Authored-By: Claude Opus 4.5 --- docker-compose.signoz.yml | 120 ++++++++++++++++++++ signoz/README.md | 176 ++++++++++++++++++++++++++++++ signoz/alertmanager.yml | 19 ++++ signoz/clickhouse-users.xml | 38 +++++++ signoz/otel-collector-config.yaml | 48 ++++++++ signoz/zz-clickhouse-config.xml | 53 +++++++++ 6 files changed, 454 insertions(+) create mode 100644 docker-compose.signoz.yml create mode 100644 signoz/README.md create mode 100644 signoz/alertmanager.yml create mode 100644 signoz/clickhouse-users.xml create mode 100644 signoz/otel-collector-config.yaml create mode 100644 signoz/zz-clickhouse-config.xml diff --git a/docker-compose.signoz.yml b/docker-compose.signoz.yml new file mode 100644 index 0000000..d57fb8f --- /dev/null +++ b/docker-compose.signoz.yml @@ -0,0 +1,120 @@ +# SigNoz Observability Stack +# Usage: docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d +# +# After first run, execute schema migrations: +# docker run --rm --network conserver signoz/signoz-schema-migrator:latest sync --dsn='tcp://signoz-clickhouse:9000' +# +# Access UI at: http://localhost:3301 + +networks: + conserver: + external: true + +volumes: + signoz_clickhouse_data: + signoz_zookeeper_data: + signoz_zookeeper_log: + signoz_data: + +services: + signoz-zookeeper: + image: zookeeper:3.9 + container_name: signoz-zookeeper + hostname: signoz-zookeeper + environment: + - ZOO_AUTOPURGE_PURGEINTERVAL=1 + - ZOO_4LW_COMMANDS_WHITELIST=mntr,ruok,stat + volumes: + - signoz_zookeeper_data:/data + - signoz_zookeeper_log:/datalog + networks: + - conserver + healthcheck: + test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"] + interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped + + signoz-clickhouse: + image: clickhouse/clickhouse-server:24.1.2-alpine + container_name: signoz-clickhouse + hostname: signoz-clickhouse + tty: true + depends_on: + signoz-zookeeper: + condition: service_healthy + volumes: + - 
signoz_clickhouse_data:/var/lib/clickhouse + - ./signoz/zz-clickhouse-config.xml:/etc/clickhouse-server/config.d/zz-clickhouse-config.xml:ro + - ./signoz/clickhouse-users.xml:/etc/clickhouse-server/users.d/users.xml:ro + environment: + - CLICKHOUSE_DB=signoz_traces + - CLICKHOUSE_USER=default + - CLICKHOUSE_PASSWORD= + ulimits: + nofile: + soft: 262144 + hard: 262144 + networks: + - conserver + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8123/ping"] + interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped + + signoz-otel-collector: + image: signoz/signoz-otel-collector:latest + container_name: signoz-otel-collector + hostname: signoz-otel-collector + command: + - "--config=/etc/otel-collector-config.yaml" + depends_on: + signoz-clickhouse: + condition: service_healthy + environment: + - OTEL_RESOURCE_ATTRIBUTES=host.name=signoz-host,os.type=linux + volumes: + - ./signoz/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro + ports: + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + networks: + - conserver + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:13133/"] + interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped + + signoz: + image: signoz/query-service:latest + container_name: signoz + hostname: signoz + depends_on: + signoz-clickhouse: + condition: service_healthy + environment: + - ClickHouseUrl=tcp://signoz-clickhouse:9000 + - SIGNOZ_LOCAL_DB_PATH=/var/lib/signoz/signoz.db + - DASHBOARDS_PATH=/root/config/dashboards + - STORAGE=clickhouse + - GODEBUG=netdns=go + - TELEMETRY_ENABLED=true + - DEPLOYMENT_TYPE=docker-standalone + volumes: + - signoz_data:/var/lib/signoz + - ./signoz/dashboards:/root/config/dashboards + ports: + - "3301:8080" # Web UI + networks: + - conserver + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8080/api/v1/health"] + 
interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped diff --git a/signoz/README.md b/signoz/README.md new file mode 100644 index 0000000..cc38424 --- /dev/null +++ b/signoz/README.md @@ -0,0 +1,176 @@ +# SigNoz Observability Stack for vcon-server + +This directory contains the configuration for SigNoz, a self-hosted observability platform that collects traces, metrics, and logs from the vcon-mcp server via OpenTelemetry. + +## Architecture + +``` +┌─────────────────┐ OTLP/HTTP ┌──────────────────────┐ +│ vcon-mcp │ ─────────────────► │ signoz-otel-collector│ +│ (instrumented) │ :4318 │ (OTLP receiver) │ +└─────────────────┘ └──────────┬───────────┘ + │ + ▼ +┌─────────────────┐ ┌──────────────────────┐ +│ signoz (UI) │ ◄────────────────► │ signoz-clickhouse │ +│ :3301 │ TCP :9000 │ (time-series DB) │ +└─────────────────┘ └──────────┬───────────┘ + │ + ▼ + ┌──────────────────────┐ + │ signoz-zookeeper │ + │ (coordination) │ + └──────────────────────┘ +``` + +## Components + +| Service | Image | Purpose | Ports | +|---------|-------|---------|-------| +| signoz | `signoz/query-service:latest` | Query API + Web UI | 3301 (mapped from 8080) | +| signoz-otel-collector | `signoz/signoz-otel-collector:latest` | OTLP ingestion | 4317 (gRPC), 4318 (HTTP) | +| signoz-clickhouse | `clickhouse/clickhouse-server:24.1.2-alpine` | Time-series storage | 8123, 9000 (internal) | +| signoz-zookeeper | `zookeeper:3.9` | ClickHouse coordination | 2181 (internal) | + +## Configuration Files + +### otel-collector-config.yaml +OpenTelemetry Collector pipeline configuration: +- **Receivers**: OTLP gRPC (4317) and HTTP (4318) +- **Processors**: Batch processing +- **Exporters**: ClickHouse for traces, metrics, and logs + +### zz-clickhouse-config.xml +ClickHouse server configuration: +- IPv4 listening (0.0.0.0) +- Single-node cluster named "cluster" (required by SigNoz schema migrator) +- ZooKeeper integration for distributed DDL + +### clickhouse-users.xml +ClickHouse user 
permissions with default user having full access. + +### alertmanager.yml +Basic alertmanager configuration (not currently active). + +## Usage + +### Start with SigNoz + +```bash +cd /path/to/vcon-server +docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d +``` + +### Start without SigNoz (normal operation) + +```bash +cd /path/to/vcon-server +docker compose -f docker-compose.yml -f docker-compose.override.yml up -d +``` + +### Stop SigNoz only + +```bash +docker compose -f docker-compose.signoz.yml down +``` + +### Access the UI + +Open http://localhost:3301 in your browser. + +## First-Time Setup + +After starting SigNoz for the first time, run the schema migrations: + +```bash +docker run --rm --network conserver \ + signoz/signoz-schema-migrator:latest \ + sync --dsn='tcp://signoz-clickhouse:9000' +``` + +Note: Some migrations may fail due to JSON type syntax incompatibility with ClickHouse 24.1. Core functionality still works. + +## vcon-mcp Integration + +The vcon-mcp service is configured with these environment variables in `docker-compose.override.yml`: + +```yaml +environment: + OTEL_ENABLED: "true" + OTEL_EXPORTER_TYPE: otlp + OTEL_ENDPOINT: http://signoz-otel-collector:4318 + OTEL_SERVICE_NAME: vcon-mcp-server +``` + +## Verification + +1. Check service health: + ```bash + curl http://localhost:3301/api/v1/health + # Returns: {"status":"ok"} + ``` + +2. Check container status: + ```bash + docker ps | grep signoz + ``` + +3. 
View collector logs: + ```bash + docker logs signoz-otel-collector + ``` + +## Troubleshooting + +### ClickHouse won't start +- Check if port 9000 is in use +- Verify zookeeper is healthy first +- Check logs: `docker logs signoz-clickhouse` + +### OTEL Collector errors +- Ensure ClickHouse is healthy before starting collector +- Verify schema migrations have run +- Check config syntax: `docker logs signoz-otel-collector` + +### No data in UI +- Verify vcon-mcp is sending data (check its logs for OTEL export messages) +- Ensure collector is receiving data: check collector metrics at port 8888 +- Verify ClickHouse tables exist: `docker exec signoz-clickhouse clickhouse-client --query "SHOW TABLES FROM signoz_traces"` + +### Port conflicts +- Default ports: 3301 (UI), 4317 (gRPC), 4318 (HTTP) +- Change in docker-compose.signoz.yml if needed + +## Known Issues + +1. **Schema Migration Failures**: Some newer SigNoz migrations use JSON column types with syntax not supported in ClickHouse 24.1.2. Core observability works but some advanced features may be limited. + +2. **Alertmanager**: Not configured for this deployment. Would require additional setup for alerts. + +3. **Health Check Timing**: The OTEL collector health check may show "starting" for extended periods but the service is functional. 
+ +## Future Improvements + +- Upgrade ClickHouse to latest version for full schema compatibility +- Add alertmanager configuration for alerts +- Configure data retention policies +- Add authentication to SigNoz UI +- Set up dashboards for vcon-mcp metrics + +## Data Persistence + +Data is stored in Docker volumes: +- `signoz_clickhouse_data` - Traces, metrics, logs +- `signoz_zookeeper_data` - ZooKeeper state +- `signoz_data` - SigNoz query service state + +To reset all data: +```bash +docker compose -f docker-compose.signoz.yml down -v +``` + +## Resources + +- [SigNoz Documentation](https://signoz.io/docs/) +- [OpenTelemetry Documentation](https://opentelemetry.io/docs/) +- [ClickHouse Documentation](https://clickhouse.com/docs/) diff --git a/signoz/alertmanager.yml b/signoz/alertmanager.yml new file mode 100644 index 0000000..89b0125 --- /dev/null +++ b/signoz/alertmanager.yml @@ -0,0 +1,19 @@ +global: + resolve_timeout: 5m + +route: + group_by: ['alertname'] + group_wait: 10s + group_interval: 10s + repeat_interval: 1h + receiver: 'default-receiver' + +receivers: + - name: 'default-receiver' + +inhibit_rules: + - source_match: + severity: 'critical' + target_match: + severity: 'warning' + equal: ['alertname', 'dev', 'instance'] diff --git a/signoz/clickhouse-users.xml b/signoz/clickhouse-users.xml new file mode 100644 index 0000000..c545475 --- /dev/null +++ b/signoz/clickhouse-users.xml @@ -0,0 +1,38 @@ + + + + 10000000000 + 0 + random + 100 + + + 1 + + + + + + + + ::/0 + + default + default + 1 + + + + + + + 3600 + 0 + 0 + 0 + 0 + 0 + + + + diff --git a/signoz/otel-collector-config.yaml b/signoz/otel-collector-config.yaml new file mode 100644 index 0000000..eeedb0c --- /dev/null +++ b/signoz/otel-collector-config.yaml @@ -0,0 +1,48 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + send_batch_size: 10000 + timeout: 10s + +exporters: + clickhousetraces: + datasource: 
tcp://signoz-clickhouse:9000/signoz_traces + clickhouselogsexporter: + dsn: tcp://signoz-clickhouse:9000/signoz_logs + timeout: 5s + sending_queue: + queue_size: 100 + retry_on_failure: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 300s + signozclickhousemetrics: + dsn: tcp://signoz-clickhouse:9000/signoz_metrics + +extensions: + health_check: + endpoint: 0.0.0.0:13133 + +service: + extensions: [health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [clickhousetraces] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [signozclickhousemetrics] + logs: + receivers: [otlp] + processors: [batch] + exporters: [clickhouselogsexporter] diff --git a/signoz/zz-clickhouse-config.xml b/signoz/zz-clickhouse-config.xml new file mode 100644 index 0000000..d50368e --- /dev/null +++ b/signoz/zz-clickhouse-config.xml @@ -0,0 +1,53 @@ + + + warning + true + + + 0.0.0.0 + + 4096 + 3 + 100 + + /var/lib/clickhouse/ + /var/lib/clickhouse/tmp/ + /var/lib/clickhouse/user_files/ + + users.xml + default + default + + UTC + + true + + 3600 + + 3600 + 60 + + + + + + signoz-clickhouse + 9000 + + + + + + + cluster + 01 + signoz-clickhouse + + + + + signoz-zookeeper + 2181 + + + From cefef641874f2d61b17f25a4b0bd13a9c1025346 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Sun, 8 Feb 2026 01:07:43 +0000 Subject: [PATCH 6/7] Add /stats/queue endpoint for Redis queue depth monitoring Public endpoint (no auth) that returns the depth of any Redis list, used by the audio adapter for backpressure control. 
Co-Authored-By: Claude Opus 4.6 --- server/api.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/api.py b/server/api.py index c38d7ca..78b40b0 100644 --- a/server/api.py +++ b/server/api.py @@ -260,6 +260,24 @@ async def health_check() -> JSONResponse: }) +@app.get( + "/stats/queue", + summary="Get queue depth", + description="Returns the number of items in a Redis list (queue)", + tags=["system"], +) +async def get_queue_depth( + list_name: str = Query(..., description="Name of the Redis list to measure") +) -> JSONResponse: + """Get the current depth of a Redis list. Public endpoint (no auth) for monitoring and backpressure.""" + try: + depth = await redis_async.llen(list_name) + return JSONResponse(content={"list_name": list_name, "depth": depth}) + except Exception as e: + logger.error(f"Error getting queue depth for '{list_name}': {str(e)}") + raise HTTPException(status_code=500, detail="Failed to get queue depth") + + class Vcon(BaseModel): """Pydantic model representing a vCon (Voice Conversation) record. From bd0715f4c35954555d2933a97a19d518178cf2be Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Wed, 18 Feb 2026 17:17:07 +0000 Subject: [PATCH 7/7] Optimize vCon ingest by removing redundant Redis operations The post_vcon and external_ingress_vcon paths called index_vcon() which re-read the vCon from Redis (JSON.GET) and duplicated the sorted set add (ZADD) that was already done by the caller. This added 2 unnecessary Redis round-trips per ingest. Extract index_vcon_parties() that takes the vCon dict directly, and use it in both POST paths. The original index_vcon() is preserved for the bulk re-indexing endpoint. Reduces ingest from 11 to 9 Redis ops per vCon, measured 4.9x improvement in adapter posting throughput. 
Co-Authored-By: Claude Opus 4.6 --- server/api.py | 45 ++++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/server/api.py b/server/api.py index 78b40b0..1f3ce96 100644 --- a/server/api.py +++ b/server/api.py @@ -677,7 +677,7 @@ async def post_vcon( await add_vcon_to_set(key, timestamp) logger.debug(f"Indexing vCon {inbound_vcon.uuid}") - await index_vcon(inbound_vcon.uuid) + await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"]) # Add to ingress lists if specified if ingress_lists: @@ -772,7 +772,7 @@ async def external_ingress_vcon( await add_vcon_to_set(key, timestamp) logger.debug(f"Indexing vCon {inbound_vcon.uuid}") - await index_vcon(inbound_vcon.uuid) + await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"]) # Always add to the specified ingress list (required for this endpoint) vcon_uuid_str = str(inbound_vcon.uuid) @@ -1060,25 +1060,17 @@ async def get_dlq_vcons( raise HTTPException(status_code=500, detail="Failed to read DLQ") -async def index_vcon(uuid: UUID) -> None: - """Index a vCon for searching. +async def index_vcon_parties(vcon_uuid: str, parties: list) -> None: + """Index a vCon's parties for searching. - Adds the vCon to the sorted set and indexes it by party information - (tel, mailto, name) for searching. All indexed keys will expire after - VCON_INDEX_EXPIRY seconds. + Indexes by party information (tel, mailto, name). All indexed keys + will expire after VCON_INDEX_EXPIRY seconds. 
Args: - uuid: UUID of the vCon to index + vcon_uuid: UUID string of the vCon + parties: List of party dicts from the vCon """ - key = f"vcon:{uuid}" - vcon = await redis_async.json().get(key) - created_at = datetime.fromisoformat(vcon["created_at"]) - timestamp = int(created_at.timestamp()) - vcon_uuid = vcon["uuid"] - await add_vcon_to_set(key, timestamp) - - # Index by party information with expiration - for party in vcon["parties"]: + for party in parties: if party.get("tel"): tel_key = f"tel:{party['tel']}" await redis_async.sadd(tel_key, vcon_uuid) @@ -1093,6 +1085,25 @@ async def index_vcon(uuid: UUID) -> None: await redis_async.expire(name_key, VCON_INDEX_EXPIRY) +async def index_vcon(uuid: UUID) -> None: + """Index a vCon for searching (reads from Redis). + + Reads the vCon from Redis, adds it to the sorted set, and indexes + by party information. Used for bulk re-indexing. For the ingest path, + use index_vcon_parties() directly to avoid redundant Redis reads. + + Args: + uuid: UUID of the vCon to index + """ + key = f"vcon:{uuid}" + vcon = await redis_async.json().get(key) + created_at = datetime.fromisoformat(vcon["created_at"]) + timestamp = int(created_at.timestamp()) + vcon_uuid = vcon["uuid"] + await add_vcon_to_set(key, timestamp) + await index_vcon_parties(vcon_uuid, vcon["parties"]) + + @api_router.get( "/index_vcons", status_code=200,