diff --git a/Cargo.lock b/Cargo.lock index 76f41ed1d..bd81acb28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1762,6 +1762,20 @@ dependencies = [ "serde", ] +[[package]] +name = "encoded_video" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "env_logger 0.11.8", + "libwebrtc", + "livekit", + "livekit-api", + "log", + "tokio", +] + [[package]] name = "encoding_rs" version = "0.8.35" diff --git a/Cargo.toml b/Cargo.toml index 760ee6366..e01bf8ff3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "examples/agent_dispatch", "examples/api", "examples/basic_room", + "examples/encoded_video", "examples/basic_text_stream", "examples/encrypted_text_stream", "examples/local_audio", diff --git a/examples/encoded_video/Cargo.toml b/examples/encoded_video/Cargo.toml new file mode 100644 index 000000000..57d438f3b --- /dev/null +++ b/examples/encoded_video/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "encoded_video" +version = "0.1.0" +edition.workspace = true +publish = false + +[[bin]] +name = "encoded_video" +path = "src/main.rs" + +[dependencies] +tokio = { workspace = true, features = ["full", "parking_lot"] } +livekit = { workspace = true, features = ["rustls-tls-native-roots"] } +libwebrtc = { workspace = true } +livekit-api = { workspace = true } +clap = { workspace = true, features = ["derive", "env"] } +log = { workspace = true } +env_logger = { workspace = true } +anyhow = { workspace = true } diff --git a/examples/encoded_video/src/main.rs b/examples/encoded_video/src/main.rs new file mode 100644 index 000000000..72d5989cb --- /dev/null +++ b/examples/encoded_video/src/main.rs @@ -0,0 +1,808 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Example: Ingest pre-encoded H264/H265 video from a TCP server or file into a LiveKit room. +//! +//! # Usage +//! +//! ## H264 from TCP (default) +//! +//! First start a TCP server that streams Annex-B H264 data (e.g. with ffmpeg). +//! +//! **Important**: use `-g 30` (or similar) so that keyframes are emitted +//! frequently enough for subscribers to start receiving video quickly. +//! +//! ``` +//! ffmpeg -re -f lavfi -i testsrc=size=1280x720:rate=30 \ +//! -c:v libx264 -preset ultrafast -tune zerolatency \ +//! -g 30 -keyint_min 30 \ +//! -bsf:v h264_mp4toannexb -f h264 tcp://0.0.0.0:5000?listen +//! ``` +//! +//! Then run this example to connect to it: +//! ``` +//! cargo run --bin encoded_video -- --url wss://your.livekit.host --api-key --api-secret --room --connect 127.0.0.1:5000 +//! ``` +//! +//! ## H265 from TCP +//! +//! ``` +//! ffmpeg -re -f lavfi -i testsrc=size=1280x720:rate=30 \ +//! -c:v libx265 -preset ultrafast -tune zerolatency \ +//! -g 30 -keyint_min 30 \ +//! -f hevc tcp://0.0.0.0:5000?listen +//! ``` +//! +//! ``` +//! cargo run --bin encoded_video -- --codec h265 --connect 127.0.0.1:5000 \ +//! --url wss://your.livekit.host --api-key --api-secret --room +//! ``` +//! +//! ## From a file +//! +//! Generate an Annex-B file: +//! ``` +//! ffmpeg -f lavfi -i testsrc=size=1280x720:rate=30:duration=10 \ +//! -c:v libx264 -preset ultrafast -g 30 -keyint_min 30 \ +//! -bsf:v h264_mp4toannexb -f h264 test.h264 +//! ``` +//! +//! Play it into a room: +//! ``` +//! 
cargo run --bin encoded_video -- --file test.h264 --codec h264 \ +//! --url wss://your.livekit.host --api-key --api-secret --room +//! ``` +//! +//! Use `--loop-file` to replay the file continuously. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{bail, Result}; +use clap::{Parser, ValueEnum}; +use livekit::options::TrackPublishOptions; +use livekit::prelude::*; +use livekit::track::LocalVideoTrack; +use livekit::webrtc::encoded_video_source::native::NativeEncodedVideoSource; +use livekit::webrtc::encoded_video_source::{ + EncodedFrameInfo, KeyFrameRequestCallback, VideoCodecType, +}; +use livekit::webrtc::video_source::RtcVideoSource; +use livekit_api::access_token; +use log::debug; +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::net::TcpStream; + +// --------------------------------------------------------------------------- +// CLI +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, Copy, ValueEnum)] +enum CodecArg { + H264, + H265, +} + +#[derive(Parser, Debug)] +#[command(author, version, about = "Ingest pre-encoded H264/H265 video into LiveKit")] +struct Args { + /// LiveKit server URL (e.g. wss://your.livekit.host) + #[arg(long, env = "LIVEKIT_URL")] + url: String, + + /// LiveKit API key + #[arg(long, env = "LIVEKIT_API_KEY")] + api_key: String, + + /// LiveKit API secret + #[arg(long, env = "LIVEKIT_API_SECRET")] + api_secret: String, + + /// Room name to join + #[arg(long, default_value = "encoded-video-test")] + room: String, + + /// TCP server address to connect to for the encoded stream. + /// Mutually exclusive with --file. + #[arg(long)] + connect: Option, + + /// Path to a local Annex-B .h264/.h265 file. + /// Mutually exclusive with --connect. 
+ #[arg(long)] + file: Option, + + /// Loop the file continuously (only used with --file) + #[arg(long, default_value_t = false)] + loop_file: bool, + + /// Video codec + #[arg(long, value_enum, default_value_t = CodecArg::H264)] + codec: CodecArg, + + /// Video width + #[arg(long, default_value_t = 1280)] + width: u32, + + /// Video height + #[arg(long, default_value_t = 720)] + height: u32, + + /// Frames per second (used for frame-rate pacing when reading from a file) + #[arg(long, default_value_t = 30)] + fps: u32, +} + +// --------------------------------------------------------------------------- +// Codec-aware NALU helpers +// --------------------------------------------------------------------------- + +/// Which codec we are parsing. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Codec { + H264, + H265, +} + +// -- H264 NALU types -- +const H264_NALU_SLICE: u8 = 1; +const H264_NALU_IDR: u8 = 5; +const H264_NALU_SPS: u8 = 7; +const H264_NALU_PPS: u8 = 8; + +// -- H265 (HEVC) NALU types -- +const H265_NALU_VPS: u8 = 32; +const H265_NALU_SPS: u8 = 33; +const H265_NALU_PPS: u8 = 34; +const H265_NALU_IDR_W_RADL: u8 = 19; +const H265_NALU_IDR_N_LP: u8 = 20; + +/// Extract the NALU type byte from Annex-B data (after the start code). 
+fn find_nalu_type(nalu_with_start: &[u8], codec: Codec) -> u8 { + // Skip 00 00 00 01 or 00 00 01 + let offset = if nalu_with_start.len() > 3 + && nalu_with_start[0] == 0 + && nalu_with_start[1] == 0 + && nalu_with_start[2] == 0 + && nalu_with_start[3] == 1 + { + 4 + } else if nalu_with_start.len() > 2 + && nalu_with_start[0] == 0 + && nalu_with_start[1] == 0 + && nalu_with_start[2] == 1 + { + 3 + } else { + return 0; + }; + + if offset >= nalu_with_start.len() { + return 0; + } + + match codec { + // H264: 1-byte header, type in lower 5 bits + Codec::H264 => nalu_with_start[offset] & 0x1F, + // H265: 2-byte header, type in bits [1..6] of the first byte + Codec::H265 => (nalu_with_start[offset] >> 1) & 0x3F, + } +} + +/// Is this NALU type a VCL (Video Coding Layer) unit — i.e. actual picture data? +fn is_vcl_nalu(nalu_type: u8, codec: Codec) -> bool { + match codec { + // H264 VCL types: 1 (coded slice) through 5 (IDR) + Codec::H264 => matches!(nalu_type, H264_NALU_SLICE..=H264_NALU_IDR), + // H265 VCL types: 0..=31 (all types < 32 are VCL in HEVC) + Codec::H265 => nalu_type <= 31, + } +} + +/// Is this NALU type a keyframe (IDR)? +fn is_keyframe_nalu(nalu_type: u8, codec: Codec) -> bool { + match codec { + Codec::H264 => nalu_type == H264_NALU_IDR, + Codec::H265 => matches!(nalu_type, H265_NALU_IDR_W_RADL | H265_NALU_IDR_N_LP), + } +} + +/// Is this a parameter set NALU? (SPS/PPS for H264, VPS/SPS/PPS for H265) +fn is_parameter_set_nalu(nalu_type: u8, codec: Codec) -> bool { + match codec { + Codec::H264 => matches!(nalu_type, H264_NALU_SPS | H264_NALU_PPS), + Codec::H265 => matches!(nalu_type, H265_NALU_VPS | H265_NALU_SPS | H265_NALU_PPS), + } +} + +/// For H265: check `first_slice_segment_in_pic_flag` to detect access-unit +/// boundaries in multi-slice pictures. +/// +/// In HEVC a single picture (IDR or otherwise) can be split across multiple +/// VCL NALUs (slices). 
The first bit of the slice segment header +/// (`first_slice_segment_in_pic_flag`) is 1 only for the first slice of a +/// new picture. Subsequent slices of the *same* picture have it set to 0 and +/// must be grouped into the same access unit. +/// +/// `nalu_data` is the full NALU *including* the Annex-B start code prefix. +/// +/// Returns `(is_first_slice, parsed_ok)`. +fn h265_first_slice_in_pic(nalu_data: &[u8]) -> (bool, bool) { + // Skip past the start code to reach the 2-byte NAL header + payload. + let offset = if nalu_data.len() > 3 + && nalu_data[0] == 0 + && nalu_data[1] == 0 + && nalu_data[2] == 0 + && nalu_data[3] == 1 + { + 4 + } else if nalu_data.len() > 2 + && nalu_data[0] == 0 + && nalu_data[1] == 0 + && nalu_data[2] == 1 + { + 3 + } else { + return (true, false); + }; + + // Need at least 2-byte NAL header + 1 byte of slice header + if nalu_data.len() < offset + 3 { + return (true, false); + } + + // Byte at offset+2 is the first byte of the slice segment header. + // Bit 7 (MSB) is first_slice_segment_in_pic_flag. + let first_flag = (nalu_data[offset + 2] & 0x80) != 0; + (first_flag, true) +} + +// --------------------------------------------------------------------------- +// Annex-B NALU parser (codec-agnostic start-code scanning) +// --------------------------------------------------------------------------- + +/// Simple Annex-B NALU parser. +/// +/// Accumulates data from the input, splits on 3-byte or 4-byte start +/// codes (`00 00 01` or `00 00 00 01`), and returns individual NALUs +/// with their type tag extracted using the appropriate codec rules. +struct AnnexBParser { + buffer: Vec, + codec: Codec, +} + +impl AnnexBParser { + fn new(codec: Codec) -> Self { + Self { + buffer: Vec::with_capacity(256 * 1024), + codec, + } + } + + /// Push raw data and extract complete NALUs. + /// Returns a list of (nalu_type, nalu_data_including_start_code) pairs. 
+ fn push(&mut self, data: &[u8]) -> Vec<(u8, Vec)> { + self.buffer.extend_from_slice(data); + let mut nalus = Vec::new(); + let mut start = None; + + let mut i = 0; + while i < self.buffer.len() { + // Look for start codes + let is_4byte_start = i + 3 < self.buffer.len() + && self.buffer[i] == 0 + && self.buffer[i + 1] == 0 + && self.buffer[i + 2] == 0 + && self.buffer[i + 3] == 1; + let is_3byte_start = !is_4byte_start + && i + 2 < self.buffer.len() + && self.buffer[i] == 0 + && self.buffer[i + 1] == 0 + && self.buffer[i + 2] == 1; + + if is_4byte_start || is_3byte_start { + if let Some(prev_start) = start { + let nalu_data = self.buffer[prev_start..i].to_vec(); + if !nalu_data.is_empty() { + let nalu_type = find_nalu_type(&nalu_data, self.codec); + nalus.push((nalu_type, nalu_data)); + } + } + start = Some(i); + i += if is_4byte_start { 4 } else { 3 }; + } else { + i += 1; + } + } + + // Keep the remaining partial NALU in the buffer + if let Some(prev_start) = start { + self.buffer = self.buffer[prev_start..].to_vec(); + } + + nalus + } +} + +// --------------------------------------------------------------------------- +// Frame assembler (groups NALUs into access units) +// --------------------------------------------------------------------------- + +/// Group NALUs into access units (frames). +/// +/// An access unit typically looks like: +/// H264: [SPS] [PPS] [IDR] -- keyframe +/// [slice] -- delta frame +/// H265: [VPS] [SPS] [PPS] [IDR] -- keyframe +/// [slice] -- delta frame +/// +/// Non-VCL NALUs that appear *between* two VCL NALUs belong to the *next* +/// access unit. We also cache parameter sets (SPS/PPS, and VPS for H265) +/// and prepend them to any keyframe that does not already include them. +struct FrameAssembler { + codec: Codec, + pending_nalus: Vec<(u8, Vec)>, + /// Whether we have seen at least one keyframe. + seen_keyframe: bool, + /// Cached VPS NALU (H265 only). + cached_vps: Option>, + /// Cached SPS NALU. 
+ cached_sps: Option>, + /// Cached PPS NALU. + cached_pps: Option>, +} + +impl FrameAssembler { + fn new(codec: Codec) -> Self { + Self { + codec, + pending_nalus: Vec::new(), + seen_keyframe: false, + cached_vps: None, + cached_sps: None, + cached_pps: None, + } + } + + /// Feed NALUs and return complete frames. + /// + /// Frames before the first keyframe (with parameter sets) are silently + /// dropped because decoders require an IDR with parameter sets to start. + /// Returns `(emitted_frames, dropped_count)`. + fn push_nalus(&mut self, nalus: Vec<(u8, Vec)>) -> (Vec, u64) { + let mut frames = Vec::new(); + let mut dropped: u64 = 0; + let codec = self.codec; + + for (nalu_type, nalu_data) in nalus { + // Cache every parameter set we see, even from dropped frames. + self.cache_parameter_set(nalu_type, &nalu_data); + + let is_vcl = is_vcl_nalu(nalu_type, codec); + + if is_vcl && !self.pending_nalus.is_empty() { + // Check if previous pending had a VCL -- if so, this *might* + // be the start of a new access unit (= new frame). + let has_prev_vcl = self + .pending_nalus + .iter() + .any(|(t, _)| is_vcl_nalu(*t, codec)); + + if has_prev_vcl { + // For H265, a single picture (IDR or otherwise) can be + // split into multiple VCL NALUs (slices). We must check + // first_slice_segment_in_pic_flag to know whether this + // VCL NALU starts a *new* picture or is a continuation + // slice of the current one. + let starts_new_au = match codec { + Codec::H264 => true, // H264 single-slice assumption + Codec::H265 => { + let (is_first, ok) = h265_first_slice_in_pic(&nalu_data); + // If we can't parse the flag, err on the side of + // splitting (same behaviour as the Go SDK). 
+ !ok || is_first + } + }; + + if starts_new_au { + let mut frame = self.flush_frame_before_next_au(); + + // If this is a keyframe without parameter sets, prepend cached ones + if frame.is_keyframe && !frame.has_parameter_sets { + self.prepend_cached_parameter_sets(&mut frame); + } + + // Only emit frames starting from the first keyframe that + // has parameter sets (either inline or injected from cache). + if frame.is_keyframe && frame.has_parameter_sets { + if !self.seen_keyframe { + println!( + "First keyframe found! (size={} bytes, nalus={})", + frame.data.len(), + frame.nalu_count + ); + } + self.seen_keyframe = true; + } + if self.seen_keyframe { + frames.push(frame); + } else { + dropped += 1; + } + } + // If !starts_new_au, this is a continuation slice of the + // same picture — just fall through and append it to pending. + } + } + + self.pending_nalus.push((nalu_type, nalu_data)); + } + + (frames, dropped) + } + + /// Cache a parameter set NALU if it matches the codec's parameter set types. + fn cache_parameter_set(&mut self, nalu_type: u8, nalu_data: &[u8]) { + match self.codec { + Codec::H264 => match nalu_type { + H264_NALU_SPS => self.cached_sps = Some(nalu_data.to_vec()), + H264_NALU_PPS => self.cached_pps = Some(nalu_data.to_vec()), + _ => {} + }, + Codec::H265 => match nalu_type { + H265_NALU_VPS => self.cached_vps = Some(nalu_data.to_vec()), + H265_NALU_SPS => self.cached_sps = Some(nalu_data.to_vec()), + H265_NALU_PPS => self.cached_pps = Some(nalu_data.to_vec()), + _ => {} + }, + } + } + + /// Prepend cached parameter sets to a frame that is missing them. 
+ fn prepend_cached_parameter_sets(&self, frame: &mut FrameData) { + let mut prefix = Vec::new(); + let mut extra_nalus = 0usize; + + // H265 needs VPS before SPS/PPS + if self.codec == Codec::H265 { + if let Some(vps) = &self.cached_vps { + prefix.extend_from_slice(vps); + extra_nalus += 1; + } + } + if let Some(sps) = &self.cached_sps { + prefix.extend_from_slice(sps); + extra_nalus += 1; + } + if let Some(pps) = &self.cached_pps { + prefix.extend_from_slice(pps); + extra_nalus += 1; + } + + if !prefix.is_empty() { + prefix.extend_from_slice(&frame.data); + frame.data = prefix; + frame.has_parameter_sets = true; + frame.nalu_count += extra_nalus; + } + } + + /// Flush the pending buffer as one frame, splitting off any trailing + /// non-VCL NALUs that belong to the *next* access unit. + fn flush_frame_before_next_au(&mut self) -> FrameData { + let all = std::mem::take(&mut self.pending_nalus); + let codec = self.codec; + + // Find the index of the last VCL NALU in the buffer. + let last_vcl_idx = all.iter().rposition(|(t, _)| is_vcl_nalu(*t, codec)); + + let split_at = match last_vcl_idx { + Some(idx) => idx + 1, + None => all.len(), + }; + + let (frame_nalus, carry_over) = all.split_at(split_at); + self.pending_nalus = carry_over.to_vec(); + + Self::build_frame(frame_nalus, codec) + } + + /// Build a FrameData from a slice of NALUs. + fn build_frame(nalus: &[(u8, Vec)], codec: Codec) -> FrameData { + let mut data = Vec::new(); + let mut is_keyframe = false; + let mut has_parameter_sets = false; + + for (nalu_type, nalu_data) in nalus { + data.extend_from_slice(nalu_data); + if is_keyframe_nalu(*nalu_type, codec) { + is_keyframe = true; + } + if is_parameter_set_nalu(*nalu_type, codec) { + has_parameter_sets = true; + } + } + + FrameData { + data, + is_keyframe, + has_parameter_sets, + nalu_count: nalus.len(), + } + } + + /// Flush any remaining pending NALUs as a final frame. 
+ fn flush_remaining(&mut self) -> Option { + if self.pending_nalus.is_empty() { + return None; + } + let nalus = std::mem::take(&mut self.pending_nalus); + if self.seen_keyframe { + Some(Self::build_frame(&nalus, self.codec)) + } else { + None + } + } +} + +struct FrameData { + data: Vec, + is_keyframe: bool, + has_parameter_sets: bool, + nalu_count: usize, +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + let args = Args::parse(); + + // Validate input source + if args.connect.is_none() && args.file.is_none() { + bail!("Either --connect or --file must be specified"); + } + if args.connect.is_some() && args.file.is_some() { + bail!("--connect and --file are mutually exclusive"); + } + + let codec = match args.codec { + CodecArg::H264 => Codec::H264, + CodecArg::H265 => Codec::H265, + }; + let codec_type = match codec { + Codec::H264 => VideoCodecType::H264, + Codec::H265 => VideoCodecType::H265, + }; + let video_codec = match codec { + Codec::H264 => livekit::options::VideoCodec::H264, + Codec::H265 => livekit::options::VideoCodec::H265, + }; + let codec_name = match codec { + Codec::H264 => "H264", + Codec::H265 => "H265", + }; + + println!( + "Starting encoded video ingest ({codec_name}): {}x{} @ {}fps", + args.width, args.height, args.fps + ); + + // Create the encoded video source + let mut encoded_source = NativeEncodedVideoSource::new(args.width, args.height, codec_type); + + // Register a keyframe request callback so that when WebRTC needs a keyframe + // (e.g. on subscriber join or packet loss), we know about it. + // In a real application you would signal the upstream encoder to produce an IDR. 
+ struct KfCallback; + impl KeyFrameRequestCallback for KfCallback { + fn on_keyframe_request(&self) { + println!("WebRTC requested a keyframe (PLI)"); + } + } + encoded_source.set_keyframe_request_callback(Arc::new(KfCallback)); + + let rtc_source = RtcVideoSource::Encoded(encoded_source.clone()); + + // Create a video track from it + let track_name = format!("{}-ingest", codec_name.to_lowercase()); + let video_track = LocalVideoTrack::create_video_track(&track_name, rtc_source); + + // Generate access token + let token = access_token::AccessToken::with_api_key(&args.api_key, &args.api_secret) + .with_identity("encoded-video-publisher") + .with_name("Encoded Video Publisher") + .with_grants(access_token::VideoGrants { + room_join: true, + room: args.room.clone(), + ..Default::default() + }) + .to_jwt()?; + + // Connect to room + println!("Connecting to room: {}", args.room); + let (room, mut events) = Room::connect(&args.url, &token, RoomOptions::default()).await?; + println!("Connected to room: {}", args.room); + + // Publish the video track + let publish_options = TrackPublishOptions { + simulcast: false, // no simulcast for passthrough + source: TrackSource::Camera, + video_codec, + ..Default::default() + }; + println!("Publishing video track ({codec_name})..."); + let publication = room + .local_participant() + .publish_track( + livekit::track::LocalTrack::Video(video_track), + publish_options, + ) + .await?; + println!("Published video track: {}", publication.sid()); + + let running = Arc::new(AtomicBool::new(true)); + let running_clone = running.clone(); + + // Handle Ctrl+C + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + println!("Shutting down..."); + running_clone.store(false, Ordering::SeqCst); + }); + + // Spawn room event handler + tokio::spawn(async move { + while let Some(event) = events.recv().await { + debug!("Room event: {:?}", event); + } + }); + + // Frame pacing interval (used for file input to simulate real-time) + let 
frame_interval = std::time::Duration::from_secs_f64(1.0 / args.fps as f64); + + // Run the ingest loop — may iterate more than once when --loop-file is set. + loop { + // Open the input source + let mut source: Box = if let Some(ref addr) = args.connect { + println!("Connecting to TCP server at {addr}..."); + let stream = TcpStream::connect(addr).await?; + println!("Connected to TCP server at {addr}"); + Box::new(stream) + } else { + let path = args.file.as_ref().unwrap(); + println!("Opening file: {path}"); + let file = tokio::fs::File::open(path).await?; + Box::new(file) + }; + + let mut parser = AnnexBParser::new(codec); + let mut assembler = FrameAssembler::new(codec); + let mut buf = vec![0u8; 64 * 1024]; + let mut frame_count: u64 = 0; + let mut keyframe_count: u64 = 0; + let mut dropped_count: u64 = 0; + let mut bytes_received: u64 = 0; + + let start_time = Instant::now(); + let is_file = args.file.is_some(); + + println!("Reading {codec_name} stream (waiting for first keyframe)..."); + + loop { + if !running.load(Ordering::SeqCst) { + break; + } + + match source.read(&mut buf).await { + Ok(0) => { + println!( + "{}", + if is_file { + "End of file reached" + } else { + "TCP server closed connection" + } + ); + // Flush remaining + if let Some(frame) = assembler.flush_remaining() { + let capture_time_us = start_time.elapsed().as_micros() as i64; + let info = EncodedFrameInfo { + data: frame.data, + capture_time_us, + rtp_timestamp: 0, + width: args.width, + height: args.height, + is_keyframe: frame.is_keyframe, + has_sps_pps: frame.has_parameter_sets, + }; + encoded_source.capture_frame(&info); + frame_count += 1; + } + break; + } + Ok(n) => { + bytes_received += n as u64; + let nalus = parser.push(&buf[..n]); + let (frames, dropped) = assembler.push_nalus(nalus); + dropped_count += dropped; + + for frame in frames { + // For file input, pace frames to approximate real-time playback + if is_file { + let target_time = frame_interval * frame_count as u32; + let 
elapsed = start_time.elapsed(); + if target_time > elapsed { + tokio::time::sleep(target_time - elapsed).await; + } + } + + let capture_time_us = start_time.elapsed().as_micros() as i64; + let is_keyframe = frame.is_keyframe; + let has_param_sets = frame.has_parameter_sets; + let frame_size = frame.data.len(); + let nalu_count = frame.nalu_count; + + if is_keyframe { + keyframe_count += 1; + } + + let info = EncodedFrameInfo { + data: frame.data, + capture_time_us, + rtp_timestamp: 0, + width: args.width, + height: args.height, + is_keyframe, + has_sps_pps: has_param_sets, + }; + + let ok = encoded_source.capture_frame(&info); + frame_count += 1; + + if frame_count <= 5 || frame_count % 100 == 0 || is_keyframe { + println!( + "Frame #{}: size={} bytes, keyframe={}, param_sets={}, nalus={}, capture_ok={}, total_bytes={}", + frame_count, frame_size, is_keyframe, has_param_sets, nalu_count, ok, bytes_received + ); + } + } + } + Err(e) => { + eprintln!("Read error: {}", e); + break; + } + } + } + + println!( + "Done. Total frames: {}, keyframes: {}, dropped_before_first_kf: {}, bytes received: {}", + frame_count, keyframe_count, dropped_count, bytes_received + ); + + // Loop only for file input with --loop-file + if !(is_file && args.loop_file && running.load(Ordering::SeqCst)) { + break; + } + println!("Looping file..."); + } + + room.close().await?; + println!("Room closed"); + Ok(()) +} diff --git a/libwebrtc/src/encoded_video_source.rs b/libwebrtc/src/encoded_video_source.rs new file mode 100644 index 000000000..0dfa45586 --- /dev/null +++ b/libwebrtc/src/encoded_video_source.rs @@ -0,0 +1,121 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::sync::Arc; + +use crate::video_source::VideoResolution; + +/// Video codec type for encoded frames. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[repr(u32)] +pub enum VideoCodecType { + VP8 = 1, + VP9 = 2, + AV1 = 3, + H264 = 4, + H265 = 5, +} + +/// Information about a pre-encoded video frame. +#[derive(Debug, Clone)] +pub struct EncodedFrameInfo { + /// The encoded frame data (e.g., H264 NALUs in Annex B format). + pub data: Vec, + /// Capture timestamp in microseconds. + pub capture_time_us: i64, + /// RTP timestamp (set to 0 for auto-generation). + pub rtp_timestamp: u32, + /// Frame width. + pub width: u32, + /// Frame height. + pub height: u32, + /// Whether this frame is a keyframe (IDR for H264). + pub is_keyframe: bool, + /// For H264: whether the frame includes SPS/PPS NALUs. + pub has_sps_pps: bool, +} + +impl Default for EncodedFrameInfo { + fn default() -> Self { + Self { + data: Vec::new(), + capture_time_us: 0, + rtp_timestamp: 0, + width: 0, + height: 0, + is_keyframe: false, + has_sps_pps: false, + } + } +} + +/// Callback trait for keyframe requests from the encoder/receiver. +pub trait KeyFrameRequestCallback: Send + Sync { + fn on_keyframe_request(&self); +} + +/// A video source that accepts pre-encoded frames (H264, VP8, VP9, etc.) +/// +/// This allows injecting pre-encoded video frames directly into the WebRTC +/// pipeline, bypassing the internal encoding step. 
+#[cfg(not(target_arch = "wasm32"))] +pub mod native { + use std::fmt::{Debug, Formatter}; + use std::sync::Arc; + + use super::*; + use crate::imp::encoded_video_source as evs_imp; + + #[derive(Clone)] + pub struct NativeEncodedVideoSource { + pub(crate) handle: evs_imp::NativeEncodedVideoSource, + } + + impl Debug for NativeEncodedVideoSource { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("NativeEncodedVideoSource") + .field("resolution", &self.handle.video_resolution()) + .field("codec", &self.handle.codec_type()) + .finish() + } + } + + impl NativeEncodedVideoSource { + pub fn new(width: u32, height: u32, codec: VideoCodecType) -> Self { + Self { + handle: evs_imp::NativeEncodedVideoSource::new(width, height, codec), + } + } + + pub fn capture_frame(&self, info: &EncodedFrameInfo) -> bool { + self.handle.capture_frame(info) + } + + pub fn set_keyframe_request_callback( + &mut self, + callback: Arc, + ) { + self.handle.set_keyframe_request_callback(callback); + } + + pub fn video_resolution(&self) -> VideoResolution { + self.handle.video_resolution() + } + + pub fn codec_type(&self) -> VideoCodecType { + self.handle.codec_type() + } + } +} diff --git a/libwebrtc/src/lib.rs b/libwebrtc/src/lib.rs index 1cd1eb1e5..db041d069 100644 --- a/libwebrtc/src/lib.rs +++ b/libwebrtc/src/lib.rs @@ -45,6 +45,7 @@ pub mod audio_source; pub mod audio_stream; pub mod audio_track; pub mod data_channel; +pub mod encoded_video_source; #[cfg(any(target_os = "macos", target_os = "windows", target_os = "linux"))] pub mod desktop_capturer; pub mod ice_candidate; diff --git a/libwebrtc/src/native/encoded_video_source.rs b/libwebrtc/src/native/encoded_video_source.rs new file mode 100644 index 000000000..c4fc9f1cc --- /dev/null +++ b/libwebrtc/src/native/encoded_video_source.rs @@ -0,0 +1,105 @@ +// Copyright 2025 LiveKit, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use cxx::SharedPtr; +use webrtc_sys::encoded_video_source as evs_sys; + +use crate::{ + encoded_video_source::{EncodedFrameInfo, KeyFrameRequestCallback, VideoCodecType}, + video_source::VideoResolution, +}; + +impl From for evs_sys::ffi::VideoCodecType { + fn from(codec: VideoCodecType) -> Self { + match codec { + VideoCodecType::VP8 => Self::VP8, + VideoCodecType::VP9 => Self::VP9, + VideoCodecType::AV1 => Self::AV1, + VideoCodecType::H264 => Self::H264, + VideoCodecType::H265 => Self::H265, + } + } +} + +impl From for VideoCodecType { + fn from(codec: evs_sys::ffi::VideoCodecType) -> Self { + match codec { + evs_sys::ffi::VideoCodecType::VP8 => Self::VP8, + evs_sys::ffi::VideoCodecType::VP9 => Self::VP9, + evs_sys::ffi::VideoCodecType::AV1 => Self::AV1, + evs_sys::ffi::VideoCodecType::H264 => Self::H264, + evs_sys::ffi::VideoCodecType::H265 => Self::H265, + _ => Self::H264, + } + } +} + +#[derive(Clone)] +pub struct NativeEncodedVideoSource { + sys_handle: SharedPtr, +} + +impl NativeEncodedVideoSource { + pub fn new(width: u32, height: u32, codec: VideoCodecType) -> Self { + Self { + sys_handle: evs_sys::ffi::new_encoded_video_track_source( + width, + height, + codec.into(), + ), + } + } + + pub fn sys_handle(&self) -> SharedPtr { + self.sys_handle.clone() + } + + pub fn capture_frame(&self, info: &EncodedFrameInfo) -> bool { + evs_sys::ffi::capture_encoded_frame( + 
&self.sys_handle, + &info.data, + info.capture_time_us, + info.rtp_timestamp, + info.width, + info.height, + info.is_keyframe, + info.has_sps_pps, + ) + } + + pub fn set_keyframe_request_callback(&self, callback: Arc) { + struct CallbackAdapter(Arc); + + impl evs_sys::KeyFrameRequestObserver for CallbackAdapter { + fn on_keyframe_request(&self) { + self.0.on_keyframe_request(); + } + } + + let wrapper = evs_sys::KeyFrameRequestObserverWrapper::new(Arc::new(CallbackAdapter( + callback, + ))); + self.sys_handle.set_keyframe_request_callback(Box::new(wrapper)); + } + + pub fn video_resolution(&self) -> VideoResolution { + self.sys_handle.video_resolution().into() + } + + pub fn codec_type(&self) -> VideoCodecType { + self.sys_handle.codec_type().into() + } +} diff --git a/libwebrtc/src/native/mod.rs b/libwebrtc/src/native/mod.rs index 183f5ab66..df1f33512 100644 --- a/libwebrtc/src/native/mod.rs +++ b/libwebrtc/src/native/mod.rs @@ -21,6 +21,7 @@ pub mod audio_source; pub mod audio_stream; pub mod audio_track; pub mod data_channel; +pub mod encoded_video_source; #[cfg(any(target_os = "macos", target_os = "windows", target_os = "linux"))] pub mod desktop_capturer; pub mod frame_cryptor; diff --git a/libwebrtc/src/native/peer_connection_factory.rs b/libwebrtc/src/native/peer_connection_factory.rs index 4edc63047..a275f78c5 100644 --- a/libwebrtc/src/native/peer_connection_factory.rs +++ b/libwebrtc/src/native/peer_connection_factory.rs @@ -22,6 +22,7 @@ use webrtc_sys::{peer_connection_factory as sys_pcf, rtc_error as sys_err, webrt use crate::{ audio_source::native::NativeAudioSource, audio_track::RtcAudioTrack, + encoded_video_source::native::NativeEncodedVideoSource, imp::{audio_track as imp_at, peer_connection as imp_pc, video_track as imp_vt}, peer_connection::PeerConnection, peer_connection_factory::RtcConfiguration, @@ -94,6 +95,21 @@ impl PeerConnectionFactory { } } + pub fn create_video_track_from_encoded_source( + &self, + label: &str, + source: 
NativeEncodedVideoSource, + ) -> RtcVideoTrack { + RtcVideoTrack { + handle: imp_vt::RtcVideoTrack { + sys_handle: self.sys_handle.create_video_track_from_encoded_source( + label.to_string(), + source.handle.sys_handle(), + ), + }, + } + } + pub fn get_rtp_sender_capabilities(&self, media_type: MediaType) -> RtpCapabilities { self.sys_handle.rtp_sender_capabilities(media_type.into()).into() } diff --git a/libwebrtc/src/peer_connection_factory.rs b/libwebrtc/src/peer_connection_factory.rs index 12f6d24bc..a664047e5 100644 --- a/libwebrtc/src/peer_connection_factory.rs +++ b/libwebrtc/src/peer_connection_factory.rs @@ -87,13 +87,21 @@ impl PeerConnectionFactory { pub mod native { use super::PeerConnectionFactory; use crate::{ - audio_source::native::NativeAudioSource, audio_track::RtcAudioTrack, - video_source::native::NativeVideoSource, video_track::RtcVideoTrack, + audio_source::native::NativeAudioSource, + audio_track::RtcAudioTrack, + encoded_video_source::native::NativeEncodedVideoSource, + video_source::native::NativeVideoSource, + video_track::RtcVideoTrack, }; pub trait PeerConnectionFactoryExt { fn create_video_track(&self, label: &str, source: NativeVideoSource) -> RtcVideoTrack; fn create_audio_track(&self, label: &str, source: NativeAudioSource) -> RtcAudioTrack; + fn create_video_track_from_encoded_source( + &self, + label: &str, + source: NativeEncodedVideoSource, + ) -> RtcVideoTrack; } impl PeerConnectionFactoryExt for PeerConnectionFactory { @@ -104,5 +112,14 @@ pub mod native { fn create_audio_track(&self, label: &str, source: NativeAudioSource) -> RtcAudioTrack { self.handle.create_audio_track(label, source) } + + fn create_video_track_from_encoded_source( + &self, + label: &str, + source: NativeEncodedVideoSource, + ) -> RtcVideoTrack { + self.handle + .create_video_track_from_encoded_source(label, source) + } } } diff --git a/libwebrtc/src/prelude.rs b/libwebrtc/src/prelude.rs index 2b61aa1ce..e51534a40 100644 --- a/libwebrtc/src/prelude.rs +++ 
b/libwebrtc/src/prelude.rs @@ -17,6 +17,7 @@ pub use crate::{ audio_source::{AudioSourceOptions, RtcAudioSource}, audio_track::RtcAudioTrack, data_channel::{DataBuffer, DataChannel, DataChannelError, DataChannelInit, DataChannelState}, + encoded_video_source::{EncodedFrameInfo, KeyFrameRequestCallback, VideoCodecType}, ice_candidate::IceCandidate, media_stream::MediaStream, media_stream_track::{MediaStreamTrack, RtcTrackState}, diff --git a/libwebrtc/src/video_source.rs b/libwebrtc/src/video_source.rs index d73491748..087d378dd 100644 --- a/libwebrtc/src/video_source.rs +++ b/libwebrtc/src/video_source.rs @@ -35,12 +35,14 @@ pub enum RtcVideoSource { // TODO(theomonnom): Web video sources (eq. to tracks on browsers?) #[cfg(not(target_arch = "wasm32"))] Native(native::NativeVideoSource), + #[cfg(not(target_arch = "wasm32"))] + Encoded(crate::encoded_video_source::native::NativeEncodedVideoSource), } // TODO(theomonnom): Support enum dispatch with conditional compilation? impl RtcVideoSource { enum_dispatch!( - [Native]; + [Native, Encoded]; pub fn video_resolution(self: &Self) -> VideoResolution; ); } diff --git a/livekit-ffi-node-bindings/src/proto/ffi_pb.ts b/livekit-ffi-node-bindings/src/proto/ffi_pb.ts index 844e39175..f0d3d4063 100644 --- a/livekit-ffi-node-bindings/src/proto/ffi_pb.ts +++ b/livekit-ffi-node-bindings/src/proto/ffi_pb.ts @@ -21,7 +21,7 @@ import type { BinaryReadOptions, FieldList, JsonReadOptions, JsonValue, PartialM import { Message, proto2 } from "@bufbuild/protobuf"; import { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, EditChatMessageRequest, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, 
PublishTranscriptionResponse, RoomEvent, SendChatMessageCallback, SendChatMessageRequest, SendChatMessageResponse, SendStreamChunkCallback, SendStreamChunkRequest, SendStreamChunkResponse, SendStreamHeaderCallback, SendStreamHeaderRequest, SendStreamHeaderResponse, SendStreamTrailerCallback, SendStreamTrailerRequest, SendStreamTrailerResponse, SetDataChannelBufferedAmountLowThresholdRequest, SetDataChannelBufferedAmountLowThresholdResponse, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js"; import { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, SetTrackSubscriptionPermissionsRequest, SetTrackSubscriptionPermissionsResponse, TrackEvent } from "./track_pb.js"; -import { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js"; +import { CaptureEncodedVideoFrameRequest, CaptureEncodedVideoFrameResponse, CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js"; import { ApmProcessReverseStreamRequest, ApmProcessReverseStreamResponse, ApmProcessStreamRequest, 
ApmProcessStreamResponse, ApmSetStreamDelayRequest, ApmSetStreamDelayResponse, AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, FlushSoxResamplerRequest, FlushSoxResamplerResponse, LoadAudioFilterPluginRequest, LoadAudioFilterPluginResponse, NewApmRequest, NewApmResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, NewSoxResamplerRequest, NewSoxResamplerResponse, PushSoxResamplerRequest, PushSoxResamplerResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; import { E2eeRequest, E2eeResponse } from "./e2ee_pb.js"; import { PerformRpcCallback, PerformRpcRequest, PerformRpcResponse, RegisterRpcMethodRequest, RegisterRpcMethodResponse, RpcMethodInvocationEvent, RpcMethodInvocationResponseRequest, RpcMethodInvocationResponseResponse, UnregisterRpcMethodRequest, UnregisterRpcMethodResponse } from "./rpc_pb.js"; @@ -226,6 +226,12 @@ export class FfiRequest extends Message { */ value: VideoStreamFromParticipantRequest; case: "videoStreamFromParticipant"; + } | { + /** + * @generated from field: livekit.proto.CaptureEncodedVideoFrameRequest capture_encoded_video_frame = 69; + */ + value: CaptureEncodedVideoFrameRequest; + case: "captureEncodedVideoFrame"; } | { /** * Audio @@ -532,6 +538,7 @@ export class FfiRequest extends Message { { no: 22, name: "capture_video_frame", kind: "message", T: CaptureVideoFrameRequest, oneof: "message" }, { no: 23, name: "video_convert", kind: "message", T: VideoConvertRequest, oneof: "message" }, { no: 24, name: "video_stream_from_participant", kind: "message", T: VideoStreamFromParticipantRequest, oneof: "message" }, + { no: 69, name: "capture_encoded_video_frame", kind: "message", T: CaptureEncodedVideoFrameRequest, oneof: "message" }, 
{ no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamRequest, oneof: "message" }, { no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceRequest, oneof: "message" }, { no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameRequest, oneof: "message" }, @@ -1019,6 +1026,12 @@ export class FfiResponse extends Message { */ value: SetRemoteTrackPublicationQualityResponse; case: "setRemoteTrackPublicationQuality"; + } | { + /** + * @generated from field: livekit.proto.CaptureEncodedVideoFrameResponse capture_encoded_video_frame = 68; + */ + value: CaptureEncodedVideoFrameResponse; + case: "captureEncodedVideoFrame"; } | { case: undefined; value?: undefined } = { case: undefined }; constructor(data?: PartialMessage) { @@ -1095,6 +1108,7 @@ export class FfiResponse extends Message { { no: 65, name: "text_stream_close", kind: "message", T: TextStreamWriterCloseResponse, oneof: "message" }, { no: 66, name: "send_bytes", kind: "message", T: StreamSendBytesResponse, oneof: "message" }, { no: 67, name: "set_remote_track_publication_quality", kind: "message", T: SetRemoteTrackPublicationQualityResponse, oneof: "message" }, + { no: 68, name: "capture_encoded_video_frame", kind: "message", T: CaptureEncodedVideoFrameResponse, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): FfiResponse { diff --git a/livekit-ffi-node-bindings/src/proto/video_frame_pb.ts b/livekit-ffi-node-bindings/src/proto/video_frame_pb.ts index 87f433b39..84a9220bc 100644 --- a/livekit-ffi-node-bindings/src/proto/video_frame_pb.ts +++ b/livekit-ffi-node-bindings/src/proto/video_frame_pb.ts @@ -203,10 +203,54 @@ export enum VideoSourceType { * @generated from enum value: VIDEO_SOURCE_NATIVE = 0; */ VIDEO_SOURCE_NATIVE = 0, + + /** + * @generated from enum value: VIDEO_SOURCE_ENCODED = 1; + */ + VIDEO_SOURCE_ENCODED = 1, } // Retrieve enum metadata with: proto2.getEnumType(VideoSourceType) proto2.util.setEnumType(VideoSourceType, 
"livekit.proto.VideoSourceType", [ { no: 0, name: "VIDEO_SOURCE_NATIVE" }, + { no: 1, name: "VIDEO_SOURCE_ENCODED" }, +]); + +/** + * @generated from enum livekit.proto.VideoCodecType + */ +export enum VideoCodecType { + /** + * @generated from enum value: CODEC_VP8 = 0; + */ + CODEC_VP8 = 0, + + /** + * @generated from enum value: CODEC_VP9 = 1; + */ + CODEC_VP9 = 1, + + /** + * @generated from enum value: CODEC_AV1 = 2; + */ + CODEC_AV1 = 2, + + /** + * @generated from enum value: CODEC_H264 = 3; + */ + CODEC_H264 = 3, + + /** + * @generated from enum value: CODEC_H265 = 4; + */ + CODEC_H265 = 4, +} +// Retrieve enum metadata with: proto2.getEnumType(VideoCodecType) +proto2.util.setEnumType(VideoCodecType, "livekit.proto.VideoCodecType", [ + { no: 0, name: "CODEC_VP8" }, + { no: 1, name: "CODEC_VP9" }, + { no: 2, name: "CODEC_AV1" }, + { no: 3, name: "CODEC_H264" }, + { no: 4, name: "CODEC_H265" }, ]); /** @@ -428,6 +472,13 @@ export class NewVideoSourceRequest extends Message { */ resolution?: VideoSourceResolution; + /** + * Required when type is VIDEO_SOURCE_ENCODED + * + * @generated from field: optional livekit.proto.VideoCodecType codec = 3; + */ + codec?: VideoCodecType; + constructor(data?: PartialMessage) { super(); proto2.util.initPartial(data, this); @@ -438,6 +489,7 @@ export class NewVideoSourceRequest extends Message { static readonly fields: FieldList = proto2.util.newFieldList(() => [ { no: 1, name: "type", kind: "enum", T: proto2.getEnumType(VideoSourceType), req: true }, { no: 2, name: "resolution", kind: "message", T: VideoSourceResolution, req: true }, + { no: 3, name: "codec", kind: "enum", T: proto2.getEnumType(VideoCodecType), opt: true }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): NewVideoSourceRequest { @@ -584,6 +636,118 @@ export class CaptureVideoFrameResponse extends Message { + /** + * @generated from field: required uint64 source_handle = 1; + */ + sourceHandle?: bigint; + + /** + * @generated from field: required 
bytes data = 2; + */ + data?: Uint8Array; + + /** + * @generated from field: required int64 capture_time_us = 3; + */ + captureTimeUs?: bigint; + + /** + * @generated from field: required uint32 rtp_timestamp = 4; + */ + rtpTimestamp?: number; + + /** + * @generated from field: required uint32 width = 5; + */ + width?: number; + + /** + * @generated from field: required uint32 height = 6; + */ + height?: number; + + /** + * @generated from field: required bool is_keyframe = 7; + */ + isKeyframe?: boolean; + + /** + * @generated from field: required bool has_sps_pps = 8; + */ + hasSpsPps?: boolean; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.CaptureEncodedVideoFrameRequest"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 2, name: "data", kind: "scalar", T: 12 /* ScalarType.BYTES */, req: true }, + { no: 3, name: "capture_time_us", kind: "scalar", T: 3 /* ScalarType.INT64 */, req: true }, + { no: 4, name: "rtp_timestamp", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 5, name: "width", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 6, name: "height", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 7, name: "is_keyframe", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, + { no: 8, name: "has_sps_pps", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): CaptureEncodedVideoFrameRequest { + return new CaptureEncodedVideoFrameRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): CaptureEncodedVideoFrameRequest { + return new CaptureEncodedVideoFrameRequest().fromJson(jsonValue, options); + } + + static 
fromJsonString(jsonString: string, options?: Partial): CaptureEncodedVideoFrameRequest { + return new CaptureEncodedVideoFrameRequest().fromJsonString(jsonString, options); + } + + static equals(a: CaptureEncodedVideoFrameRequest | PlainMessage | undefined, b: CaptureEncodedVideoFrameRequest | PlainMessage | undefined): boolean { + return proto2.util.equals(CaptureEncodedVideoFrameRequest, a, b); + } +} + +/** + * @generated from message livekit.proto.CaptureEncodedVideoFrameResponse + */ +export class CaptureEncodedVideoFrameResponse extends Message { + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.CaptureEncodedVideoFrameResponse"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): CaptureEncodedVideoFrameResponse { + return new CaptureEncodedVideoFrameResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): CaptureEncodedVideoFrameResponse { + return new CaptureEncodedVideoFrameResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): CaptureEncodedVideoFrameResponse { + return new CaptureEncodedVideoFrameResponse().fromJsonString(jsonString, options); + } + + static equals(a: CaptureEncodedVideoFrameResponse | PlainMessage | undefined, b: CaptureEncodedVideoFrameResponse | PlainMessage | undefined): boolean { + return proto2.util.equals(CaptureEncodedVideoFrameResponse, a, b); + } +} + /** * @generated from message livekit.proto.VideoConvertRequest */ diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 143e0c96b..fc4fa910c 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -87,6 +87,7 @@ message FfiRequest { CaptureVideoFrameRequest capture_video_frame = 22; 
VideoConvertRequest video_convert = 23; VideoStreamFromParticipantRequest video_stream_from_participant = 24; + CaptureEncodedVideoFrameRequest capture_encoded_video_frame = 69; // Audio NewAudioStreamRequest new_audio_stream = 25; @@ -152,7 +153,7 @@ message FfiRequest { SetRemoteTrackPublicationQualityRequest set_remote_track_publication_quality = 68; - // NEXT_ID: 69 + // NEXT_ID: 70 } } @@ -250,8 +251,9 @@ message FfiResponse { StreamSendBytesResponse send_bytes = 66; SetRemoteTrackPublicationQualityResponse set_remote_track_publication_quality = 67; + CaptureEncodedVideoFrameResponse capture_encoded_video_frame = 68; - // NEXT_ID: 68 + // NEXT_ID: 69 } } diff --git a/livekit-ffi/protocol/video_frame.proto b/livekit-ffi/protocol/video_frame.proto index d85c5ca98..12d8df836 100644 --- a/livekit-ffi/protocol/video_frame.proto +++ b/livekit-ffi/protocol/video_frame.proto @@ -49,6 +49,8 @@ message NewVideoSourceRequest { // Used to determine which encodings to use + simulcast layers // Most of the time it corresponds to the source resolution required VideoSourceResolution resolution = 2; + // Required when type is VIDEO_SOURCE_ENCODED + optional VideoCodecType codec = 3; } message NewVideoSourceResponse { required OwnedVideoSource source = 1; } @@ -62,6 +64,19 @@ message CaptureVideoFrameRequest { message CaptureVideoFrameResponse {} +// Push a pre-encoded frame to a VideoSource (type must be VIDEO_SOURCE_ENCODED) +message CaptureEncodedVideoFrameRequest { + required uint64 source_handle = 1; + required bytes data = 2; + required int64 capture_time_us = 3; + required uint32 rtp_timestamp = 4; + required uint32 width = 5; + required uint32 height = 6; + required bool is_keyframe = 7; + required bool has_sps_pps = 8; +} +message CaptureEncodedVideoFrameResponse {} + message VideoConvertRequest { optional bool flip_y = 1; required VideoBufferInfo buffer = 2; @@ -180,6 +195,15 @@ message VideoSourceResolution { enum VideoSourceType { VIDEO_SOURCE_NATIVE = 0; + 
VIDEO_SOURCE_ENCODED = 1; +} + +enum VideoCodecType { + CODEC_VP8 = 0; + CODEC_VP9 = 1; + CODEC_AV1 = 2; + CODEC_H264 = 3; + CODEC_H265 = 4; } message VideoSourceInfo { diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index a6251e1cd..aef4beb1f 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -469,6 +469,15 @@ unsafe fn on_capture_video_frame( Ok(proto::CaptureVideoFrameResponse::default()) } +fn on_capture_encoded_video_frame( + server: &'static FfiServer, + push: proto::CaptureEncodedVideoFrameRequest, +) -> FfiResult { + let source = server.retrieve_handle::(push.source_handle)?; + source.capture_encoded_frame(server, push)?; + Ok(proto::CaptureEncodedVideoFrameResponse::default()) +} + /// Convert a video frame /// /// # Safety: The user must ensure that the pointers/len provided are valid @@ -1227,6 +1236,9 @@ pub fn handle_request( } Request::NewVideoSource(req) => on_new_video_source(server, req)?.into(), Request::CaptureVideoFrame(req) => unsafe { on_capture_video_frame(server, req)?.into() }, + Request::CaptureEncodedVideoFrame(req) => { + on_capture_encoded_video_frame(server, req)?.into() + } Request::VideoConvert(req) => unsafe { on_video_convert(server, req)?.into() }, Request::NewAudioStream(req) => on_new_audio_stream(server, req)?.into(), Request::NewAudioSource(req) => on_new_audio_source(server, req)?.into(), diff --git a/livekit-ffi/src/server/video_source.rs b/livekit-ffi/src/server/video_source.rs index 5af7d9a38..23d5f46da 100644 --- a/livekit-ffi/src/server/video_source.rs +++ b/livekit-ffi/src/server/video_source.rs @@ -24,6 +24,16 @@ pub struct FfiVideoSource { impl FfiHandle for FfiVideoSource {} +fn proto_codec_to_libwebrtc(codec: proto::VideoCodecType) -> VideoCodecType { + match codec { + proto::VideoCodecType::CodecVp8 => VideoCodecType::VP8, + proto::VideoCodecType::CodecVp9 => VideoCodecType::VP9, + proto::VideoCodecType::CodecAv1 => VideoCodecType::AV1, + 
proto::VideoCodecType::CodecH264 => VideoCodecType::H264, + proto::VideoCodecType::CodecH265 => VideoCodecType::H265, + } +} + impl FfiVideoSource { pub fn setup( server: &'static server::FfiServer, @@ -39,6 +49,20 @@ impl FfiVideoSource { let video_source = NativeVideoSource::new(new_source.resolution.into()); RtcVideoSource::Native(video_source) } + #[cfg(not(target_arch = "wasm32"))] + proto::VideoSourceType::VideoSourceEncoded => { + use livekit::webrtc::encoded_video_source::native::NativeEncodedVideoSource; + + let proto_codec = new_source + .codec + .and_then(|c| proto::VideoCodecType::try_from(c).ok()) + .unwrap_or(proto::VideoCodecType::CodecH264); + let codec = proto_codec_to_libwebrtc(proto_codec); + let res = new_source.resolution; + let video_source = + NativeEncodedVideoSource::new(res.width, res.height, codec); + RtcVideoSource::Encoded(video_source) + } _ => return Err(FfiError::InvalidRequest("unsupported video source type".into())), }; @@ -74,4 +98,34 @@ impl FfiVideoSource { } Ok(()) } + + pub fn capture_encoded_frame( + &self, + _server: &'static server::FfiServer, + capture: proto::CaptureEncodedVideoFrameRequest, + ) -> FfiResult<()> { + match self.source { + #[cfg(not(target_arch = "wasm32"))] + RtcVideoSource::Encoded(ref source) => { + use livekit::webrtc::encoded_video_source::EncodedFrameInfo; + + let info = EncodedFrameInfo { + data: capture.data, + capture_time_us: capture.capture_time_us, + rtp_timestamp: capture.rtp_timestamp, + width: capture.width, + height: capture.height, + is_keyframe: capture.is_keyframe, + has_sps_pps: capture.has_sps_pps, + }; + source.capture_frame(&info); + } + _ => { + return Err(FfiError::InvalidRequest( + "capture_encoded_frame called on non-encoded source".into(), + )); + } + } + Ok(()) + } } diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index b9d955628..20ae36f61 100644 --- a/livekit/src/room/participant/local_participant.rs +++ 
b/livekit/src/room/participant/local_participant.rs @@ -39,7 +39,10 @@ use crate::{ ChatMessage, DataPacket, RoomSession, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription, }; use chrono::Utc; -use libwebrtc::{native::create_random_uuid, rtp_parameters::RtpEncodingParameters}; +use libwebrtc::{ + native::create_random_uuid, rtp_parameters::RtpEncodingParameters, + video_source::RtcVideoSource, +}; use livekit_api::signal_client::SignalError; use livekit_protocol as proto; use livekit_runtime::timeout; @@ -252,8 +255,27 @@ impl LocalParticipant { pub async fn publish_track( &self, track: LocalTrack, - options: TrackPublishOptions, + mut options: TrackPublishOptions, ) -> RoomResult { + // When publishing an encoded source, force codec and disable simulcast + #[cfg(not(target_arch = "wasm32"))] + if let LocalTrack::Video(ref video_track) = track { + let source = video_track.rtc_source(); + if let RtcVideoSource::Encoded(ref encoded_source) = source { + use crate::options::VideoCodec; + use libwebrtc::encoded_video_source::VideoCodecType; + + options.simulcast = false; + options.video_codec = match encoded_source.codec_type() { + VideoCodecType::VP8 => VideoCodec::VP8, + VideoCodecType::VP9 => VideoCodec::VP9, + VideoCodecType::AV1 => VideoCodec::AV1, + VideoCodecType::H264 => VideoCodec::H264, + VideoCodecType::H265 => VideoCodec::H265, + }; + } + } + let disable_red = self.local.encryption_type != EncryptionType::None || !options.red; let mut req = proto::AddTrackRequest { diff --git a/livekit/src/room/track/local_video_track.rs b/livekit/src/room/track/local_video_track.rs index c7c26649b..a32c0869b 100644 --- a/livekit/src/room/track/local_video_track.rs +++ b/livekit/src/room/track/local_video_track.rs @@ -58,6 +58,16 @@ impl LocalVideoTrack { .pc_factory() .create_video_track(&libwebrtc::native::create_random_uuid(), native_source) } + #[cfg(not(target_arch = "wasm32"))] + RtcVideoSource::Encoded(encoded_source) => { + use 
libwebrtc::peer_connection_factory::native::PeerConnectionFactoryExt; + LkRuntime::instance() + .pc_factory() + .create_video_track_from_encoded_source( + &libwebrtc::native::create_random_uuid(), + encoded_source, + ) + } _ => panic!("unsupported video source"), }; diff --git a/webrtc-sys/build.rs b/webrtc-sys/build.rs index 67f77f7c1..c3ba58c8f 100644 --- a/webrtc-sys/build.rs +++ b/webrtc-sys/build.rs @@ -35,6 +35,7 @@ fn main() { "src/media_stream_track.rs", "src/audio_track.rs", "src/video_track.rs", + "src/encoded_video_source.rs", "src/data_channel.rs", "src/frame_cryptor.rs", "src/jsep.rs", @@ -84,6 +85,8 @@ fn main() { "src/video_decoder_factory.cpp", "src/audio_device.cpp", "src/audio_resampler.cpp", + "src/encoded_video_source.cpp", + "src/passthrough_video_encoder.cpp", "src/frame_cryptor.cpp", "src/global_task_queue.cpp", "src/prohibit_libsrtp_initialization.cpp", diff --git a/webrtc-sys/include/livekit/encoded_video_source.h b/webrtc-sys/include/livekit/encoded_video_source.h new file mode 100644 index 000000000..b951b0238 --- /dev/null +++ b/webrtc-sys/include/livekit/encoded_video_source.h @@ -0,0 +1,138 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include + +#include "api/video/video_frame.h" +#include "api/video/i420_buffer.h" +#include "livekit/video_track.h" +#include "media/base/adapted_video_track_source.h" +#include "rtc_base/synchronization/mutex.h" +#include "rtc_base/timestamp_aligner.h" +#include "rust/cxx.h" + +namespace livekit_ffi { + +class EncodedVideoTrackSource; +class KeyFrameRequestObserverWrapper; +} // namespace livekit_ffi +#include "webrtc-sys/src/encoded_video_source.rs.h" + +namespace livekit_ffi { + +/// Holds a single queued encoded frame payload. +struct EncodedFrameData { + std::vector data; + int64_t capture_time_us; + uint32_t rtp_timestamp; + uint32_t width; + uint32_t height; + bool is_keyframe; + bool has_sps_pps; +}; + +/// A video track source that accepts pre-encoded frames. +/// +/// When `capture_encoded_frame()` is called the encoded payload is queued and +/// a tiny 1×1 dummy raw frame is pushed through the normal +/// `AdaptedVideoTrackSource::OnFrame()` path so that WebRTC's encoding +/// pipeline fires. The paired `PassthroughVideoEncoder` pulls the queued +/// data out of this source instead of actually encoding. +class EncodedVideoTrackSource { + class InternalSource : public webrtc::AdaptedVideoTrackSource { + public: + InternalSource(const VideoResolution& resolution); + ~InternalSource() override; + + bool is_screencast() const override; + std::optional needs_denoising() const override; + SourceState state() const override; + bool remote() const override; + VideoResolution video_resolution() const; + + /// Enqueue an encoded frame and trigger the encode pipeline. + bool capture_encoded_frame(rust::Slice data, + int64_t capture_time_us, + uint32_t rtp_timestamp, + uint32_t width, + uint32_t height, + bool is_keyframe, + bool has_sps_pps); + + /// Called by PassthroughVideoEncoder::Encode() to retrieve the next + /// queued encoded payload. 
+ std::optional dequeue_frame(); + + /// Set by the encoder when WebRTC requests a keyframe. + void request_keyframe(); + bool consume_keyframe_request(); + + private: + mutable webrtc::Mutex mutex_; + webrtc::TimestampAligner timestamp_aligner_; + VideoResolution resolution_; + std::queue frame_queue_ RTC_GUARDED_BY(mutex_); + std::atomic keyframe_requested_{false}; + webrtc::scoped_refptr dummy_buffer_; + }; + + public: + EncodedVideoTrackSource(const VideoResolution& resolution, + VideoCodecType codec); + + VideoResolution video_resolution() const; + VideoCodecType codec_type() const; + + void set_keyframe_request_callback( + rust::Box observer) const; + + webrtc::scoped_refptr get() const; + + private: + webrtc::scoped_refptr source_; + VideoCodecType codec_; + mutable webrtc::Mutex cb_mutex_; + mutable std::unique_ptr> + keyframe_observer_ RTC_GUARDED_BY(cb_mutex_); + + friend class PassthroughVideoEncoder; +}; + +std::shared_ptr new_encoded_video_track_source( + uint32_t width, + uint32_t height, + VideoCodecType codec); + +/// Free function bridge for CXX -- delegates to InternalSource +bool capture_encoded_frame(const EncodedVideoTrackSource& source, + rust::Slice data, + int64_t capture_time_us, + uint32_t rtp_timestamp, + uint32_t width, + uint32_t height, + bool is_keyframe, + bool has_sps_pps); + +static std::shared_ptr +_shared_encoded_video_track_source() { + return nullptr; // Ignore -- needed for CXX SharedPtr codegen +} + +} // namespace livekit_ffi diff --git a/webrtc-sys/include/livekit/passthrough_video_encoder.h b/webrtc-sys/include/livekit/passthrough_video_encoder.h new file mode 100644 index 000000000..7fbe31bd0 --- /dev/null +++ b/webrtc-sys/include/livekit/passthrough_video_encoder.h @@ -0,0 +1,90 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include "api/video_codecs/video_encoder.h"
+#include "api/video_codecs/video_encoder_factory.h"
+#include "livekit/encoded_video_source.h"
+
+namespace livekit_ffi {
+
+/// A video encoder that passes through pre-encoded frame data.
+///
+/// Instead of actually encoding the incoming raw frame, it pulls the
+/// next queued `EncodedFrameData` from the associated
+/// `EncodedVideoTrackSource` and delivers it to the WebRTC RTP pipeline
+/// via `EncodedImageCallback::OnEncodedImage()`.
+class PassthroughVideoEncoder : public webrtc::VideoEncoder {
+ public:
+  explicit PassthroughVideoEncoder(
+      std::shared_ptr<EncodedVideoTrackSource> source);
+  ~PassthroughVideoEncoder() override = default;
+
+  int32_t InitEncode(const webrtc::VideoCodec* codec_settings,
+                     const webrtc::VideoEncoder::Settings& settings) override;
+
+  int32_t RegisterEncodeCompleteCallback(
+      webrtc::EncodedImageCallback* callback) override;
+
+  int32_t Release() override;
+
+  int32_t Encode(
+      const webrtc::VideoFrame& frame,
+      const std::vector<webrtc::VideoFrameType>* frame_types) override;
+
+  void SetRates(const RateControlParameters& parameters) override;
+
+  EncoderInfo GetEncoderInfo() const override;
+
+ private:
+  std::shared_ptr<EncodedVideoTrackSource> source_;
+  webrtc::EncodedImageCallback* callback_ = nullptr;
+  webrtc::VideoCodec codec_;
+  bool sending_ = false;
+};
+
+/// Global registry that maps source pointers to their shared_ptr so the
+/// encoder factory can look them up when creating an encoder.
+class EncodedSourceRegistry {
+ public:
+  static EncodedSourceRegistry& instance();
+
+  void register_source(
+      const webrtc::VideoTrackSourceInterface* key,
+      std::shared_ptr<EncodedVideoTrackSource> source);
+  void unregister_source(const webrtc::VideoTrackSourceInterface* key);
+
+  std::shared_ptr<EncodedVideoTrackSource> find(
+      const webrtc::VideoTrackSourceInterface* key) const;
+
+  /// Find a registered encoded source whose codec matches the given SDP
+  /// codec name (e.g. "H264", "VP8"). Returns the first match or nullptr.
+  std::shared_ptr<EncodedVideoTrackSource> find_by_codec_name(
+      const std::string& codec_name) const;
+
+ private:
+  mutable std::mutex mutex_;
+  std::unordered_map<const webrtc::VideoTrackSourceInterface*,
+                     std::shared_ptr<EncodedVideoTrackSource>>
+      sources_;
+};
+
+}  // namespace livekit_ffi
diff --git a/webrtc-sys/include/livekit/peer_connection_factory.h b/webrtc-sys/include/livekit/peer_connection_factory.h
index 2af3589dd..fdde89667 100644
--- a/webrtc-sys/include/livekit/peer_connection_factory.h
+++ b/webrtc-sys/include/livekit/peer_connection_factory.h
@@ -20,6 +20,7 @@
 #include "api/scoped_refptr.h"
 #include "api/task_queue/task_queue_factory.h"
 #include "livekit/audio_device.h"
+#include "livekit/encoded_video_source.h"
 #include "media_stream.h"
 #include "rtp_parameters.h"
 #include "rust/cxx.h"
@@ -52,6 +53,10 @@ class PeerConnectionFactory {
       rust::String label,
       std::shared_ptr<VideoTrackSource> source) const;
 
+  std::shared_ptr<MediaStreamTrack> create_video_track_from_encoded_source(
+      rust::String label,
+      std::shared_ptr<EncodedVideoTrackSource> source) const;
+
   std::shared_ptr<MediaStreamTrack> create_audio_track(
       rust::String label,
       std::shared_ptr<AudioTrackSource> source) const;
diff --git a/webrtc-sys/src/encoded_video_source.cpp b/webrtc-sys/src/encoded_video_source.cpp
new file mode 100644
index 000000000..75ac9de76
--- /dev/null
+++ b/webrtc-sys/src/encoded_video_source.cpp
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2025 LiveKit, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "livekit/encoded_video_source.h"
+
+#include "api/video/i420_buffer.h"
+#include "livekit/passthrough_video_encoder.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/time_utils.h"
+
+namespace livekit_ffi {
+
+// ---------- InternalSource ----------
+
+EncodedVideoTrackSource::InternalSource::InternalSource(
+    const VideoResolution& resolution)
+    : webrtc::AdaptedVideoTrackSource(4), resolution_(resolution) {
+  // Create a 2x2 dummy I420 buffer (minimum valid size for WebRTC)
+  dummy_buffer_ = webrtc::I420Buffer::Create(2, 2);
+  // Fill with black
+  webrtc::I420Buffer::SetBlack(dummy_buffer_.get());
+}
+
+EncodedVideoTrackSource::InternalSource::~InternalSource() {}
+
+bool EncodedVideoTrackSource::InternalSource::is_screencast() const {
+  return false;
+}
+
+std::optional<bool>
+EncodedVideoTrackSource::InternalSource::needs_denoising() const {
+  return false;
+}
+
+webrtc::MediaSourceInterface::SourceState
+EncodedVideoTrackSource::InternalSource::state() const {
+  return SourceState::kLive;
+}
+
+bool EncodedVideoTrackSource::InternalSource::remote() const {
+  return false;
+}
+
+VideoResolution
+EncodedVideoTrackSource::InternalSource::video_resolution() const {
+  webrtc::MutexLock lock(&mutex_);
+  return resolution_;
+}
+
+bool EncodedVideoTrackSource::InternalSource::capture_encoded_frame(
+    rust::Slice<const uint8_t> data,
+    int64_t capture_time_us,
+    uint32_t rtp_timestamp,
+    uint32_t width,
+    uint32_t height,
+    bool is_keyframe,
+    bool has_sps_pps) {
+  // Enqueue the encoded data
+  {
+    webrtc::MutexLock lock(&mutex_);
+    EncodedFrameData frame;
+    frame.data.assign(data.data(), data.data() + data.size());
+    frame.capture_time_us = capture_time_us;
+    frame.rtp_timestamp = rtp_timestamp;
+    frame.width = width;
+    frame.height = height;
+    frame.is_keyframe = is_keyframe;
+    frame.has_sps_pps = has_sps_pps;
+    frame_queue_.push(std::move(frame));
+  }
+
+  // Push a dummy frame to trigger the WebRTC encode pipeline.
+  // The PassthroughVideoEncoder will pull the real data from our queue.
+  int64_t ts_us = capture_time_us;
+  if (ts_us == 0) {
+    ts_us = webrtc::TimeMicros();
+  }
+
+  OnFrame(webrtc::VideoFrame::Builder()
+              .set_video_frame_buffer(dummy_buffer_)
+              .set_rotation(webrtc::kVideoRotation_0)
+              .set_timestamp_us(ts_us)
+              .build());
+
+  return true;
+}
+
+std::optional<EncodedFrameData>
+EncodedVideoTrackSource::InternalSource::dequeue_frame() {
+  webrtc::MutexLock lock(&mutex_);
+  if (frame_queue_.empty()) {
+    return std::nullopt;
+  }
+  EncodedFrameData frame = std::move(frame_queue_.front());
+  frame_queue_.pop();
+  return frame;
+}
+
+void EncodedVideoTrackSource::InternalSource::request_keyframe() {
+  keyframe_requested_.store(true, std::memory_order_release);
+}
+
+bool EncodedVideoTrackSource::InternalSource::consume_keyframe_request() {
+  return keyframe_requested_.exchange(false, std::memory_order_acq_rel);
+}
+
+// ---------- EncodedVideoTrackSource ----------
+
+EncodedVideoTrackSource::EncodedVideoTrackSource(
+    const VideoResolution& resolution,
+    VideoCodecType codec)
+    : codec_(codec) {
+  source_ = webrtc::make_ref_counted<InternalSource>(resolution);
+}
+
+VideoResolution EncodedVideoTrackSource::video_resolution() const {
+  return source_->video_resolution();
+}
+
+VideoCodecType EncodedVideoTrackSource::codec_type() const {
+  return codec_;
+}
+
+void EncodedVideoTrackSource::set_keyframe_request_callback(
+    rust::Box<KeyFrameRequestObserverWrapper> observer) const {
+  webrtc::MutexLock lock(&cb_mutex_);
+  keyframe_observer_ =
+      std::make_unique<rust::Box<KeyFrameRequestObserverWrapper>>(
+          std::move(observer));
+}
+
+webrtc::scoped_refptr<EncodedVideoTrackSource::InternalSource>
+EncodedVideoTrackSource::get() const {
+  return source_;
+}
+
+std::shared_ptr<EncodedVideoTrackSource> new_encoded_video_track_source(
+    uint32_t width,
+    uint32_t height,
+    VideoCodecType codec) {
+  VideoResolution res{width, height};
+  auto source =
+      std::make_shared<EncodedVideoTrackSource>(res, codec);
+
+  // Register in the global registry so the encoder factory can find it
+  EncodedSourceRegistry::instance().register_source(
+      source->get().get(), source);
+
+  return source;
+}
+
+bool capture_encoded_frame(const EncodedVideoTrackSource& source,
+                           rust::Slice<const uint8_t> data,
+                           int64_t capture_time_us,
+                           uint32_t rtp_timestamp,
+                           uint32_t width,
+                           uint32_t height,
+                           bool is_keyframe,
+                           bool has_sps_pps) {
+  return source.get()->capture_encoded_frame(
+      data, capture_time_us, rtp_timestamp, width, height, is_keyframe,
+      has_sps_pps);
+}
+
+}  // namespace livekit_ffi
diff --git a/webrtc-sys/src/encoded_video_source.rs b/webrtc-sys/src/encoded_video_source.rs
new file mode 100644
index 000000000..854ee9766
--- /dev/null
+++ b/webrtc-sys/src/encoded_video_source.rs
@@ -0,0 +1,98 @@
+// Copyright 2025 LiveKit, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use crate::impl_thread_safety;
+
+#[cxx::bridge(namespace = "livekit_ffi")]
+pub mod ffi {
+    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+    #[repr(i32)]
+    pub enum VideoCodecType {
+        VP8 = 1,
+        VP9 = 2,
+        AV1 = 3,
+        H264 = 4,
+        H265 = 5,
+    }
+
+    extern "C++" {
+        include!("livekit/video_track.h");
+
+        type VideoResolution = crate::video_track::ffi::VideoResolution;
+    }
+
+    unsafe extern "C++" {
+        include!("livekit/encoded_video_source.h");
+
+        type EncodedVideoTrackSource;
+
+        fn video_resolution(self: &EncodedVideoTrackSource) -> VideoResolution;
+        fn codec_type(self: &EncodedVideoTrackSource) -> VideoCodecType;
+
+        fn capture_encoded_frame(
+            source: &EncodedVideoTrackSource,
+            data: &[u8],
+            capture_time_us: i64,
+            rtp_timestamp: u32,
+            width: u32,
+            height: u32,
+            is_keyframe: bool,
+            has_sps_pps: bool,
+        ) -> bool;
+
+        fn set_keyframe_request_callback(
+            self: &EncodedVideoTrackSource,
+            observer: Box<KeyFrameRequestObserverWrapper>,
+        );
+
+        fn new_encoded_video_track_source(
+            width: u32,
+            height: u32,
+            codec: VideoCodecType,
+        ) -> SharedPtr<EncodedVideoTrackSource>;
+
+        fn _shared_encoded_video_track_source() -> SharedPtr<EncodedVideoTrackSource>;
+    }
+
+    extern "Rust" {
+        type KeyFrameRequestObserverWrapper;
+
+        fn on_keyframe_request(self: &KeyFrameRequestObserverWrapper);
+    }
+}
+
+// Ensure CXX generates SharedPtr support
+// (the dummy function in C++ returns nullptr)
+
+impl_thread_safety!(ffi::EncodedVideoTrackSource, Send + Sync);
+
+pub trait KeyFrameRequestObserver: Send + Sync {
+    fn on_keyframe_request(&self);
+}
+
+pub struct KeyFrameRequestObserverWrapper {
+    observer: Arc<dyn KeyFrameRequestObserver>,
+}
+
+impl KeyFrameRequestObserverWrapper {
+    pub fn new(observer: Arc<dyn KeyFrameRequestObserver>) -> Self {
+        Self { observer }
+    }
+
+    fn on_keyframe_request(&self) {
+        self.observer.on_keyframe_request();
+    }
+}
diff --git a/webrtc-sys/src/lib.rs b/webrtc-sys/src/lib.rs
index fde63e14a..7b88e0ef3 100644
--- a/webrtc-sys/src/lib.rs
+++ b/webrtc-sys/src/lib.rs
@@ -20,6 +20,7 @@ pub mod audio_resampler;
 pub mod audio_track;
 pub mod candidate;
 pub mod data_channel;
+pub mod encoded_video_source;
 #[cfg(any(target_os = "macos", target_os = "windows", target_os = "linux"))]
 pub mod desktop_capturer;
 pub mod frame_cryptor;
diff --git a/webrtc-sys/src/passthrough_video_encoder.cpp b/webrtc-sys/src/passthrough_video_encoder.cpp
new file mode 100644
index 000000000..3a6800821
--- /dev/null
+++ b/webrtc-sys/src/passthrough_video_encoder.cpp
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2025 LiveKit, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "livekit/passthrough_video_encoder.h"
+
+#include "api/video/encoded_image.h"
+#include "api/video/video_codec_type.h"
+#include "api/video_codecs/video_codec.h"
+#include "modules/video_coding/include/video_codec_interface.h"
+#include "modules/video_coding/include/video_error_codes.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/time_utils.h"
+
+namespace livekit_ffi {
+
+// ---------- PassthroughVideoEncoder ----------
+
+PassthroughVideoEncoder::PassthroughVideoEncoder(
+    std::shared_ptr<EncodedVideoTrackSource> source)
+    : source_(std::move(source)) {}
+
+int32_t PassthroughVideoEncoder::InitEncode(
+    const webrtc::VideoCodec* codec_settings,
+    const webrtc::VideoEncoder::Settings& settings) {
+  if (!codec_settings) {
+    return WEBRTC_VIDEO_CODEC_ERR_PARAMETER;
+  }
+  codec_ = *codec_settings;
+  sending_ = true;
+  RTC_LOG(LS_INFO) << "PassthroughVideoEncoder::InitEncode "
+                   << codec_.width << "x" << codec_.height;
+  return WEBRTC_VIDEO_CODEC_OK;
+}
+
+int32_t PassthroughVideoEncoder::RegisterEncodeCompleteCallback(
+    webrtc::EncodedImageCallback* callback) {
+  callback_ = callback;
+  return WEBRTC_VIDEO_CODEC_OK;
+}
+
+int32_t PassthroughVideoEncoder::Release() {
+  callback_ = nullptr;
+  sending_ = false;
+  return WEBRTC_VIDEO_CODEC_OK;
+}
+
+int32_t PassthroughVideoEncoder::Encode(
+    const webrtc::VideoFrame& frame,
+    const std::vector<webrtc::VideoFrameType>* frame_types) {
+  if (!callback_) {
+    return WEBRTC_VIDEO_CODEC_UNINITIALIZED;
+  }
+  if (!sending_) {
+    return WEBRTC_VIDEO_CODEC_NO_OUTPUT;
+  }
+
+  // Check if WebRTC is requesting a keyframe
+  bool keyframe_requested = false;
+  if (frame_types) {
+    for (auto ft : *frame_types) {
+      if (ft == webrtc::VideoFrameType::kVideoFrameKey) {
+        keyframe_requested = true;
+        break;
+      }
+    }
+  }
+
+  // Signal the keyframe request to the Rust side
+  if (keyframe_requested) {
+    source_->source_->request_keyframe();
+    // Also invoke the Rust callback if set
+    webrtc::MutexLock lock(&source_->cb_mutex_);
+    if (source_->keyframe_observer_) {
+      (*source_->keyframe_observer_)->on_keyframe_request();
+    }
+  }
+
+  // Pull the queued encoded frame
+  auto encoded = source_->source_->dequeue_frame();
+  if (!encoded.has_value()) {
+    return WEBRTC_VIDEO_CODEC_NO_OUTPUT;
+  }
+
+  const auto& data = encoded.value();
+
+  // Build the EncodedImage
+  webrtc::EncodedImage encoded_image;
+  encoded_image.SetEncodedData(
+      webrtc::EncodedImageBuffer::Create(data.data.data(), data.data.size()));
+  encoded_image.set_size(data.data.size());
+  encoded_image._encodedWidth = data.width;
+  encoded_image._encodedHeight = data.height;
+
+  // RTP timestamp: if the caller provided one use it, otherwise inherit the
+  // timestamp that the WebRTC pipeline assigned to the dummy VideoFrame.
+  // This is critical — without a proper, monotonically increasing RTP
+  // timestamp the remote jitter buffer cannot order frames and will not
+  // render anything.
+  if (data.rtp_timestamp != 0) {
+    encoded_image.SetRtpTimestamp(data.rtp_timestamp);
+  } else {
+    encoded_image.SetRtpTimestamp(frame.rtp_timestamp());
+  }
+
+  // Timing fields — mirror what hardware encoders (NVIDIA, VAAPI) set from
+  // the incoming VideoFrame so the RTP sender and remote jitter buffer see
+  // consistent, monotonically-increasing times.
+  encoded_image.ntp_time_ms_ = frame.ntp_time_ms();
+  encoded_image.capture_time_ms_ = frame.render_time_ms();
+  encoded_image.rotation_ = webrtc::kVideoRotation_0;
+  encoded_image.content_type_ = webrtc::VideoContentType::UNSPECIFIED;
+  encoded_image.timing_.flags = webrtc::VideoSendTiming::kInvalid;
+  encoded_image._frameType = data.is_keyframe
+                                 ? webrtc::VideoFrameType::kVideoFrameKey
+                                 : webrtc::VideoFrameType::kVideoFrameDelta;
+  encoded_image.SetSimulcastIndex(0);
+
+  // Build codec-specific info
+  webrtc::CodecSpecificInfo codec_info;
+  switch (source_->codec_type()) {
+    case VideoCodecType::H264:
+      codec_info.codecType = webrtc::kVideoCodecH264;
+      codec_info.codecSpecific.H264.packetization_mode =
+          webrtc::H264PacketizationMode::NonInterleaved;
+      break;
+    case VideoCodecType::VP8:
+      codec_info.codecType = webrtc::kVideoCodecVP8;
+      break;
+    case VideoCodecType::VP9:
+      codec_info.codecType = webrtc::kVideoCodecVP9;
+      break;
+    case VideoCodecType::AV1:
+      codec_info.codecType = webrtc::kVideoCodecAV1;
+      break;
+    case VideoCodecType::H265:
+      codec_info.codecType = webrtc::kVideoCodecH265;
+      break;
+    default:
+      codec_info.codecType = webrtc::kVideoCodecH264;
+      break;
+  }
+
+  auto result = callback_->OnEncodedImage(encoded_image, &codec_info);
+  if (result.error != webrtc::EncodedImageCallback::Result::OK) {
+    RTC_LOG(LS_ERROR) << "PassthroughVideoEncoder: OnEncodedImage failed: "
+                      << result.error;
+    return WEBRTC_VIDEO_CODEC_ERROR;
+  }
+
+  return WEBRTC_VIDEO_CODEC_OK;
+}
+
+void PassthroughVideoEncoder::SetRates(
+    const RateControlParameters& parameters) {
+  // Passthrough encoder doesn't control bitrate -- that's handled externally.
+  if (parameters.bitrate.get_sum_bps() == 0) {
+    sending_ = false;
+  } else {
+    sending_ = true;
+  }
+}
+
+webrtc::VideoEncoder::EncoderInfo
+PassthroughVideoEncoder::GetEncoderInfo() const {
+  EncoderInfo info;
+  info.supports_native_handle = false;
+  info.implementation_name = "PassthroughVideoEncoder";
+  info.scaling_settings = webrtc::VideoEncoder::ScalingSettings::kOff;
+  info.is_hardware_accelerated = false;
+  info.supports_simulcast = false;
+  info.preferred_pixel_formats = {webrtc::VideoFrameBuffer::Type::kI420};
+  return info;
+}
+
+// ---------- EncodedSourceRegistry ----------
+
+EncodedSourceRegistry& EncodedSourceRegistry::instance() {
+  static EncodedSourceRegistry registry;
+  return registry;
+}
+
+void EncodedSourceRegistry::register_source(
+    const webrtc::VideoTrackSourceInterface* key,
+    std::shared_ptr<EncodedVideoTrackSource> source) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  sources_[key] = std::move(source);
+}
+
+void EncodedSourceRegistry::unregister_source(
+    const webrtc::VideoTrackSourceInterface* key) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  sources_.erase(key);
+}
+
+std::shared_ptr<EncodedVideoTrackSource> EncodedSourceRegistry::find(
+    const webrtc::VideoTrackSourceInterface* key) const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  auto it = sources_.find(key);
+  if (it != sources_.end()) {
+    return it->second;
+  }
+  return nullptr;
+}
+
+static std::string codec_type_to_sdp_name(VideoCodecType codec) {
+  switch (codec) {
+    case VideoCodecType::VP8:
+      return "VP8";
+    case VideoCodecType::VP9:
+      return "VP9";
+    case VideoCodecType::AV1:
+      return "AV1";
+    case VideoCodecType::H264:
+      return "H264";
+    case VideoCodecType::H265:
+      return "H265";
+    default:
+      return "";
+  }
+}
+
+std::shared_ptr<EncodedVideoTrackSource> EncodedSourceRegistry::find_by_codec_name(
+    const std::string& codec_name) const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  for (const auto& [key, source] : sources_) {
+    if (codec_type_to_sdp_name(source->codec_type()) == codec_name) {
+      return source;
+    }
+  }
+  return nullptr;
+}
+
+}  // namespace livekit_ffi
diff --git a/webrtc-sys/src/peer_connection_factory.cpp b/webrtc-sys/src/peer_connection_factory.cpp
index 2fe2741cf..7877889b6 100644
--- a/webrtc-sys/src/peer_connection_factory.cpp
+++ b/webrtc-sys/src/peer_connection_factory.cpp
@@ -120,6 +120,14 @@ std::shared_ptr<MediaStreamTrack> PeerConnectionFactory::create_video_track(
       peer_factory_->CreateVideoTrack(source->get(), label.c_str())));
 }
 
+std::shared_ptr<MediaStreamTrack> PeerConnectionFactory::create_video_track_from_encoded_source(
+    rust::String label,
+    std::shared_ptr<EncodedVideoTrackSource> source) const {
+  return std::static_pointer_cast<MediaStreamTrack>(
+      rtc_runtime_->get_or_create_media_stream_track(
+          peer_factory_->CreateVideoTrack(source->get(), label.c_str())));
+}
+
 std::shared_ptr<MediaStreamTrack> PeerConnectionFactory::create_audio_track(
     rust::String label,
     std::shared_ptr<AudioTrackSource> source) const {
diff --git a/webrtc-sys/src/peer_connection_factory.rs b/webrtc-sys/src/peer_connection_factory.rs
index c18d8331c..9fa79d634 100644
--- a/webrtc-sys/src/peer_connection_factory.rs
+++ b/webrtc-sys/src/peer_connection_factory.rs
@@ -50,6 +50,7 @@ pub mod ffi {
         include!("livekit/webrtc.h");
         include!("livekit/peer_connection.h");
         include!("livekit/audio_track.h");
+        include!("livekit/encoded_video_source.h");
 
         type RtcConfiguration = crate::peer_connection::ffi::RtcConfiguration;
         type PeerConnectionState = crate::peer_connection::ffi::PeerConnectionState;
@@ -79,6 +80,8 @@ pub mod ffi {
         type MediaStreamTrack = crate::media_stream::ffi::MediaStreamTrack;
         type SessionDescription = crate::jsep::ffi::SessionDescription;
         type MediaType = crate::webrtc::ffi::MediaType;
+        type EncodedVideoTrackSource =
+            crate::encoded_video_source::ffi::EncodedVideoTrackSource;
     }
 
     unsafe extern "C++" {
@@ -107,6 +110,12 @@ pub mod ffi {
             source: SharedPtr<VideoTrackSource>,
         ) -> SharedPtr<MediaStreamTrack>;
 
+        fn create_video_track_from_encoded_source(
+            self: &PeerConnectionFactory,
+            label: String,
+            source: SharedPtr<EncodedVideoTrackSource>,
+        ) -> SharedPtr<MediaStreamTrack>;
+
         fn rtp_sender_capabilities(
             self: &PeerConnectionFactory,
             kind: MediaType,
diff --git a/webrtc-sys/src/video_encoder_factory.cpp b/webrtc-sys/src/video_encoder_factory.cpp
index 7435760b4..e62fbdebf 100644
--- a/webrtc-sys/src/video_encoder_factory.cpp
+++ b/webrtc-sys/src/video_encoder_factory.cpp
@@ -21,6 +21,7 @@
 #include "api/video_codecs/video_encoder.h"
 #include "api/video_codecs/video_encoder_factory_template.h"
 #include "livekit/objc_video_factory.h"
+#include "livekit/passthrough_video_encoder.h"
 #include "media/base/media_constants.h"
 #include "media/engine/simulcast_encoder_adapter.h"
 #include "rtc_base/logging.h"
@@ -146,6 +147,15 @@ VideoEncoderFactory::CodecSupport VideoEncoderFactory::QueryCodecSupport(
 std::unique_ptr<webrtc::VideoEncoder> VideoEncoderFactory::Create(
     const webrtc::Environment& env,
     const webrtc::SdpVideoFormat& format) {
+  // Check if there is a registered encoded video source that wants a
+  // passthrough encoder for this codec type.
+  auto encoded_source =
+      EncodedSourceRegistry::instance().find_by_codec_name(format.name);
+  if (encoded_source) {
+    RTC_LOG(LS_INFO) << "Creating PassthroughVideoEncoder for " << format.name;
+    return std::make_unique<PassthroughVideoEncoder>(encoded_source);
+  }
+
   std::unique_ptr<webrtc::VideoEncoder> encoder;
   if (format.IsCodecInList(internal_factory_->GetSupportedFormats())) {
     encoder = std::make_unique(