diff --git a/docker-compose.signoz.yml b/docker-compose.signoz.yml
new file mode 100644
index 0000000..d57fb8f
--- /dev/null
+++ b/docker-compose.signoz.yml
@@ -0,0 +1,120 @@
+# SigNoz Observability Stack
+# Usage: docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d
+#
+# After first run, execute schema migrations:
+# docker run --rm --network conserver signoz/signoz-schema-migrator:latest sync --dsn='tcp://signoz-clickhouse:9000'
+#
+# Access UI at: http://localhost:3301
+
+networks:
+ conserver:
+ external: true
+
+volumes:
+ signoz_clickhouse_data:
+ signoz_zookeeper_data:
+ signoz_zookeeper_log:
+ signoz_data:
+
+services:
+ signoz-zookeeper:
+ image: zookeeper:3.9
+ container_name: signoz-zookeeper
+ hostname: signoz-zookeeper
+ environment:
+ - ZOO_AUTOPURGE_PURGEINTERVAL=1
+ - ZOO_4LW_COMMANDS_WHITELIST=mntr,ruok,stat
+ volumes:
+ - signoz_zookeeper_data:/data
+ - signoz_zookeeper_log:/datalog
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
+
+ signoz-clickhouse:
+ image: clickhouse/clickhouse-server:24.1.2-alpine
+ container_name: signoz-clickhouse
+ hostname: signoz-clickhouse
+ tty: true
+ depends_on:
+ signoz-zookeeper:
+ condition: service_healthy
+ volumes:
+ - signoz_clickhouse_data:/var/lib/clickhouse
+ - ./signoz/zz-clickhouse-config.xml:/etc/clickhouse-server/config.d/zz-clickhouse-config.xml:ro
+ - ./signoz/clickhouse-users.xml:/etc/clickhouse-server/users.d/users.xml:ro
+ environment:
+ - CLICKHOUSE_DB=signoz_traces
+ - CLICKHOUSE_USER=default
+ - CLICKHOUSE_PASSWORD=
+ ulimits:
+ nofile:
+ soft: 262144
+ hard: 262144
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8123/ping"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
+
+ signoz-otel-collector:
+ image: signoz/signoz-otel-collector:latest
+ container_name: signoz-otel-collector
+ hostname: signoz-otel-collector
+ command:
+ - "--config=/etc/otel-collector-config.yaml"
+ depends_on:
+ signoz-clickhouse:
+ condition: service_healthy
+ environment:
+ - OTEL_RESOURCE_ATTRIBUTES=host.name=signoz-host,os.type=linux
+ volumes:
+ - ./signoz/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
+ ports:
+ - "4317:4317" # OTLP gRPC receiver
+ - "4318:4318" # OTLP HTTP receiver
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:13133/"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
+
+ signoz:
+ image: signoz/query-service:latest
+ container_name: signoz
+ hostname: signoz
+ depends_on:
+ signoz-clickhouse:
+ condition: service_healthy
+ environment:
+ - ClickHouseUrl=tcp://signoz-clickhouse:9000
+ - SIGNOZ_LOCAL_DB_PATH=/var/lib/signoz/signoz.db
+ - DASHBOARDS_PATH=/root/config/dashboards
+ - STORAGE=clickhouse
+ - GODEBUG=netdns=go
+ - TELEMETRY_ENABLED=true
+ - DEPLOYMENT_TYPE=docker-standalone
+ volumes:
+ - signoz_data:/var/lib/signoz
+      - ./signoz/dashboards:/root/config/dashboards:ro
+ ports:
+ - "3301:8080" # Web UI
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8080/api/v1/health"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
diff --git a/docker/Dockerfile b/docker/Dockerfile
index f1c781d..13091e2 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -10,6 +10,9 @@ ENV VCON_SERVER_VERSION=${VCON_SERVER_VERSION}
ENV VCON_SERVER_GIT_COMMIT=${VCON_SERVER_GIT_COMMIT}
ENV VCON_SERVER_BUILD_TIME=${VCON_SERVER_BUILD_TIME}
+# Configure apt to use HTTPS sources (required when HTTP port 80 is blocked)
+RUN sed -i 's|http://deb.debian.org|https://deb.debian.org|g' /etc/apt/sources.list.d/debian.sources
+
RUN apt-get update && \
apt-get install -y libavdevice-dev ffmpeg
diff --git a/server/api.py b/server/api.py
index c38d7ca..1f3ce96 100644
--- a/server/api.py
+++ b/server/api.py
@@ -260,6 +260,24 @@ async def health_check() -> JSONResponse:
})
+@app.get(
+ "/stats/queue",
+ summary="Get queue depth",
+ description="Returns the number of items in a Redis list (queue)",
+ tags=["system"],
+)
+async def get_queue_depth(
+ list_name: str = Query(..., description="Name of the Redis list to measure")
+) -> JSONResponse:
+    """Get the current depth of a Redis list. Public endpoint (no auth) for monitoring and backpressure. NOTE: exposes the length of any Redis list (no allow-list); restrict list names if internet-facing."""
+ try:
+ depth = await redis_async.llen(list_name)
+ return JSONResponse(content={"list_name": list_name, "depth": depth})
+ except Exception as e:
+ logger.error(f"Error getting queue depth for '{list_name}': {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to get queue depth")
+
+
class Vcon(BaseModel):
"""Pydantic model representing a vCon (Voice Conversation) record.
@@ -659,7 +677,7 @@ async def post_vcon(
await add_vcon_to_set(key, timestamp)
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
- await index_vcon(inbound_vcon.uuid)
+    await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon.get("parties", []))
# Add to ingress lists if specified
if ingress_lists:
@@ -754,7 +772,7 @@ async def external_ingress_vcon(
await add_vcon_to_set(key, timestamp)
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
- await index_vcon(inbound_vcon.uuid)
+    await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon.get("parties", []))
# Always add to the specified ingress list (required for this endpoint)
vcon_uuid_str = str(inbound_vcon.uuid)
@@ -1042,25 +1060,17 @@ async def get_dlq_vcons(
raise HTTPException(status_code=500, detail="Failed to read DLQ")
-async def index_vcon(uuid: UUID) -> None:
- """Index a vCon for searching.
+async def index_vcon_parties(vcon_uuid: str, parties: list) -> None:
+ """Index a vCon's parties for searching.
- Adds the vCon to the sorted set and indexes it by party information
- (tel, mailto, name) for searching. All indexed keys will expire after
- VCON_INDEX_EXPIRY seconds.
+ Indexes by party information (tel, mailto, name). All indexed keys
+ will expire after VCON_INDEX_EXPIRY seconds.
Args:
- uuid: UUID of the vCon to index
+ vcon_uuid: UUID string of the vCon
+ parties: List of party dicts from the vCon
"""
- key = f"vcon:{uuid}"
- vcon = await redis_async.json().get(key)
- created_at = datetime.fromisoformat(vcon["created_at"])
- timestamp = int(created_at.timestamp())
- vcon_uuid = vcon["uuid"]
- await add_vcon_to_set(key, timestamp)
-
- # Index by party information with expiration
- for party in vcon["parties"]:
+ for party in parties:
if party.get("tel"):
tel_key = f"tel:{party['tel']}"
await redis_async.sadd(tel_key, vcon_uuid)
@@ -1075,6 +1085,25 @@ async def index_vcon(uuid: UUID) -> None:
await redis_async.expire(name_key, VCON_INDEX_EXPIRY)
+async def index_vcon(uuid: UUID) -> None:
+ """Index a vCon for searching (reads from Redis).
+
+ Reads the vCon from Redis, adds it to the sorted set, and indexes
+ by party information. Used for bulk re-indexing. For the ingest path,
+ use index_vcon_parties() directly to avoid redundant Redis reads.
+
+ Args:
+ uuid: UUID of the vCon to index
+ """
+ key = f"vcon:{uuid}"
+ vcon = await redis_async.json().get(key)
+ created_at = datetime.fromisoformat(vcon["created_at"])
+ timestamp = int(created_at.timestamp())
+ vcon_uuid = vcon["uuid"]
+ await add_vcon_to_set(key, timestamp)
+    await index_vcon_parties(vcon_uuid, vcon.get("parties", []))
+
+
@api_router.get(
"/index_vcons",
status_code=200,
diff --git a/server/links/keyword_tagger/__init__.py b/server/links/keyword_tagger/__init__.py
new file mode 100644
index 0000000..a7e8ccf
--- /dev/null
+++ b/server/links/keyword_tagger/__init__.py
@@ -0,0 +1,211 @@
+"""Keyword Tagger Link
+
+This link searches vCon transcriptions for specific keywords and phrases,
+then adds corresponding tags based on matches.
+
+Categories:
+- Compliance & Regulatory (do_not_call, take_me_off, stop_calling, fcc, etc.)
+- Voicemail Service Detection (youmail variants)
+- Recording/Transcription mentions
+- Greeting Patterns
+- Legal/Professional
+- Other Content
+
+Configuration options:
+ categories: List of category names to enable (default: all)
+ custom_keywords: Dict of additional tag -> keywords mappings
+ case_sensitive: Whether to match case-sensitively (default: false)
+"""
+
+import re
+from typing import Any, Dict, List, Optional, Set
+
+from server.lib.vcon_redis import VconRedis
+from lib.logging_utils import init_logger
+
+logger = init_logger(__name__)
+
+# Keyword definitions organized by category
+# Format: tag_name -> list of keywords/phrases to search for
+KEYWORD_RULES = {
+ # Compliance & Regulatory
+ "compliance": {
+ "do_not_call": ["do not call", "don't call", "dont call", "do-not-call"],
+ "take_me_off": ["take me off", "take us off", "remove me", "remove us"],
+ "stop_calling": ["stop calling", "quit calling", "stop call"],
+ "mentions_fcc": ["fcc", "federal communications"],
+ "mentions_report": ["report you", "report this", "reporting", "file a complaint"],
+ "mentions_enforcement": ["enforc", "attorney general", "lawsuit", "sue you", "legal action"],
+ "mentions_scam": ["scam", "scammer", "scamming", "fraudulent", "fraud"],
+ "mentions_spam": ["spam", "spammer", "spamming", "junk call"],
+ "mentions_robocall": ["robo", "robocall", "auto-dial", "autodial", "autodialer"],
+ "mentions_protection": ["protect", "protection", "tcpa", "consumer protection"],
+ },
+
+ # Voicemail Service Detection (YouMail variants)
+ "voicemail_service": {
+ "youmail_detected": [
+ "youmail", "youmale", "you mail", "you male",
+ "umale", "umail", "u mail", "u-mail",
+ ],
+ },
+
+ # Recording/Transcription mentions
+ "recording": {
+ "mentions_transcribe": ["transcribe", "transcription", "transcribing"],
+ "mentions_recording": ["being recorded", "this call is recorded", "call is being recorded",
+ "may be recorded", "will be recorded", "recording this call"],
+ "mentions_email": ["email", "e-mail"], # Note: may want to exclude "voicemail"
+ },
+
+ # Greeting Patterns
+ "greetings": {
+ "greeting_yea_hello": ["yea hello", "yeah hello", "ya hello"],
+ "greeting_hi_informal": ["hi j", "hi g", "hey j", "hey g"],
+ },
+
+ # Legal/Professional
+ "legal": {
+ "mentions_law": ["law office", "lawoffice", "law firm", "lawfirm",
+ "attorney", "lawyer", "legal department"],
+ },
+
+ # Other Content
+ "other": {
+        "profanity": ["fuck", "shit", "damn", "ass"],  # NOTE: substring match — "ass" also hits "class"/"pass"/"assist"; consider word boundaries
+ "mentions_torn": ["torn"],
+ "mentions_push": ["push"],
+ "mentions_pitch": ["pitch", "sales pitch"],
+ "mentions_bank": ["bank", "banking", "banker"],
+ "mentions_county": ["county"],
+ "mentions_general": ["general"],
+ "mentions_subscribe": ["subscribe", "subscription"],
+ "why_calling": ["why are you calling", "why you calling", "why do you keep calling"],
+ },
+}
+
+default_options = {
+ "categories": None, # None means all categories
+ "custom_keywords": {}, # Additional tag -> keywords mappings
+ "case_sensitive": False,
+ "min_confidence": 0.0, # Minimum transcription confidence to process
+}
+
+
+def get_transcription_text(vcon: Any) -> Optional[str]:
+ """Extract transcription text from vCon analysis entries."""
+ texts = []
+
+ for analysis in vcon.analysis:
+ analysis_type = analysis.get("type", "")
+
+ # Handle WTF transcription format
+ if analysis_type == "wtf_transcription":
+ body = analysis.get("body", {})
+ if isinstance(body, dict):
+ transcript = body.get("transcript", {})
+ if isinstance(transcript, dict):
+ text = transcript.get("text", "")
+ if text:
+ texts.append(text)
+ # Also check segments for text
+ segments = body.get("segments", [])
+ for seg in segments:
+ seg_text = seg.get("text", "")
+ if seg_text:
+ texts.append(seg_text)
+
+ # Handle standard transcription format
+ elif analysis_type == "transcription":
+ body = analysis.get("body", "")
+ if isinstance(body, str):
+ texts.append(body)
+ elif isinstance(body, dict):
+ text = body.get("text", body.get("transcript", ""))
+ if text:
+ texts.append(text)
+
+ if texts:
+ return " ".join(texts)
+ return None
+
+
+def find_keywords(text: str, keywords: List[str], case_sensitive: bool = False) -> Set[str]:
+ """Find which keywords are present in the text."""
+ if not case_sensitive:
+ text = text.lower()
+
+ found = set()
+ for keyword in keywords:
+ search_keyword = keyword if case_sensitive else keyword.lower()
+ if search_keyword in text:
+ found.add(keyword)
+
+ return found
+
+
+def run(
+ vcon_uuid: str,
+ link_name: str,
+    opts: Optional[Dict[str, Any]] = None,
+) -> Optional[str]:
+ """Process a vCon and add keyword-based tags."""
+ merged_opts = default_options.copy()
+ if opts:
+ merged_opts.update(opts)
+ opts = merged_opts
+
+ logger.info(f"Starting keyword_tagger for vCon: {vcon_uuid}")
+
+ vcon_redis = VconRedis()
+ vcon = vcon_redis.get_vcon(vcon_uuid)
+
+ if not vcon:
+ logger.error(f"keyword_tagger: vCon {vcon_uuid} not found")
+ return vcon_uuid
+
+ # Get transcription text
+ text = get_transcription_text(vcon)
+
+ if not text:
+ logger.debug(f"No transcription found for vCon {vcon_uuid}")
+ return vcon_uuid
+
+ logger.debug(f"Analyzing transcription ({len(text)} chars) for vCon {vcon_uuid}")
+
+ case_sensitive = opts.get("case_sensitive", False)
+ enabled_categories = opts.get("categories") # None = all
+ custom_keywords = opts.get("custom_keywords", {})
+
+ tags_added = []
+
+ # Process built-in keyword rules
+ for category, rules in KEYWORD_RULES.items():
+ # Skip if category filtering is enabled and this category is not in the list
+ if enabled_categories is not None and category not in enabled_categories:
+ continue
+
+ for tag_name, keywords in rules.items():
+ found = find_keywords(text, keywords, case_sensitive)
+ if found:
+ vcon.add_tag(tag_name=tag_name, tag_value=",".join(sorted(found)))
+ tags_added.append(tag_name)
+ logger.debug(f"Added tag '{tag_name}' (matched: {found})")
+
+ # Process custom keywords
+ for tag_name, keywords in custom_keywords.items():
+ if isinstance(keywords, str):
+ keywords = [keywords]
+ found = find_keywords(text, keywords, case_sensitive)
+ if found:
+ vcon.add_tag(tag_name=tag_name, tag_value=",".join(sorted(found)))
+ tags_added.append(tag_name)
+ logger.debug(f"Added custom tag '{tag_name}' (matched: {found})")
+
+ if tags_added:
+ vcon_redis.store_vcon(vcon)
+ logger.info(f"Added {len(tags_added)} tags to vCon {vcon_uuid}: {tags_added}")
+ else:
+ logger.debug(f"No keyword matches for vCon {vcon_uuid}")
+
+ return vcon_uuid
diff --git a/server/links/webhook/__init__.py b/server/links/webhook/__init__.py
index 16b433c..28c8afd 100644
--- a/server/links/webhook/__init__.py
+++ b/server/links/webhook/__init__.py
@@ -33,6 +33,10 @@ def run(
# The webhook needs a stringified JSON version.
json_dict = vCon.to_dict()
+ # Ensure vcon version is 0.3.0 for compatibility with vcon-mcp REST API
+ if json_dict.get("vcon") == "0.0.1" or "vcon" not in json_dict:
+ json_dict["vcon"] = "0.3.0"
+
# Build headers from configuration
headers = opts.get("headers", {})
diff --git a/server/links/wtf_transcribe/README.md b/server/links/wtf_transcribe/README.md
new file mode 100644
index 0000000..6b22fdc
--- /dev/null
+++ b/server/links/wtf_transcribe/README.md
@@ -0,0 +1,140 @@
+# WTF Transcription Link (vfun Integration)
+
+A link that sends vCon audio dialogs to a vfun transcription server and adds the results as WTF (World Transcription Format) analysis entries.
+
+## Overview
+
+This link integrates with the vfun transcription server to provide:
+- Multi-language speech recognition (English + auto-detect)
+- Speaker diarization (who spoke when)
+- GPU-accelerated processing with CUDA
+- WTF-compliant output format per IETF draft-howe-vcon-wtf-extension-01
+
+## Configuration
+
+```yaml
+wtf_transcribe:
+ module: links.wtf_transcribe
+ options:
+ # Required: URL of the vfun transcription server
+ vfun-server-url: http://localhost:8443/transcribe
+
+ # Optional: Enable speaker diarization (default: true)
+ diarize: true
+
+ # Optional: Request timeout in seconds (default: 300)
+ timeout: 300
+
+ # Optional: Minimum dialog duration to transcribe in seconds (default: 5)
+ min-duration: 5
+
+ # Optional: API key for vfun server authentication
+ api-key: your-api-key-here
+```
+
+## How It Works
+
+1. **Extract Audio**: Reads audio from vCon dialog (supports `body` with base64/base64url encoding, or `url` with file:// or http:// references)
+2. **Send to vfun**: POSTs audio file to vfun's `/transcribe` endpoint
+3. **Create WTF Analysis**: Formats the transcription result as a WTF analysis entry
+4. **Update vCon**: Adds the WTF analysis to the vCon and stores it back to Redis
+
+## Output Format
+
+The link adds analysis entries with the WTF format:
+
+```json
+{
+ "type": "wtf_transcription",
+ "dialog": 0,
+ "mediatype": "application/json",
+ "vendor": "vfun",
+ "product": "parakeet-tdt-110m",
+ "schema": "wtf-1.0",
+ "encoding": "json",
+ "body": {
+ "transcript": {
+ "text": "Hello, how can I help you today?",
+ "language": "en-US",
+ "duration": 30.0,
+ "confidence": 0.95
+ },
+ "segments": [
+ {
+ "id": 0,
+ "start": 0.0,
+ "end": 3.5,
+ "text": "Hello, how can I help you today?",
+ "confidence": 0.95,
+ "speaker": 0
+ }
+ ],
+ "metadata": {
+ "created_at": "2024-01-15T10:30:00Z",
+ "processed_at": "2024-01-15T10:30:05Z",
+ "provider": "vfun",
+ "model": "parakeet-tdt-110m"
+ },
+ "speakers": {
+ "0": {
+ "id": 0,
+ "label": "Speaker 0",
+ "segments": [0],
+ "total_time": 15.2
+ }
+ },
+ "quality": {
+ "average_confidence": 0.95,
+ "multiple_speakers": true,
+ "low_confidence_words": 0
+ }
+ }
+}
+```
+
+## Behavior
+
+- **Skips non-recording dialogs**: Only processes dialogs with `type: "recording"`
+- **Skips already transcribed**: Dialogs with existing WTF transcriptions are skipped
+- **Duration filtering**: Dialogs shorter than `min-duration` are skipped
+- **File URL support**: Can read audio from local `file://` URLs directly
+
+## Example Chain Configuration
+
+```yaml
+chains:
+ transcription_chain:
+ links:
+ - tag
+ - wtf_transcribe
+ - supabase_webhook
+ ingress_lists:
+ - transcribe
+ egress_lists:
+ - transcribed
+ enabled: 1
+```
+
+## vfun Server
+
+The vfun server provides GPU-accelerated transcription:
+
+```bash
+# Start vfun server
+cd /path/to/vfun
+./vfun server
+
+# Test health
+curl http://localhost:8443/ping
+
+# Manual transcription test
+curl -X POST http://localhost:8443/transcribe \
+ -H "Authorization: Bearer YOUR_API_KEY" \
+ -F "file=@audio.wav" \
+ -F "diarize=true"
+```
+
+## Related
+
+- [vfun](https://github.com/strolid/vfun) - GPU-accelerated transcription server
+- [draft-howe-vcon-wtf-extension](https://datatracker.ietf.org/doc/html/draft-howe-vcon-wtf-extension) - IETF WTF specification
diff --git a/server/links/wtf_transcribe/__init__.py b/server/links/wtf_transcribe/__init__.py
new file mode 100644
index 0000000..7c07407
--- /dev/null
+++ b/server/links/wtf_transcribe/__init__.py
@@ -0,0 +1,340 @@
+"""WTF Transcription Link (vfun integration)
+
+This link sends vCon audio dialogs to a vfun transcription server and adds
+the results as WTF (World Transcription Format) analysis entries.
+
+The vfun server provides:
+- Multi-language speech recognition (English + auto-detect)
+- Speaker diarization (who spoke when)
+- GPU-accelerated processing with CUDA
+
+Configuration options:
+ vfun-server-url: URL of the vfun transcription server (required)
+ diarize: Enable speaker diarization (default: true)
+ timeout: Request timeout in seconds (default: 300)
+ min-duration: Minimum dialog duration to transcribe in seconds (default: 5)
+ api-key: Optional API key for vfun server authentication
+
+Example configuration in config.yml:
+ wtf_transcribe:
+ module: links.wtf_transcribe
+ options:
+ vfun-server-url: http://localhost:8443/transcribe
+ diarize: true
+ timeout: 300
+ min-duration: 5
+ api-key: your-api-key-here
+"""
+
+import base64
+import json
+import logging
+import os
+import tempfile
+import requests
+from datetime import datetime, timezone
+from typing import Optional, Dict, Any, List
+
+from server.lib.vcon_redis import VconRedis
+from lib.logging_utils import init_logger
+from lib.error_tracking import init_error_tracker
+
+init_error_tracker()
+logger = init_logger(__name__)
+
+default_options = {
+ "vfun-server-url": None,
+ "diarize": True,
+ "timeout": 300,
+ "min-duration": 5,
+ "api-key": None,
+}
+
+
+def has_wtf_transcription(vcon: Any, dialog_index: int) -> bool:
+ """Check if a dialog already has a WTF transcription."""
+ for analysis in vcon.analysis:
+ if (analysis.get("type") == "wtf_transcription" and
+ analysis.get("dialog") == dialog_index):
+ return True
+ return False
+
+
+def should_transcribe_dialog(dialog: Dict[str, Any], min_duration: float) -> bool:
+ """Check if a dialog should be transcribed."""
+ if dialog.get("type") != "recording":
+ return False
+ if not dialog.get("body") and not dialog.get("url"):
+ return False
+ duration = dialog.get("duration")
+ if duration is not None and float(duration) < min_duration:
+ return False
+ return True
+
+
+def get_audio_content(dialog: Dict[str, Any]) -> Optional[bytes]:
+ """Extract audio content from dialog body or URL."""
+ if dialog.get("body"):
+ encoding = dialog.get("encoding", "base64")
+ if encoding == "base64url":
+ return base64.urlsafe_b64decode(dialog["body"])
+ elif encoding == "base64":
+ return base64.b64decode(dialog["body"])
+ else:
+ return dialog["body"].encode() if isinstance(dialog["body"], str) else dialog["body"]
+
+ if dialog.get("url"):
+ url = dialog["url"]
+ if url.startswith("file://"):
+ filepath = url[7:]
+ try:
+ with open(filepath, "rb") as f:
+ return f.read()
+ except Exception as e:
+ logger.error(f"Failed to read file {filepath}: {e}")
+ return None
+ else:
+ try:
+ resp = requests.get(url, timeout=60)
+ resp.raise_for_status()
+ return resp.content
+ except Exception as e:
+ logger.error(f"Failed to fetch URL {url}: {e}")
+ return None
+ return None
+
+
+def create_wtf_analysis(
+ dialog_index: int,
+ vfun_response: Dict[str, Any],
+ duration: float,
+) -> Dict[str, Any]:
+ """Create a WTF analysis entry from vfun response."""
+ now = datetime.now(timezone.utc).isoformat()
+
+ # Extract text and segments from vfun response
+ # vfun returns: analysis[].body with transcription data
+ analysis_entries = vfun_response.get("analysis", [])
+
+ full_text = ""
+ segments = []
+ language = "en-US"
+
+ for entry in analysis_entries:
+ if entry.get("type") in ("transcription", "wtf_transcription"):
+ body = entry.get("body", {})
+
+ # Handle different response formats
+ if isinstance(body, dict):
+ # WTF format from vfun
+ transcript = body.get("transcript", {})
+ full_text = transcript.get("text", body.get("text", ""))
+ language = transcript.get("language", body.get("language", "en-US"))
+ segments = body.get("segments", [])
+ elif isinstance(body, str):
+ full_text = body
+ break
+
+ # If no analysis found, check for direct text field
+ if not full_text:
+ full_text = vfun_response.get("text", "")
+ segments = vfun_response.get("segments", [])
+
+ # Calculate confidence
+ if segments:
+ confidences = [s.get("confidence", 0.9) for s in segments]
+ avg_confidence = sum(confidences) / len(confidences)
+ else:
+ avg_confidence = 0.9
+
+ # Build WTF segments
+ wtf_segments = []
+ for i, seg in enumerate(segments):
+ wtf_seg = {
+ "id": seg.get("id", i),
+ "start": float(seg.get("start", seg.get("start_time", 0.0))),
+ "end": float(seg.get("end", seg.get("end_time", 0.0))),
+ "text": seg.get("text", seg.get("transcription", "")),
+ "confidence": float(seg.get("confidence", 0.9)),
+ }
+ if "speaker" in seg:
+ wtf_seg["speaker"] = seg["speaker"]
+ wtf_segments.append(wtf_seg)
+
+ # Build speakers section
+ speakers = {}
+ for seg in wtf_segments:
+ speaker = seg.get("speaker")
+ if speaker is not None:
+ speaker_key = str(speaker)
+ if speaker_key not in speakers:
+ speakers[speaker_key] = {
+ "id": speaker,
+ "label": f"Speaker {speaker}",
+ "segments": [],
+ "total_time": 0.0,
+ }
+ speakers[speaker_key]["segments"].append(seg["id"])
+ speakers[speaker_key]["total_time"] += seg["end"] - seg["start"]
+
+ # Build WTF body
+ wtf_body = {
+ "transcript": {
+ "text": full_text,
+ "language": language,
+ "duration": float(duration),
+ "confidence": float(avg_confidence),
+ },
+ "segments": wtf_segments,
+ "metadata": {
+ "created_at": now,
+ "processed_at": now,
+ "provider": "vfun",
+ "model": "parakeet-tdt-110m",
+ "audio": {
+ "duration": float(duration),
+ },
+ },
+ "quality": {
+ "average_confidence": float(avg_confidence),
+ "multiple_speakers": len(speakers) > 1,
+ "low_confidence_words": sum(1 for s in wtf_segments if s.get("confidence", 1.0) < 0.5),
+ },
+ }
+
+ if speakers:
+ wtf_body["speakers"] = speakers
+
+ return {
+ "type": "wtf_transcription",
+ "dialog": dialog_index,
+ "mediatype": "application/json",
+ "vendor": "vfun",
+ "product": "parakeet-tdt-110m",
+ "schema": "wtf-1.0",
+ # Note: encoding omitted since body is a direct object, not a JSON string
+ "body": wtf_body,
+ }
+
+
+def run(
+ vcon_uuid: str,
+ link_name: str,
+    opts: Optional[Dict[str, Any]] = None,
+) -> Optional[str]:
+ """Process a vCon through the vfun transcription service."""
+ merged_opts = default_options.copy()
+ if opts:
+ merged_opts.update(opts)
+ opts = merged_opts
+
+ logger.info(f"Starting wtf_transcribe link for vCon: {vcon_uuid}")
+
+ vfun_server_url = opts.get("vfun-server-url")
+ if not vfun_server_url:
+ logger.error("wtf_transcribe: vfun-server-url is required")
+ return vcon_uuid
+
+ vcon_redis = VconRedis()
+ vcon = vcon_redis.get_vcon(vcon_uuid)
+
+ if not vcon:
+ logger.error(f"wtf_transcribe: vCon {vcon_uuid} not found")
+ return vcon_uuid
+
+ # Find dialogs to transcribe
+ dialogs_processed = 0
+ dialogs_skipped = 0
+
+ for i, dialog in enumerate(vcon.dialog):
+ if not should_transcribe_dialog(dialog, opts.get("min-duration", 5)):
+ logger.debug(f"Skipping dialog {i} (not eligible)")
+ dialogs_skipped += 1
+ continue
+
+ if has_wtf_transcription(vcon, i):
+ logger.debug(f"Skipping dialog {i} (already transcribed)")
+ dialogs_skipped += 1
+ continue
+
+ # Get audio content
+ audio_content = get_audio_content(dialog)
+ if not audio_content:
+ logger.warning(f"Could not extract audio from dialog {i}")
+ dialogs_skipped += 1
+ continue
+
+ logger.info(f"Transcribing dialog {i} for vCon {vcon_uuid}")
+
+ try:
+ # Build request to vfun server
+ headers = {}
+ api_key = opts.get("api-key")
+ if api_key:
+ headers["Authorization"] = f"Bearer {api_key}"
+
+ # Get filename from dialog or generate one
+ filename = dialog.get("filename", f"audio_{i}.wav")
+ mimetype = dialog.get("mimetype", "audio/wav")
+
+ # Send audio to vfun server
+ files = {"file": (filename, audio_content, mimetype)}
+ data = {
+ "diarize": str(opts.get("diarize", True)),
+ "block": "true",
+ }
+
+ response = requests.post(
+ vfun_server_url,
+ files=files,
+ data=data,
+ headers=headers,
+ timeout=opts.get("timeout", 300),
+ )
+
+            if response.status_code in (200, 302):  # NOTE(review): requests follows redirects by default, so 302 should not surface — confirm vfun behavior
+ vfun_response = response.json()
+ # Handle double-encoded JSON (vfun sometimes returns JSON string)
+ if isinstance(vfun_response, str):
+ vfun_response = json.loads(vfun_response)
+
+ duration = dialog.get("duration", 30.0)
+ wtf_analysis = create_wtf_analysis(i, vfun_response, float(duration))
+
+ # Add analysis to vCon
+ vcon.add_analysis(
+ type=wtf_analysis["type"],
+ dialog=wtf_analysis["dialog"],
+ vendor=wtf_analysis.get("vendor"),
+ body=wtf_analysis["body"],
+ extra={
+ "mediatype": wtf_analysis.get("mediatype"),
+ "product": wtf_analysis.get("product"),
+ "schema": wtf_analysis.get("schema"),
+ },
+ )
+
+ dialogs_processed += 1
+ logger.info(f"Added WTF transcription for dialog {i}")
+
+ else:
+ logger.error(
+ f"vfun transcription failed for dialog {i}: "
+ f"status={response.status_code}, response={response.text[:200]}"
+ )
+
+ except requests.exceptions.Timeout:
+ logger.error(f"vfun transcription timed out for dialog {i}")
+ except Exception as e:
+ logger.error(f"Error transcribing dialog {i}: {e}", exc_info=True)
+
+ if dialogs_processed > 0:
+ vcon_redis.store_vcon(vcon)
+ logger.info(
+ f"Updated vCon {vcon_uuid}: processed={dialogs_processed}, "
+ f"skipped={dialogs_skipped}"
+ )
+ else:
+ logger.info(f"No dialogs transcribed for vCon {vcon_uuid}")
+
+ return vcon_uuid
diff --git a/signoz/README.md b/signoz/README.md
new file mode 100644
index 0000000..cc38424
--- /dev/null
+++ b/signoz/README.md
@@ -0,0 +1,176 @@
+# SigNoz Observability Stack for vcon-server
+
+This directory contains the configuration for SigNoz, a self-hosted observability platform that collects traces, metrics, and logs from the vcon-mcp server via OpenTelemetry.
+
+## Architecture
+
+```
+┌─────────────────┐ OTLP/HTTP ┌──────────────────────┐
+│ vcon-mcp │ ─────────────────► │ signoz-otel-collector│
+│ (instrumented) │ :4318 │ (OTLP receiver) │
+└─────────────────┘ └──────────┬───────────┘
+ │
+ ▼
+┌─────────────────┐ ┌──────────────────────┐
+│ signoz (UI) │ ◄────────────────► │ signoz-clickhouse │
+│ :3301 │ TCP :9000 │ (time-series DB) │
+└─────────────────┘ └──────────┬───────────┘
+ │
+ ▼
+ ┌──────────────────────┐
+ │ signoz-zookeeper │
+ │ (coordination) │
+ └──────────────────────┘
+```
+
+## Components
+
+| Service | Image | Purpose | Ports |
+|---------|-------|---------|-------|
+| signoz | `signoz/query-service:latest` | Query API + Web UI | 3301 (mapped from 8080) |
+| signoz-otel-collector | `signoz/signoz-otel-collector:latest` | OTLP ingestion | 4317 (gRPC), 4318 (HTTP) |
+| signoz-clickhouse | `clickhouse/clickhouse-server:24.1.2-alpine` | Time-series storage | 8123, 9000 (internal) |
+| signoz-zookeeper | `zookeeper:3.9` | ClickHouse coordination | 2181 (internal) |
+
+## Configuration Files
+
+### otel-collector-config.yaml
+OpenTelemetry Collector pipeline configuration:
+- **Receivers**: OTLP gRPC (4317) and HTTP (4318)
+- **Processors**: Batch processing
+- **Exporters**: ClickHouse for traces, metrics, and logs
+
+### zz-clickhouse-config.xml
+ClickHouse server configuration:
+- IPv4 listening (0.0.0.0)
+- Single-node cluster named "cluster" (required by SigNoz schema migrator)
+- ZooKeeper integration for distributed DDL
+
+### clickhouse-users.xml
+ClickHouse user permissions with default user having full access.
+
+### alertmanager.yml
+Basic alertmanager configuration (not currently active).
+
+## Usage
+
+### Start with SigNoz
+
+```bash
+cd /path/to/vcon-server
+docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d
+```
+
+### Start without SigNoz (normal operation)
+
+```bash
+cd /path/to/vcon-server
+docker compose -f docker-compose.yml -f docker-compose.override.yml up -d
+```
+
+### Stop SigNoz only
+
+```bash
+docker compose -f docker-compose.signoz.yml down
+```
+
+### Access the UI
+
+Open http://localhost:3301 in your browser.
+
+## First-Time Setup
+
+After starting SigNoz for the first time, run the schema migrations:
+
+```bash
+docker run --rm --network conserver \
+ signoz/signoz-schema-migrator:latest \
+ sync --dsn='tcp://signoz-clickhouse:9000'
+```
+
+Note: Some migrations may fail due to JSON type syntax incompatibility with ClickHouse 24.1. Core functionality still works.
+
+## vcon-mcp Integration
+
+The vcon-mcp service is configured with these environment variables in `docker-compose.override.yml`:
+
+```yaml
+environment:
+ OTEL_ENABLED: "true"
+ OTEL_EXPORTER_TYPE: otlp
+ OTEL_ENDPOINT: http://signoz-otel-collector:4318
+ OTEL_SERVICE_NAME: vcon-mcp-server
+```
+
+## Verification
+
+1. Check service health:
+ ```bash
+ curl http://localhost:3301/api/v1/health
+ # Returns: {"status":"ok"}
+ ```
+
+2. Check container status:
+ ```bash
+ docker ps | grep signoz
+ ```
+
+3. View collector logs:
+ ```bash
+ docker logs signoz-otel-collector
+ ```
+
+## Troubleshooting
+
+### ClickHouse won't start
+- Check if port 9000 is in use
+- Verify zookeeper is healthy first
+- Check logs: `docker logs signoz-clickhouse`
+
+### OTEL Collector errors
+- Ensure ClickHouse is healthy before starting collector
+- Verify schema migrations have run
+- Check config syntax: `docker logs signoz-otel-collector`
+
+### No data in UI
+- Verify vcon-mcp is sending data (check its logs for OTEL export messages)
+- Ensure collector is receiving data: check collector metrics at port 8888
+- Verify ClickHouse tables exist: `docker exec signoz-clickhouse clickhouse-client --query "SHOW TABLES FROM signoz_traces"`
+
+### Port conflicts
+- Default ports: 3301 (UI), 4317 (gRPC), 4318 (HTTP)
+- Change in docker-compose.signoz.yml if needed
+
+## Known Issues
+
+1. **Schema Migration Failures**: Some newer SigNoz migrations use JSON column types with syntax not supported in ClickHouse 24.1.2. Core observability works but some advanced features may be limited.
+
+2. **Alertmanager**: Not configured for this deployment. Would require additional setup for alerts.
+
+3. **Health Check Timing**: The OTEL collector health check may show "starting" for extended periods but the service is functional.
+
+## Future Improvements
+
+- Upgrade ClickHouse to latest version for full schema compatibility
+- Add alertmanager configuration for alerts
+- Configure data retention policies
+- Add authentication to SigNoz UI
+- Set up dashboards for vcon-mcp metrics
+
+## Data Persistence
+
+Data is stored in Docker volumes:
+- `signoz_clickhouse_data` - Traces, metrics, logs
+- `signoz_zookeeper_data` - ZooKeeper state
+- `signoz_data` - SigNoz query service state
+
+To reset all data:
+```bash
+docker compose -f docker-compose.signoz.yml down -v
+```
+
+## Resources
+
+- [SigNoz Documentation](https://signoz.io/docs/)
+- [OpenTelemetry Documentation](https://opentelemetry.io/docs/)
+- [ClickHouse Documentation](https://clickhouse.com/docs/)
diff --git a/signoz/alertmanager.yml b/signoz/alertmanager.yml
new file mode 100644
index 0000000..89b0125
--- /dev/null
+++ b/signoz/alertmanager.yml
@@ -0,0 +1,19 @@
+global:
+  resolve_timeout: 5m  # time with no re-fire before an alert is declared resolved
+
+route:  # root route: every alert matches and is sent to the default receiver
+  group_by: ['alertname']
+  group_wait: 10s  # wait to batch the initial alerts of a new group
+  group_interval: 10s  # wait between notifications about new alerts in a group
+  repeat_interval: 1h  # re-notification interval for still-firing alerts
+  receiver: 'default-receiver'
+
+receivers:
+  - name: 'default-receiver'  # no integrations configured: notifications are dropped (no-op)
+
+inhibit_rules:  # NOTE(review): source_match/target_match are deprecated since Alertmanager 0.22 — prefer 'matchers'
+  - source_match:
+      severity: 'critical'  # a firing critical alert...
+    target_match:
+      severity: 'warning'  # ...suppresses warning-severity alerts...
+    equal: ['alertname', 'dev', 'instance']  # ...that carry identical values for these labels
diff --git a/signoz/clickhouse-users.xml b/signoz/clickhouse-users.xml
new file mode 100644
index 0000000..c545475
--- /dev/null
+++ b/signoz/clickhouse-users.xml
@@ -0,0 +1,38 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <max_memory_usage>10000000000</max_memory_usage>
+            <use_uncompressed_cache>0</use_uncompressed_cache>
+            <load_balancing>random</load_balancing>
+            <max_concurrent_queries_for_user>100</max_concurrent_queries_for_user>
+        </default>
+        <readonly>
+            <readonly>1</readonly>
+        </readonly>
+    </profiles>
+
+    <users>
+        <default>
+            <password></password>
+            <networks>
+                <ip>::/0</ip>
+            </networks>
+            <profile>default</profile>
+            <quota>default</quota>
+            <access_management>1</access_management>
+        </default>
+    </users>
+
+    <quotas>
+        <default>
+            <interval>
+                <duration>3600</duration>
+                <queries>0</queries>
+                <errors>0</errors>
+                <result_rows>0</result_rows>
+                <read_rows>0</read_rows>
+                <execution_time>0</execution_time>
+            </interval>
+        </default>
+    </quotas>
+</clickhouse>
diff --git a/signoz/otel-collector-config.yaml b/signoz/otel-collector-config.yaml
new file mode 100644
index 0000000..eeedb0c
--- /dev/null
+++ b/signoz/otel-collector-config.yaml
@@ -0,0 +1,48 @@
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: 0.0.0.0:4317  # OTLP/gRPC ingest (standard port)
+      http:
+        endpoint: 0.0.0.0:4318  # OTLP/HTTP ingest (standard port)
+
+processors:
+  batch:  # batch telemetry before export to reduce ClickHouse insert frequency
+    send_batch_size: 10000
+    timeout: 10s  # flush a partial batch after this long
+
+exporters:
+  clickhousetraces:  # SigNoz trace exporter -> signoz_traces database
+    datasource: tcp://signoz-clickhouse:9000/signoz_traces
+  clickhouselogsexporter:  # SigNoz log exporter -> signoz_logs database
+    dsn: tcp://signoz-clickhouse:9000/signoz_logs
+    timeout: 5s
+    sending_queue:
+      queue_size: 100
+    retry_on_failure:  # NOTE(review): only the log exporter configures retry/queueing; traces and metrics use exporter defaults
+      enabled: true
+      initial_interval: 5s
+      max_interval: 30s
+      max_elapsed_time: 300s  # give up on a batch after 5 minutes
+  signozclickhousemetrics:  # SigNoz metric exporter -> signoz_metrics database
+    dsn: tcp://signoz-clickhouse:9000/signoz_metrics
+
+extensions:
+  health_check:
+    endpoint: 0.0.0.0:13133  # liveness endpoint polled by the container healthcheck
+
+service:
+  extensions: [health_check]
+  pipelines:  # one pipeline per signal, all fed by the single OTLP receiver
+    traces:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [clickhousetraces]
+    metrics:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [signozclickhousemetrics]
+    logs:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [clickhouselogsexporter]
diff --git a/signoz/zz-clickhouse-config.xml b/signoz/zz-clickhouse-config.xml
new file mode 100644
index 0000000..d50368e
--- /dev/null
+++ b/signoz/zz-clickhouse-config.xml
@@ -0,0 +1,53 @@
+<clickhouse>
+    <logger>
+        <level>warning</level>
+        <console>true</console>
+    </logger>
+
+    <listen_host>0.0.0.0</listen_host>
+
+    <max_connections>4096</max_connections>
+    <keep_alive_timeout>3</keep_alive_timeout>
+    <max_concurrent_queries>100</max_concurrent_queries>
+
+    <path>/var/lib/clickhouse/</path>
+    <tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
+    <user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
+
+    <users_config>users.xml</users_config>
+    <default_profile>default</default_profile>
+    <default_database>default</default_database>
+
+    <timezone>UTC</timezone>
+
+    <mlock_executable>true</mlock_executable>
+
+    <builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>
+
+    <max_session_timeout>3600</max_session_timeout>
+    <default_session_timeout>60</default_session_timeout>
+
+    <remote_servers>
+        <cluster>
+            <shard>
+                <replica>
+                    <host>signoz-clickhouse</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster>
+    </remote_servers>
+
+    <macros>
+        <cluster>cluster</cluster>
+        <shard>01</shard>
+        <replica>signoz-clickhouse</replica>
+    </macros>
+
+    <zookeeper>
+        <node>
+            <host>signoz-zookeeper</host>
+            <port>2181</port>
+        </node>
+    </zookeeper>
+</clickhouse>