Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
STTMetrics,
TTSMetrics,
VADMetrics,
VideoAvatarMetrics,
)
from .usage_collector import UsageCollector, UsageSummary
from .utils import log_metrics
Expand All @@ -21,6 +22,7 @@
"UsageSummary",
"UsageCollector",
"log_metrics",
"VideoAvatarMetrics",
]

# Cleanup docs of unexported modules
Expand Down
24 changes: 24 additions & 0 deletions livekit-agents/livekit/agents/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ class TTSMetrics(BaseModel):
metadata: Metadata | None = None


class VAMetrics(BaseModel):
type: Literal["va_metrics"] = "va_metrics"
label: str
request_id: str
timestamp: float
ttfb: float
audio_duration: float
metadata: Metadata | None = None


class VADMetrics(BaseModel):
type: Literal["vad_metrics"] = "vad_metrics"
label: str
Expand Down Expand Up @@ -88,6 +98,19 @@ class EOUMetrics(BaseModel):
metadata: Metadata | None = None


class VideoAvatarMetrics(BaseModel):
type: Literal["video_avatar_metrics"] = "video_avatar_metrics"
event_id: str
timestamp: float
audio_sent_ts: float
ws_received_ts: float | None = None
video_received_ts: float | None = None
full_latency: float
server_latency: float
video_pipeline_latency: float
metadata: Metadata | None = None


class RealtimeModelMetrics(BaseModel):
class CachedTokenDetails(BaseModel):
audio_tokens: int
Expand Down Expand Up @@ -139,4 +162,5 @@ class OutputTokenDetails(BaseModel):
VADMetrics,
EOUMetrics,
RealtimeModelMetrics,
VideoAvatarMetrics,
]
17 changes: 16 additions & 1 deletion livekit-agents/livekit/agents/metrics/usage_collector.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from copy import deepcopy
from dataclasses import dataclass

from .base import AgentMetrics, LLMMetrics, RealtimeModelMetrics, STTMetrics, TTSMetrics
from .base import (
AgentMetrics,
LLMMetrics,
RealtimeModelMetrics,
STTMetrics,
TTSMetrics,
VideoAvatarMetrics,
)


@dataclass
Expand All @@ -21,6 +28,9 @@ class UsageSummary:
tts_characters_count: int = 0
tts_audio_duration: float = 0.0
stt_audio_duration: float = 0.0
full_latency: float = 0.0
server_latency: float = 0.0
video_pipeline_latency: float = 0.0

# properties for naming consistency: prompt = input, completion = output
@property
Expand Down Expand Up @@ -48,6 +58,7 @@ def __call__(self, metrics: AgentMetrics) -> None:
self.collect(metrics)

def collect(self, metrics: AgentMetrics) -> None:
print(type(metrics))
if isinstance(metrics, LLMMetrics):
self._summary.llm_prompt_tokens += metrics.prompt_tokens
self._summary.llm_prompt_cached_tokens += metrics.prompt_cached_tokens
Expand Down Expand Up @@ -87,6 +98,10 @@ def collect(self, metrics: AgentMetrics) -> None:

elif isinstance(metrics, STTMetrics):
self._summary.stt_audio_duration += metrics.audio_duration
elif isinstance(metrics, VideoAvatarMetrics):
self._summary.full_latency += metrics.full_latency or 0
self._summary.server_latency += metrics.server_latency or 0
self._summary.video_pipeline_latency += metrics.video_pipeline_latency or 0

def get_summary(self) -> UsageSummary:
return deepcopy(self._summary)
20 changes: 19 additions & 1 deletion livekit-agents/livekit/agents/metrics/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
import logging

from ..log import logger as default_logger
from .base import AgentMetrics, EOUMetrics, LLMMetrics, RealtimeModelMetrics, STTMetrics, TTSMetrics
from .base import (
AgentMetrics,
EOUMetrics,
LLMMetrics,
RealtimeModelMetrics,
STTMetrics,
TTSMetrics,
VideoAvatarMetrics,
)


