diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 32867d17a..6ce2650fb 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -651,7 +651,8 @@ fn get_livekit_url( .append_pair("protocol", PROTOCOL_VERSION.to_string().as_str()) .append_pair("auto_subscribe", if options.auto_subscribe { "1" } else { "0" }) .append_pair("adaptive_stream", if options.adaptive_stream { "1" } else { "0" }); - + // client_protocol=1 indicates support for RPC compression + .append_pair("client_protocol", "1"); if let Some(sdk_version) = &options.sdk_options.sdk_version { lk_url.query_pairs_mut().append_pair("version", sdk_version.as_str()); } diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index 89c429fdc..4025f492f 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -63,6 +63,10 @@ enum InternalMessage { signal: proto::signal_request::Message, response_chn: oneshot::Sender>, }, + RawBytes { + data: Vec, + response_chn: oneshot::Sender>, + }, Pong { ping_data: Vec, }, @@ -348,6 +352,15 @@ impl SignalStream { recv.await.map_err(|_| SignalError::SendError)? } + /// Send raw bytes to the websocket (for JoinRequest, etc.) + /// It also waits for the message to be sent + pub async fn send_raw(&self, data: Vec) -> SignalResult<()> { + let (send, recv) = oneshot::channel(); + let msg = InternalMessage::RawBytes { data, response_chn: send }; + let _ = self.internal_tx.send(msg).await; + recv.await.map_err(|_| SignalError::SendError)? + } + /// This task is used to send messages to the websocket /// It is also responsible for closing the connection async fn write_task( @@ -366,6 +379,14 @@ impl SignalStream { let _ = response_chn.send(Ok(())); } + InternalMessage::RawBytes { data, response_chn } => { + if let Err(err) = ws_writer.send(Message::Binary(data)).await { + let _ = response_chn.send(Err(err.into())); + break; + } + + let _ = response_chn.send(Ok(())); + } InternalMessage::Pong { ping_data } => { if let Err(err) = ws_writer.send(Message::Pong(ping_data)).await { log::error!("failed to send pong message: {:?}", err); diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 9881ebfa7..91d600cfa 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -44,7 +44,6 @@ semver = "1.0" libloading = { version = "0.8.6" } bytes = { workspace = true } bmrng = "0.5.2" -base64 = "0.22" [dev-dependencies] anyhow = { workspace = true } diff --git a/livekit/src/room/data_stream/incoming.rs b/livekit/src/room/data_stream/incoming.rs index d9b9ecf3b..ff5a025c9 100644 --- a/livekit/src/room/data_stream/incoming.rs +++ b/livekit/src/room/data_stream/incoming.rs @@ -249,10 +249,13 @@ impl IncomingStreamManager { /// Handles an incoming header packet. pub fn handle_header( &self, - header: proto::Header, + mut header: proto::Header, identity: String, encryption_type: livekit_protocol::encryption::Type, ) { + let inline_content = std::mem::take(&mut header.content); + let inline_trailer = header.trailer.take(); + let Ok(info) = AnyStreamInfo::try_from_with_encryption(header, encryption_type.into()) .inspect_err(|e| log::error!("Invalid header: {}", e)) else { @@ -278,15 +281,37 @@ impl IncomingStreamManager { encryption_type: stream_encryption_type, }; inner.open_streams.insert(id, descriptor); + + if !inline_content.is_empty() { + let mut length_exceeded = false; + if let Some(descriptor) = inner.open_streams.get_mut(&id) { + descriptor.progress.bytes_processed += inline_content.len() as u64; + descriptor.progress.chunk_index += 1; + if let Some(total) = descriptor.progress.bytes_total { + length_exceeded = descriptor.progress.bytes_processed > total; + } + } + + if length_exceeded { + inner.close_stream_with_error(&id, StreamError::LengthExceeded); + return; + } + inner.yield_chunk(&id, Bytes::from(inline_content)); + } + + if let Some(trailer) = inline_trailer { + Self::apply_trailer(&mut inner, trailer); + } } /// Handles an incoming chunk packet. pub fn handle_chunk( &self, - chunk: proto::Chunk, + mut chunk: proto::Chunk, encryption_type: livekit_protocol::encryption::Type, ) { - let id = chunk.stream_id; + let id = chunk.stream_id.clone(); + let trailer = chunk.trailer.take(); let mut inner = self.inner.lock(); let Some(descriptor) = inner.open_streams.get_mut(&id) else { return; @@ -313,19 +338,27 @@ impl IncomingStreamManager { return; } inner.yield_chunk(&id, Bytes::from(chunk.content)); + + if let Some(trailer) = trailer { + Self::apply_trailer(&mut inner, trailer); + } // TODO: also yield progress } /// Handles an incoming trailer packet. pub fn handle_trailer(&self, trailer: proto::Trailer) { - let id = trailer.stream_id; let mut inner = self.inner.lock(); + Self::apply_trailer(&mut inner, trailer); + } + + fn apply_trailer(inner: &mut ManagerInner, trailer: proto::Trailer) { + let id = trailer.stream_id; let Some(descriptor) = inner.open_streams.get_mut(&id) else { return; }; if !match descriptor.progress.bytes_total { - Some(total) => descriptor.progress.bytes_processed >= total as u64, + Some(total) => descriptor.progress.bytes_processed >= total, None => true, } { inner.close_stream_with_error(&id, StreamError::Incomplete); diff --git a/livekit/src/room/data_stream/outgoing.rs b/livekit/src/room/data_stream/outgoing.rs index 5a96888d2..5cac1b858 100644 --- a/livekit/src/room/data_stream/outgoing.rs +++ b/livekit/src/room/data_stream/outgoing.rs @@ -137,12 +137,16 @@ struct RawStreamOpenOptions { header: proto::data_stream::Header, destination_identities: Vec, packet_tx: UnboundedRequestSender>, + packet_coalescing_enabled: bool, } struct RawStream { id: String, progress: StreamProgress, is_closed: bool, + pending_header: Option, + pending_chunk: Option, + packet_coalescing_enabled: bool, /// Request channel for sending packets. packet_tx: UnboundedRequestSender>, } @@ -151,19 +155,30 @@ impl RawStream { async fn open(options: RawStreamOpenOptions) -> StreamResult { let id = options.header.stream_id.to_string(); let bytes_total = options.header.total_length; - - let packet = Self::create_header_packet(options.header, options.destination_identities); - Self::send_packet(&options.packet_tx, packet).await?; + let header_packet = + Self::create_header_packet(options.header, options.destination_identities); + let pending_header = if options.packet_coalescing_enabled { + Some(header_packet) + } else { + Self::send_packet(&options.packet_tx, header_packet).await?; + None + }; Ok(Self { id, progress: StreamProgress { bytes_total, ..Default::default() }, is_closed: false, + pending_header, + pending_chunk: None, + packet_coalescing_enabled: options.packet_coalescing_enabled, packet_tx: options.packet_tx, }) } async fn write_chunk(&mut self, bytes: &[u8]) -> StreamResult<()> { + if self.packet_coalescing_enabled { + return self.write_chunk_with_coalescing(bytes).await; + } let packet = Self::create_chunk_packet(&self.id, self.progress.chunk_index, bytes); Self::send_packet(&self.packet_tx, packet).await?; self.progress.bytes_processed += bytes.len() as u64; @@ -175,6 +190,9 @@ impl RawStream { if self.is_closed { Err(StreamError::AlreadyClosed)? } + if self.packet_coalescing_enabled { + return self.close_with_coalescing(reason).await; + } let packet = Self::create_trailer_packet(&self.id, reason); Self::send_packet(&self.packet_tx, packet).await?; self.is_closed = true; @@ -220,6 +238,15 @@ impl RawStream { } } + fn create_chunk_packet_from(chunk: proto::data_stream::Chunk) -> proto::DataPacket { + proto::DataPacket { + kind: proto::data_packet::Kind::Reliable.into(), + participant_identity: String::new(), // populate later + value: Some(livekit_protocol::data_packet::Value::StreamChunk(chunk)), + ..Default::default() + } + } + fn create_trailer_packet(id: &str, reason: Option<&str>) -> proto::DataPacket { let trailer = proto::data_stream::Trailer { stream_id: id.to_string(), @@ -233,6 +260,91 @@ impl RawStream { ..Default::default() } } + + fn create_trailer(id: &str, reason: Option<&str>) -> proto::data_stream::Trailer { + proto::data_stream::Trailer { + stream_id: id.to_string(), + reason: reason.unwrap_or_default().to_owned(), + ..Default::default() + } + } + + async fn write_chunk_with_coalescing(&mut self, bytes: &[u8]) -> StreamResult<()> { + let mut offset = 0; + + if let Some(mut header_packet) = self.pending_header.take() { + if let Some(livekit_protocol::data_packet::Value::StreamHeader(ref mut header)) = + header_packet.value + { + let remaining_header_capacity = CHUNK_SIZE.saturating_sub(header.content.len()); + let inline_len = bytes.len().min(remaining_header_capacity); + if inline_len > 0 { + header.content.extend_from_slice(&bytes[..inline_len]); + self.progress.bytes_processed += inline_len as u64; + if self.progress.chunk_index == 0 { + self.progress.chunk_index += 1; + } + offset = inline_len; + } + } + + // Keep a chance to emit a one-packet open/body/close on close(). + if offset < bytes.len() { + Self::send_packet(&self.packet_tx, header_packet).await?; + } else { + self.pending_header = Some(header_packet); + return Ok(()); + } + } + + while offset < bytes.len() { + let end = (offset + CHUNK_SIZE).min(bytes.len()); + let chunk = proto::data_stream::Chunk { + stream_id: self.id.clone(), + chunk_index: self.progress.chunk_index, + content: bytes[offset..end].to_vec(), + ..Default::default() + }; + self.progress.bytes_processed += (end - offset) as u64; + self.progress.chunk_index += 1; + offset = end; + + if let Some(previous) = self.pending_chunk.replace(chunk) { + Self::send_packet(&self.packet_tx, Self::create_chunk_packet_from(previous)) + .await?; + } + } + + Ok(()) + } + + async fn close_with_coalescing(&mut self, reason: Option<&str>) -> StreamResult<()> { + let trailer = Self::create_trailer(&self.id, reason); + + if let Some(mut header_packet) = self.pending_header.take() { + if let Some(livekit_protocol::data_packet::Value::StreamHeader(ref mut header)) = + header_packet.value + { + header.trailer = Some(trailer); + } + Self::send_packet(&self.packet_tx, header_packet).await?; + self.is_closed = true; + return Ok(()); + } + + if let Some(mut chunk) = self.pending_chunk.take() { + chunk.trailer = Some(trailer); + Self::send_packet(&self.packet_tx, Self::create_chunk_packet_from(chunk)).await?; + self.is_closed = true; + return Ok(()); + } + + // No body was buffered in this mode, fallback to standalone trailer packet. + let packet = Self::create_trailer_packet(&self.id, reason); + Self::send_packet(&self.packet_tx, packet).await?; + self.is_closed = true; + Ok(()) + } } impl Drop for RawStream { @@ -241,7 +353,23 @@ impl Drop for RawStream { if self.is_closed { return; } - let packet = Self::create_trailer_packet(&self.id, None); + let packet = if self.packet_coalescing_enabled { + if let Some(mut header_packet) = self.pending_header.take() { + if let Some(livekit_protocol::data_packet::Value::StreamHeader(ref mut header)) = + header_packet.value + { + header.trailer = Some(Self::create_trailer(&self.id, None)); + } + header_packet + } else if let Some(mut chunk) = self.pending_chunk.take() { + chunk.trailer = Some(Self::create_trailer(&self.id, None)); + Self::create_chunk_packet_from(chunk) + } else { + Self::create_trailer_packet(&self.id, None) + } + } else { + Self::create_trailer_packet(&self.id, None) + }; let packet_tx = self.packet_tx.clone(); tokio::spawn(async move { Self::send_packet(&packet_tx, packet).await }); } @@ -257,6 +385,7 @@ pub struct StreamByteOptions { pub mime_type: Option, pub name: Option, pub total_length: Option, + pub(crate) packet_coalescing_enabled: bool, } /// Options used when opening an outgoing text data stream. @@ -271,6 +400,7 @@ pub struct StreamTextOptions { pub reply_to_stream_id: Option, pub attached_stream_ids: Vec, pub generated: Option, + pub(crate) packet_coalescing_enabled: bool, } #[derive(Clone)] @@ -310,6 +440,7 @@ impl OutgoingStreamManager { header: header.clone(), destination_identities: options.destination_identities, packet_tx: self.packet_tx.clone(), + packet_coalescing_enabled: options.packet_coalescing_enabled, }; let writer = TextStreamWriter { info: Arc::new(TextStreamInfo::from_headers(header, text_header)), @@ -337,6 +468,7 @@ impl OutgoingStreamManager { header: header.clone(), destination_identities: options.destination_identities, packet_tx: self.packet_tx.clone(), + packet_coalescing_enabled: options.packet_coalescing_enabled, }; let writer = ByteStreamWriter { info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)), @@ -373,6 +505,7 @@ impl OutgoingStreamManager { header: header.clone(), destination_identities: options.destination_identities, packet_tx: self.packet_tx.clone(), + packet_coalescing_enabled: options.packet_coalescing_enabled, }; let writer = TextStreamWriter { info: Arc::new(TextStreamInfo::from_headers(header, text_header)), @@ -422,6 +555,7 @@ impl OutgoingStreamManager { header: header.clone(), destination_identities: options.destination_identities, packet_tx: self.packet_tx.clone(), + packet_coalescing_enabled: options.packet_coalescing_enabled, }; let writer = ByteStreamWriter { info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)), @@ -465,6 +599,7 @@ impl OutgoingStreamManager { header: header.clone(), destination_identities: options.destination_identities, packet_tx: self.packet_tx.clone(), + packet_coalescing_enabled: options.packet_coalescing_enabled, }; let writer = ByteStreamWriter { info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)), diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index ffd383e56..fc47189d1 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -521,6 +521,7 @@ impl Room { pi.attributes, e2ee_manager.encryption_type(), pi.permission, + pi.client_protocol, ); let dispatcher = Dispatcher::::default(); @@ -674,6 +675,7 @@ impl Room { pi.metadata, pi.attributes, pi.permission, + pi.client_protocol, ) }; participant.update_info(pi.clone()); @@ -1043,6 +1045,7 @@ impl RoomSession { pi.metadata, pi.attributes, pi.permission, + pi.client_protocol, ) }; @@ -1712,6 +1715,7 @@ impl RoomSession { metadata: String, attributes: HashMap, permission: Option, + client_protocol: i32, ) -> RemoteParticipant { let participant = RemoteParticipant::new( self.rtc_engine.clone(), @@ -1724,6 +1728,7 @@ impl RoomSession { attributes, self.options.auto_subscribe, permission, + client_protocol, ); participant.on_track_published({ @@ -1859,7 +1864,7 @@ impl RoomSession { self.remote_participants.read().values().find(|x| &x.sid() == sid).cloned() } - fn get_participant_by_identity( + pub(crate) fn get_participant_by_identity( &self, identity: &ParticipantIdentity, ) -> Option { diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index c72b5f4c4..4993863dd 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -34,7 +34,9 @@ use crate::{ e2ee::EncryptionType, options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions}, prelude::*, - room::participant::rpc::{RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES}, + room::participant::rpc::{ + compress_rpc_payload_bytes, RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES, + }, rtc_engine::{EngineError, RtcEngine}, ChatMessage, DataPacket, RoomSession, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription, }; @@ -117,6 +119,7 @@ impl LocalParticipant { attributes: HashMap, encryption_type: EncryptionType, permission: Option, + client_protocol: i32, ) -> Self { Self { inner: super::new_inner( @@ -129,6 +132,7 @@ impl LocalParticipant { kind, kind_details, permission, + client_protocol, ), local: Arc::new(LocalInfo { events: LocalEvents::default(), @@ -613,15 +617,62 @@ impl LocalParticipant { .map_err(Into::into) } + /// Check if a remote participant supports RPC compression. + /// Returns true if the participant has client_protocol >= 1. + fn destination_supports_compression(&self, destination_identity: &str) -> bool { + let Some(session) = self.session() else { + return false; + }; + let participant_identity: ParticipantIdentity = destination_identity.to_string().into(); + let Some(participant) = session.get_participant_by_identity(&participant_identity) else { + return false; + }; + participant.client_protocol() >= 1 + } + + fn destinations_support_stream_packet_coalescing( + &self, + destination_identities: &[ParticipantIdentity], + ) -> bool { + if destination_identities.is_empty() { + return false; + } + let Some(session) = self.session() else { + return false; + }; + + destination_identities.iter().all(|participant_identity| { + session + .get_participant_by_identity(participant_identity) + .map(|participant| participant.client_protocol() >= 1) + .unwrap_or(false) + }) + } + async fn publish_rpc_request(&self, rpc_request: RpcRequest) -> RoomResult<()> { + // Compress the payload only if destination supports it + let supports_compression = + self.destination_supports_compression(&rpc_request.destination_identity); + + // Use compressed_payload field (raw bytes) when compression is beneficial + let (payload, compressed_payload) = if supports_compression { + match compress_rpc_payload_bytes(&rpc_request.payload) { + Some(compressed) => (String::new(), compressed), + None => (rpc_request.payload, Vec::new()), + } + } else { + (rpc_request.payload, Vec::new()) + }; + let destination_identities = vec![rpc_request.destination_identity]; + let rpc_request_message = proto::RpcRequest { id: rpc_request.id, method: rpc_request.method, - payload: rpc_request.payload, + payload, + compressed_payload, response_timeout_ms: rpc_request.response_timeout.as_millis() as u32, version: rpc_request.version, - ..Default::default() }; let data = proto::DataPacket { @@ -638,23 +689,40 @@ impl LocalParticipant { } async fn publish_rpc_response(&self, rpc_response: RpcResponse) -> RoomResult<()> { + // Compress the payload only if destination supports it + let supports_compression = + self.destination_supports_compression(&rpc_response.destination_identity); let destination_identities = vec![rpc_response.destination_identity]; - let rpc_response_message = proto::RpcResponse { - request_id: rpc_response.request_id, - value: Some(match rpc_response.error { - Some(error) => proto::rpc_response::Value::Error(proto::RpcError { - code: error.code, - message: error.message, - data: error.data, - }), - None => proto::rpc_response::Value::Payload(rpc_response.payload.unwrap()), + + // Determine the response value (error, compressed payload, or plain payload) + let response_value = match rpc_response.error { + Some(error) => proto::rpc_response::Value::Error(proto::RpcError { + code: error.code, + message: error.message, + data: error.data, }), - ..Default::default() + None => { + let payload = rpc_response.payload.unwrap_or_default(); + if supports_compression { + // Try to compress and use CompressedPayload variant + match compress_rpc_payload_bytes(&payload) { + Some(compressed) => { + proto::rpc_response::Value::CompressedPayload(compressed) + } + None => proto::rpc_response::Value::Payload(payload), + } + } else { + proto::rpc_response::Value::Payload(payload) + } + } }; + let rpc_response_message = + proto::RpcResponse { request_id: rpc_response.request_id, value: Some(response_value) }; + let data = proto::DataPacket { value: Some(proto::data_packet::Value::RpcResponse(rpc_response_message)), - destination_identities: destination_identities.clone(), + destination_identities, ..Default::default() }; @@ -666,13 +734,12 @@ impl LocalParticipant { } async fn publish_rpc_ack(&self, rpc_ack: RpcAck) -> RoomResult<()> { - let destination_identities = vec![rpc_ack.destination_identity]; let rpc_ack_message = proto::RpcAck { request_id: rpc_ack.request_id, ..Default::default() }; let data = proto::DataPacket { value: Some(proto::data_packet::Value::RpcAck(rpc_ack_message)), - destination_identities: destination_identities.clone(), + destination_identities: vec![rpc_ack.destination_identity], ..Default::default() }; @@ -734,6 +801,10 @@ impl LocalParticipant { self.inner.info.read().attributes.clone() } + pub fn client_protocol(&self) -> i32 { + self.inner.info.read().client_protocol + } + pub fn is_speaking(&self) -> bool { self.inner.info.read().speaking } @@ -786,6 +857,13 @@ impl LocalParticipant { let min_effective_timeout = Duration::from_millis(1000); if data.payload.len() > MAX_PAYLOAD_BYTES { + log::error!( + "RPC request payload too large: {} bytes (max: {} bytes), method: {}, destination: {}", + data.payload.len(), + MAX_PAYLOAD_BYTES, + data.method, + data.destination_identity + ); return Err(RpcError::built_in(RpcErrorCode::RequestPayloadTooLarge, None)); } @@ -796,6 +874,13 @@ impl LocalParticipant { let server_version = Version::parse(&server_info.version).unwrap(); let min_required_version = Version::parse("1.8.0").unwrap(); if server_version < min_required_version { + log::error!( + "RPC error code {}: Server version {} does not support RPC (requires >= 1.8.0), method: {}, destination: {}", + RpcErrorCode::UnsupportedServer as u32, + server_info.version, + data.method, + data.destination_identity + ); return Err(RpcError::built_in(RpcErrorCode::UnsupportedServer, None)); } } @@ -839,6 +924,14 @@ impl LocalParticipant { // Wait for ack timeout match tokio::time::timeout(max_round_trip_latency, ack_rx).await { Err(_) => { + log::error!( + "RPC error code {}: Connection timeout waiting for ACK (timeout: {:?}), request_id: {}, method: {}, destination: {}", + RpcErrorCode::ConnectionTimeout as u32, + max_round_trip_latency, + id, + data.method, + data.destination_identity + ); let mut rpc_state = self.local.rpc_state.lock(); rpc_state.pending_acks.remove(&id); rpc_state.pending_responses.remove(&id); @@ -849,9 +942,17 @@ impl LocalParticipant { } } - // Wait for response timout + // Wait for response timeout let response = match tokio::time::timeout(data.response_timeout, response_rx).await { Err(_) => { + log::error!( + "RPC error code {}: Response timeout (timeout: {:?}), request_id: {}, method: {}, destination: {}", + RpcErrorCode::ResponseTimeout as u32, + data.response_timeout, + id, + data.method, + data.destination_identity + ); self.local.rpc_state.lock().pending_responses.remove(&id); return Err(RpcError::built_in(RpcErrorCode::ResponseTimeout, None)); } @@ -861,10 +962,26 @@ impl LocalParticipant { match response { Err(_) => { // Something went wrong locally + log::error!( + "RPC error code {}: Recipient disconnected, request_id: {}, method: {}, destination: {}", + RpcErrorCode::RecipientDisconnected as u32, + id, + data.method, + data.destination_identity + ); Err(RpcError::built_in(RpcErrorCode::RecipientDisconnected, None)) } Ok(Err(e)) => { // RPC error from remote, forward it + log::error!( + "RPC error code {}: {} (from remote), request_id: {}, method: {}, destination: {}, data: {:?}", + e.code, + e.message, + id, + data.method, + data.destination_identity, + e.data + ); Err(e) } Ok(Ok(payload)) => { @@ -911,10 +1028,11 @@ impl LocalParticipant { ) { let mut rpc_state = self.local.rpc_state.lock(); if let Some(tx) = rpc_state.pending_responses.remove(&request_id) { - let _ = tx.send(match error { + let result = match error { Some(e) => Err(RpcError::from_proto(e)), None => Ok(payload.unwrap_or_default()), - }); + }; + let _ = tx.send(result); } else { log::error!("Response received for unexpected RPC request: {}", request_id); } @@ -943,6 +1061,14 @@ impl LocalParticipant { let request_id_2 = request_id.clone(); let response = if version != 1 { + log::error!( + "RPC error code {}: Unsupported RPC version {}, request_id: {}, method: {}, caller: {}", + RpcErrorCode::UnsupportedVersion as u32, + version, + request_id, + method, + caller_identity + ); Err(RpcError::built_in(RpcErrorCode::UnsupportedVersion, None)) } else { let handler = self.local.rpc_state.lock().handlers.get(&method).cloned(); @@ -951,9 +1077,9 @@ impl LocalParticipant { Some(handler) => { match tokio::task::spawn(async move { handler(RpcInvocationData { - request_id: request_id.clone(), - caller_identity: caller_identity.clone(), - payload: payload.clone(), + request_id, + caller_identity, + payload, response_timeout, }) .await @@ -962,12 +1088,27 @@ impl LocalParticipant { { Ok(result) => result, Err(e) => { - log::error!("RPC method handler returned an error: {:?}", e); + log::error!( + "RPC error code {}: Method handler panicked: {:?}, request_id: {}, method: {}", + RpcErrorCode::ApplicationError as u32, + e, + request_id_2, + method + ); Err(RpcError::built_in(RpcErrorCode::ApplicationError, None)) } } } - None => Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)), + None => { + log::error!( + "RPC error code {}: Unsupported method '{}', request_id: {}, caller: {}", + RpcErrorCode::UnsupportedMethod as u32, + method, + request_id_2, + caller_identity_2 + ); + Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)) + } } }; @@ -975,7 +1116,17 @@ impl LocalParticipant { Ok(response_payload) if response_payload.len() <= MAX_PAYLOAD_BYTES => { (Some(response_payload), None) } - Ok(_) => (None, Some(RpcError::built_in(RpcErrorCode::ResponsePayloadTooLarge, None))), + Ok(response_payload) => { + log::error!( + "RPC error code {}: Response payload too large: {} bytes (max: {} bytes), request_id: {}, caller: {}", + RpcErrorCode::ResponsePayloadTooLarge as u32, + response_payload.len(), + MAX_PAYLOAD_BYTES, + request_id_2, + caller_identity_2 + ); + (None, Some(RpcError::built_in(RpcErrorCode::ResponsePayloadTooLarge, None))) + } Err(e) => (None, Some(e.into())), }; @@ -1007,8 +1158,10 @@ impl LocalParticipant { pub async fn send_text( &self, text: &str, - options: StreamTextOptions, + mut options: StreamTextOptions, ) -> StreamResult { + options.packet_coalescing_enabled = + self.destinations_support_stream_packet_coalescing(&options.destination_identities); self.session().unwrap().outgoing_stream_manager.send_text(text, options).await } @@ -1027,8 +1180,10 @@ impl LocalParticipant { pub async fn send_file( &self, path: impl AsRef, - options: StreamByteOptions, + mut options: StreamByteOptions, ) -> StreamResult { + options.packet_coalescing_enabled = + self.destinations_support_stream_packet_coalescing(&options.destination_identities); self.session().unwrap().outgoing_stream_manager.send_file(path, options).await } @@ -1044,8 +1199,10 @@ impl LocalParticipant { pub async fn send_bytes( &self, data: impl AsRef<[u8]>, - options: StreamByteOptions, + mut options: StreamByteOptions, ) -> StreamResult { + options.packet_coalescing_enabled = + self.destinations_support_stream_packet_coalescing(&options.destination_identities); self.session().unwrap().outgoing_stream_manager.send_bytes(data, options).await } @@ -1060,7 +1217,12 @@ impl LocalParticipant { /// * `options` - Configuration options for the text stream, including topic and /// destination participants. /// - pub async fn stream_text(&self, options: StreamTextOptions) -> StreamResult { + pub async fn stream_text( + &self, + mut options: StreamTextOptions, + ) -> StreamResult { + options.packet_coalescing_enabled = + self.destinations_support_stream_packet_coalescing(&options.destination_identities); self.session().unwrap().outgoing_stream_manager.stream_text(options).await } @@ -1075,7 +1237,12 @@ impl LocalParticipant { /// * `options` - Configuration options for the byte stream, including topic and /// destination participants. /// - pub async fn stream_bytes(&self, options: StreamByteOptions) -> StreamResult { + pub async fn stream_bytes( + &self, + mut options: StreamByteOptions, + ) -> StreamResult { + options.packet_coalescing_enabled = + self.destinations_support_stream_packet_coalescing(&options.destination_identities); self.session().unwrap().outgoing_stream_manager.stream_bytes(options).await } diff --git a/livekit/src/room/participant/mod.rs b/livekit/src/room/participant/mod.rs index 9c660dc3a..eb36ed828 100644 --- a/livekit/src/room/participant/mod.rs +++ b/livekit/src/room/participant/mod.rs @@ -91,6 +91,7 @@ impl Participant { pub fn name(self: &Self) -> String; pub fn metadata(self: &Self) -> String; pub fn attributes(self: &Self) -> HashMap; + pub fn client_protocol(self: &Self) -> i32; pub fn is_speaking(self: &Self) -> bool; pub fn audio_level(self: &Self) -> f32; pub fn connection_quality(self: &Self) -> ConnectionQuality; @@ -132,6 +133,8 @@ struct ParticipantInfo { pub kind_details: Vec, pub disconnect_reason: DisconnectReason, pub permission: Option, + /// Client protocol version indicating feature support (e.g., 1 = compression support) + pub client_protocol: i32, } type TrackMutedHandler = Box; @@ -180,6 +183,7 @@ pub(super) fn new_inner( kind: ParticipantKind, kind_details: Vec, permission: Option, + client_protocol: i32, ) -> Arc { Arc::new(ParticipantInner { rtc_engine, @@ -196,6 +200,7 @@ pub(super) fn new_inner( connection_quality: ConnectionQuality::Excellent, disconnect_reason: DisconnectReason::UnknownReason, permission, + client_protocol, }), track_publications: Default::default(), events: Default::default(), @@ -245,6 +250,9 @@ pub(super) fn update_info( cb(participant.clone(), new_info.permission.clone()); } } + + // Update client_protocol + info.client_protocol = new_info.client_protocol; } pub(super) fn set_speaking( diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index 9d4245ace..25da872b3 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -85,6 +85,7 @@ impl RemoteParticipant { attributes: HashMap, auto_subscribe: bool, permission: Option, + client_protocol: i32, ) -> Self { Self { inner: super::new_inner( @@ -97,6 +98,7 @@ impl RemoteParticipant { kind, kind_details, permission, + client_protocol, ), remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }), } @@ -518,6 +520,10 @@ impl RemoteParticipant { self.inner.info.read().attributes.clone() } + pub fn client_protocol(&self) -> i32 { + self.inner.info.read().client_protocol + } + pub fn is_speaking(&self) -> bool { self.inner.info.read().speaking } diff --git a/livekit/src/room/participant/rpc.rs b/livekit/src/room/participant/rpc.rs index b04691dda..6e4b8c261 100644 --- a/livekit/src/room/participant/rpc.rs +++ b/livekit/src/room/participant/rpc.rs @@ -146,6 +146,54 @@ impl RpcError { /// Maximum payload size in bytes pub const MAX_PAYLOAD_BYTES: usize = 15360; // 15 KB +/// Minimum payload size to trigger compression (1 KB) +pub const COMPRESSION_THRESHOLD_BYTES: usize = 1024; + +/// Compress an RPC payload to raw bytes using Zstd. +/// Returns Some(compressed_bytes) if compression is beneficial, None otherwise. +/// This is used with the new `compressed_payload` proto field (no base64 overhead). +pub fn compress_rpc_payload_bytes(payload: &str) -> Option> { + use std::io::Cursor; + + let payload_bytes = payload.as_bytes(); + + // Only compress if payload is large enough + if payload_bytes.len() < COMPRESSION_THRESHOLD_BYTES { + return None; + } + + // Compress the payload + match zstd::encode_all(Cursor::new(payload_bytes), 3) { + Ok(compressed) => { + // Only use compressed version if it's actually smaller + if compressed.len() < payload_bytes.len() { + return Some(compressed); + } + // Compression didn't help + None + } + Err(e) => { + log::warn!("Failed to compress RPC payload: {}", e); + None + } + } +} + +/// Decompress raw bytes RPC payload using Zstd. +/// Returns the decompressed string payload. +/// This is used with the new `compressed_payload` proto field. +pub fn decompress_rpc_payload_bytes(compressed: &[u8]) -> Result { + use std::io::Cursor; + + match zstd::decode_all(Cursor::new(compressed)) { + Ok(decompressed) => match String::from_utf8(decompressed) { + Ok(s) => Ok(s), + Err(e) => Err(format!("Failed to decode decompressed RPC payload as UTF-8: {}", e)), + }, + Err(e) => Err(format!("Failed to decompress RPC payload: {}", e)), + } +} + /// Calculate the byte length of a string pub(crate) fn byte_length(s: &str) -> usize { s.as_bytes().len() diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 8337a7f5b..3684c3bfa 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -39,6 +39,7 @@ use tokio::sync::{mpsc, oneshot, watch, Notify}; use super::{rtc_events, EngineError, EngineOptions, EngineResult, SimulateScenario}; use crate::{ id::ParticipantIdentity, + room::participant::decompress_rpc_payload_bytes, utils::{ ttl_map::TtlMap, tx_queue::{TxQueue, TxQueueItem}, @@ -1295,11 +1296,23 @@ impl SessionInner { } proto::data_packet::Value::RpcRequest(rpc_request) => { let caller_identity = participant_identity; + // Check for compressed_payload first, fall back to regular payload + let payload = if !rpc_request.compressed_payload.is_empty() { + match decompress_rpc_payload_bytes(&rpc_request.compressed_payload) { + Ok(decompressed) => decompressed, + Err(e) => { + log::error!("Failed to decompress RPC request payload: {}", e); + rpc_request.payload + } + } + } else { + rpc_request.payload + }; self.emitter.send(SessionEvent::RpcRequest { caller_identity, request_id: rpc_request.id.clone(), method: rpc_request.method, - payload: rpc_request.payload, + payload, response_timeout: Duration::from_millis(rpc_request.response_timeout_ms as u64), version: rpc_request.version, }) @@ -1308,6 +1321,15 @@ impl SessionInner { let (payload, error) = match rpc_response.value { None => (None, None), Some(proto::rpc_response::Value::Payload(payload)) => (Some(payload), None), + Some(proto::rpc_response::Value::CompressedPayload(compressed)) => { + match decompress_rpc_payload_bytes(&compressed) { + Ok(decompressed) => (Some(decompressed), None), + Err(e) => { + log::error!("Failed to decompress RPC response payload: {}", e); + (None, None) + } + } + } Some(proto::rpc_response::Value::Error(err)) => (None, Some(err)), }; self.emitter.send(SessionEvent::RpcResponse {