Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/lib/vcon_redis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
from redis.commands.json.path import Path
from redis_mgr import redis
from settings import VCON_REDIS_EXPIRY
Expand Down Expand Up @@ -49,6 +50,7 @@ def get_vcon(self, vcon_id: str) -> Optional[vcon.Vcon]:
f"vcon:{vcon_id}", Path.root_path()
)
if not vcon_dict:
increment_counter("conserver.lib.vcon_redis.get_vcon_not_found")
return None
_vcon = vcon.Vcon(vcon_dict)
return _vcon
Expand Down
4 changes: 2 additions & 2 deletions conserver/links/analyze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,14 @@ def run(
)
increment_counter(
"conserver.link.openai.analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vendor_schema = {}
Expand Down
8 changes: 4 additions & 4 deletions conserver/links/analyze_and_label/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ def run(
increment_counter(
"conserver.link.openai.labels_added",
value=len(labels),
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON response for vCon {vcon_uuid}: {e}")
increment_counter(
"conserver.link.openai.json_parse_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
# Add the raw text anyway as the analysis
vCon.add_analysis(
Expand All @@ -182,14 +182,14 @@ def run(
)
increment_counter(
"conserver.link.openai.analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vcon_redis.store_vcon(vCon)
Expand Down
8 changes: 4 additions & 4 deletions conserver/links/analyze_vcon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ def run(
)
increment_counter(
"conserver.link.openai.invalid_json",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise ValueError("Invalid JSON response from OpenAI")

except Exception as e:
logger.error(
"Failed to generate analysis for vCon %s after multiple retries: %s",
Expand All @@ -160,14 +160,14 @@ def run(
)
increment_counter(
"conserver.link.openai.analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vendor_schema = {}
Expand Down
6 changes: 3 additions & 3 deletions conserver/links/check_and_tag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def run(
logger.info(f"Applied tag: {opts['tag_name']}:{opts['tag_value']} (evaluation: {applies})")
increment_counter(
"conserver.link.openai.tags_applied",
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']},
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
else:
logger.info(f"Tag not applied: {opts['tag_name']}:{opts['tag_value']} (evaluation: {applies})")
Expand All @@ -194,14 +194,14 @@ def run(
)
increment_counter(
"conserver.link.openai.evaluation_failures",
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']},
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.evaluation_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value']},
attributes={"analysis_type": opts['analysis_type'], "tag_name": opts['tag_name'], "tag_value": opts['tag_value'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vcon_redis.store_vcon(vCon)
Expand Down
13 changes: 12 additions & 1 deletion conserver/links/datatrails/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from fastapi import HTTPException
from lib.vcon_redis import VconRedis
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
from starlette.status import HTTP_404_NOT_FOUND, HTTP_501_NOT_IMPLEMENTED
from vcon import Vcon

Expand Down Expand Up @@ -382,7 +383,13 @@ def run(vcon_uuid: str, link_name: str, opts: dict = default_options) -> str:
# }
# )

event = create_asset_event(opts, asset_id, auth, event_attributes)
attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}

try:
event = create_asset_event(opts, asset_id, auth, event_attributes)
except Exception:
increment_counter("conserver.link.datatrails.event_creation_failures", attributes=attrs)
raise
event_id = event["identity"]
logger.info(f"DataTrails: Event Created: {event_id}")

Expand All @@ -395,6 +402,10 @@ def run(vcon_uuid: str, link_name: str, opts: dict = default_options) -> str:
event_id = event["identity"]
logger.info(f"DataTrails: New Event Created: {event_id}")
except:
increment_counter(
"conserver.link.datatrails.event_creation_failures",
attributes={**attrs, "event_type": "asset_free"},
)
logger.info(f"DataTrails: New Event Creation Failure")


Expand Down
11 changes: 6 additions & 5 deletions conserver/links/deepgram_link/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def run(
logger.info("Dialog %s already transcribed on vCon: %s", index, vCon.uuid)
continue

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
start = time.time()
try:
if opts.get("LITELLM_PROXY_URL") and opts.get("LITELLM_MASTER_KEY"):
Expand All @@ -248,26 +249,26 @@ def run(
result = transcribe_dg(dg_client, dialog, opts["api"], vcon_uuid=vcon_uuid, run_opts=opts)
except Exception as e:
logger.error("Failed to transcribe vCon %s after multiple retries: %s", vcon_uuid, e, exc_info=True)
increment_counter("conserver.link.deepgram.transcription_failures")
increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs)
raise e
elapsed = time.time() - start
record_histogram("conserver.link.deepgram.transcription_time", elapsed)
record_histogram("conserver.link.deepgram.transcription_time", elapsed, attributes=attrs)
logger.info(f"Transcription for dialog {index} took {elapsed:.2f} seconds.")

if not result:
logger.warning("No transcription generated for vCon %s, dialog %s", vcon_uuid, index)
increment_counter("conserver.link.deepgram.transcription_failures")
increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs)
break

# Log and track confidence (not available for LiteLLM/OpenAI-format transcription)
confidence = result.get("confidence")
if confidence is not None:
record_histogram("conserver.link.deepgram.confidence", confidence)
record_histogram("conserver.link.deepgram.confidence", confidence, attributes=attrs)
logger.info(f"Transcription confidence for dialog {index}: {confidence}")
# If the confidence is too low, don't store the transcript
if confidence < opts["minimum_confidence"]:
logger.warning("Low confidence result for vCon %s, dialog %s: %s", vcon_uuid, index, confidence)
increment_counter("conserver.link.deepgram.transcription_failures")
increment_counter("conserver.link.deepgram.transcription_failures", attributes=attrs)
continue
else:
logger.info(f"Confidence not available for dialog {index} (LiteLLM path), skipping threshold check")
Expand Down
6 changes: 3 additions & 3 deletions conserver/links/detect_engagement/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def run(
increment_counter(
"conserver.link.openai.engagement_detected",
value=1 if is_engaged else 0,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

except Exception as e:
Expand All @@ -158,14 +158,14 @@ def run(
)
increment_counter(
"conserver.link.openai.engagement_analysis_failures",
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)
raise e

record_histogram(
"conserver.link.openai.engagement_analysis_time",
time.time() - start,
attributes={"analysis_type": opts['analysis_type']},
attributes={"analysis_type": opts['analysis_type'], "link.name": link_name, "vcon.uuid": vcon_uuid},
)

vcon_redis.store_vcon(vCon)
Expand Down
5 changes: 5 additions & 0 deletions conserver/links/diet/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from redis_mgr import redis
from lib.logging_utils import init_logger
from lib.metrics import increment_counter
import json
import requests
import uuid
Expand Down Expand Up @@ -134,6 +135,8 @@ def run(vcon_uuid, link_name, opts=default_options):
for key, value in options.items():
logger.info("diet::%s: %s", key, _redact_option_value(key, value))

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}

# Load vCon from Redis using JSON.GET
vcon = redis.json().get(f"vcon:{vcon_uuid}")
if not vcon:
Expand Down Expand Up @@ -180,9 +183,11 @@ def run(vcon_uuid, link_name, opts=default_options):
else:
dialog["body"] = ""
else:
increment_counter("conserver.link.diet.media_post_failures", attributes=attrs)
logger.error(f"Failed to post media: {response.status_code}")
dialog["body"] = ""
except Exception as e:
increment_counter("conserver.link.diet.media_post_failures", attributes=attrs)
logger.error(f"Exception posting media: {e}")
dialog["body"] = ""
else:
Expand Down
12 changes: 7 additions & 5 deletions conserver/links/groq_whisper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ def run(
vCon.uuid)
continue

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
try:
# Attempt transcription with timing metrics
start = time.time()
Expand All @@ -329,25 +330,26 @@ def run(
result = transcribe_groq_whisper(dialog, opts)
record_histogram(
"conserver.link.groq_whisper.transcription_time",
time.time() - start
time.time() - start,
attributes=attrs,
)
except RetryError as re:
logger.error(
"Failed to transcribe vCon %s after multiple retry attempts: %s",
vcon_uuid, re)
increment_counter("conserver.link.groq_whisper.transcription_failures")
increment_counter("conserver.link.groq_whisper.transcription_failures", attributes=attrs)
break
except Exception as e:
logger.error(
"Unexpected error transcribing vCon %s: %s",
vcon_uuid, e)
increment_counter("conserver.link.groq_whisper.transcription_failures")
increment_counter("conserver.link.groq_whisper.transcription_failures", attributes=attrs)
break

if not result:
logger.warning("No transcription generated for vCon %s", vcon_uuid)
increment_counter(
"conserver.link.groq_whisper.transcription_failures")
"conserver.link.groq_whisper.transcription_failures", attributes=attrs)
break

logger.info("Transcribed vCon: %s", vCon.uuid)
Expand All @@ -357,7 +359,7 @@ def run(
# Check if result is a successful transcription
if not hasattr(result, 'text'):
logger.warning(f"Unexpected result format: {result}")
increment_counter("conserver.link.groq_whisper.transcription_failures")
increment_counter("conserver.link.groq_whisper.transcription_failures", attributes=attrs)
break

# Handle different response formats from the Groq API
Expand Down
7 changes: 4 additions & 3 deletions conserver/links/hugging_face_whisper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,21 @@ def run(
logger.info("Dialog %s already transcribed on vCon: %s", index, vCon.uuid)
continue

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
try:
# Attempt transcription with timing metrics
start = time.time()
logger.debug("Transcribing dialog %s in vCon: %s", index, vCon.uuid)
result = transcribe_hugging_face_whisper(dialog, opts)
record_histogram("conserver.link.hugging_face_whisper.transcription_time", time.time() - start)
record_histogram("conserver.link.hugging_face_whisper.transcription_time", time.time() - start, attributes=attrs)
except (RetryError, Exception) as e:
logger.error("Failed to transcribe vCon %s after multiple retries: %s", vcon_uuid, e)
increment_counter("conserver.link.hugging_face_whisper.transcription_failures")
increment_counter("conserver.link.hugging_face_whisper.transcription_failures", attributes=attrs)
break

if not result:
logger.warning("No transcription generated for vCon %s", vcon_uuid)
increment_counter("conserver.link.hugging_face_whisper.transcription_failures")
increment_counter("conserver.link.hugging_face_whisper.transcription_failures", attributes=attrs)
break

logger.info("Transcribed vCon: %s", vCon.uuid)
Expand Down
5 changes: 3 additions & 2 deletions conserver/links/hugging_llm_link/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,14 @@ def process_vcon(self, vcon_uuid: str, link_name: str) -> str:
logger.info("No transcript found in vCon: %s", vcon_uuid)
return vcon_uuid

attrs = {"link.name": link_name, "vcon.uuid": vcon_uuid}
try:
start = time.time()
result = self.llm.analyze(transcript_text)
record_histogram("conserver.link.huggingface.llm_time", time.time() - start)
record_histogram("conserver.link.huggingface.llm_time", time.time() - start, attributes=attrs)
except (RetryError, Exception) as e:
logger.error("Failed to analyze vCon %s: %s", vcon_uuid, str(e))
increment_counter("conserver.link.huggingface.llm_failures")
increment_counter("conserver.link.huggingface.llm_failures", attributes=attrs)
return vcon_uuid

self._add_analysis_to_vcon(vcon, result)
Expand Down
Loading
Loading