def log_metrics(metrics: AgentMetrics, *, logger: logging.Logger | None = None) -> None:
Expand Down Expand Up @@ -83,3 +91,13 @@ def log_metrics(metrics: AgentMetrics, *, logger: logging.Logger | None = None)
"audio_duration": round(metrics.audio_duration, 2),
},
)
elif isinstance(metrics, VideoAvatarMetrics):
logger.info(
"VideoAvatar metrics",
extra=metadata
| {
"full_latency_ms": round(metrics.full_latency * 1000, 2),
"server_latency_ms": round(metrics.server_latency * 1000, 2),
"pipeline_latency_ms": round(metrics.video_pipeline_latency * 1000, 2),
},
)
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/voice/avatar/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ._datastream_io import DataStreamAudioOutput, DataStreamAudioReceiver
from ._datastream_io_upd import LatencyAudioOutput, attach_video_latency_listener
from ._queue_io import QueueAudioOutput
from ._runner import AvatarOptions, AvatarRunner
from ._types import AudioReceiver, AudioSegmentEnd, VideoGenerator
Expand All @@ -12,6 +13,8 @@
"QueueAudioOutput",
"DataStreamAudioReceiver",
"DataStreamAudioOutput",
"LatencyAudioOutput",
"attach_video_latency_listener",
]

# Cleanup docs of unexported modules
Expand Down
107 changes: 107 additions & 0 deletions livekit-agents/livekit/agents/voice/avatar/_datastream_io_upd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import asyncio
import time
import uuid
from collections import deque

from livekit import rtc

from ...log import logger
from ...metrics import VideoAvatarMetrics
from ._datastream_io import DataStreamAudioOutput


class LatencyAudioOutput(DataStreamAudioOutput):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._audio_send_times = (
deque()
) # deque of {'send_ts': float, 'ingest_ts': float | None, 'duration': float}
self._total_audio_sent = 0
self._lock = asyncio.Lock()

async def capture_frame(self, frame: rtc.AudioFrame):
async with self._lock:
send_ts = time.time()
ingest_ts = frame.userdata.get("ingest_ts")

# add to this audio chunk with its timestamps
self._audio_send_times.append(
{"send_ts": send_ts, "ingest_ts": ingest_ts, "duration": frame.duration}
)
self._total_audio_sent += 1

logger.debug(
f"Audio pushed: total={self._total_audio_sent}, pending={len(self._audio_send_times)}"
)

await super().capture_frame(frame)

async def pop_audio_timing(self):
"""Pop the oldest audio timing for latency calculation"""
async with self._lock:
# print(f'dequeue {self._audio_send_times}')
if self._audio_send_times:
timm = self._audio_send_times.popleft()
print("TIMM. ", timm)
return timm
return None


def attach_video_latency_listener(room: rtc.Room, audio_output: LatencyAudioOutput, avatar_session):
@room.on("track_subscribed")
def subscribed(
track: rtc.Track,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant,
):
logger.info(
f"Track subscribed: kind={publication.kind}, participant={participant.identity}"
)

if publication.kind != rtc.TrackKind.KIND_VIDEO:
return

if participant.identity != avatar_session._avatar_participant_identity:
logger.info(f"Skipping non-avatar participant: {participant.identity}")
return

if isinstance(track, rtc.RemoteVideoTrack):
logger.info("Setting up video frame listener for avatar")

async def listen_video_frames():
try:
video_stream = rtc.VideoStream(track)
logger.info("Video stream created, waiting for frames...")

async for _frame in video_stream:
timing = (
await audio_output.pop_audio_timing()
) # get the matching audio chunk
if timing:
recv_ts = time.time() # when video frame arrives
metrics = VideoAvatarMetrics(
event_id=str(uuid.uuid4()),
timestamp=time.time(),
audio_sent_ts=timing["send_ts"],
ws_received_ts=timing.get("ingest_ts") or timing["send_ts"],
video_received_ts=recv_ts,
full_latency=recv_ts - timing["send_ts"],
server_latency=(timing.get("ingest_ts") or timing["send_ts"])
- timing["send_ts"],
video_pipeline_latency=recv_ts
- (timing.get("ingest_ts") or timing["send_ts"]),
)
avatar_session.emit("metrics_collected", metrics)
else:
# idle animation
# logger.debug(f"Video frame {frame_count} received (no pending audio)")
pass

logger.info("Video stream ended")
except Exception as e:
logger.error(f"Error in video frame listener: {e}", exc_info=True)

task = asyncio.create_task(listen_video_frames())
avatar_session._latency_tasks.add(task)
task.add_done_callback(avatar_session._latency_tasks.discard)
logger.info("Video frame listener task created")
Loading
Loading