Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ fn get_livekit_url(
.append_pair("protocol", PROTOCOL_VERSION.to_string().as_str())
.append_pair("auto_subscribe", if options.auto_subscribe { "1" } else { "0" })
.append_pair("adaptive_stream", if options.adaptive_stream { "1" } else { "0" });

// client_protocol=1 indicates support for RPC compression
.append_pair("client_protocol", "1");
if let Some(sdk_version) = &options.sdk_options.sdk_version {
lk_url.query_pairs_mut().append_pair("version", sdk_version.as_str());
}
Expand Down
21 changes: 21 additions & 0 deletions livekit-api/src/signal_client/signal_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ enum InternalMessage {
signal: proto::signal_request::Message,
response_chn: oneshot::Sender<SignalResult<()>>,
},
RawBytes {
data: Vec<u8>,
response_chn: oneshot::Sender<SignalResult<()>>,
},
Pong {
ping_data: Vec<u8>,
},
Expand Down Expand Up @@ -348,6 +352,15 @@ impl SignalStream {
recv.await.map_err(|_| SignalError::SendError)?
}

/// Send raw bytes to the websocket (for JoinRequest, etc.)
/// It also waits for the message to be sent
pub async fn send_raw(&self, data: Vec<u8>) -> SignalResult<()> {
let (send, recv) = oneshot::channel();
let msg = InternalMessage::RawBytes { data, response_chn: send };
let _ = self.internal_tx.send(msg).await;
recv.await.map_err(|_| SignalError::SendError)?
}

/// This task is used to send messages to the websocket
/// It is also responsible for closing the connection
async fn write_task(
Expand All @@ -366,6 +379,14 @@ impl SignalStream {

let _ = response_chn.send(Ok(()));
}
InternalMessage::RawBytes { data, response_chn } => {
if let Err(err) = ws_writer.send(Message::Binary(data)).await {
let _ = response_chn.send(Err(err.into()));
break;
}

let _ = response_chn.send(Ok(()));
}
InternalMessage::Pong { ping_data } => {
if let Err(err) = ws_writer.send(Message::Pong(ping_data)).await {
log::error!("failed to send pong message: {:?}", err);
Expand Down
1 change: 0 additions & 1 deletion livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ semver = "1.0"
libloading = { version = "0.8.6" }
bytes = { workspace = true }
bmrng = "0.5.2"
base64 = "0.22"

[dev-dependencies]
anyhow = { workspace = true }
Expand Down
43 changes: 38 additions & 5 deletions livekit/src/room/data_stream/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,13 @@ impl IncomingStreamManager {
/// Handles an incoming header packet.
pub fn handle_header(
&self,
header: proto::Header,
mut header: proto::Header,
identity: String,
encryption_type: livekit_protocol::encryption::Type,
) {
let inline_content = std::mem::take(&mut header.content);
let inline_trailer = header.trailer.take();

let Ok(info) = AnyStreamInfo::try_from_with_encryption(header, encryption_type.into())
.inspect_err(|e| log::error!("Invalid header: {}", e))
else {
Expand All @@ -278,15 +281,37 @@ impl IncomingStreamManager {
encryption_type: stream_encryption_type,
};
inner.open_streams.insert(id, descriptor);

if !inline_content.is_empty() {
let mut length_exceeded = false;
if let Some(descriptor) = inner.open_streams.get_mut(&id) {
descriptor.progress.bytes_processed += inline_content.len() as u64;
descriptor.progress.chunk_index += 1;
if let Some(total) = descriptor.progress.bytes_total {
length_exceeded = descriptor.progress.bytes_processed > total;
}
}

if length_exceeded {
inner.close_stream_with_error(&id, StreamError::LengthExceeded);
return;
}
inner.yield_chunk(&id, Bytes::from(inline_content));
}

if let Some(trailer) = inline_trailer {
Self::apply_trailer(&mut inner, trailer);
}
}

/// Handles an incoming chunk packet.
pub fn handle_chunk(
&self,
chunk: proto::Chunk,
mut chunk: proto::Chunk,
encryption_type: livekit_protocol::encryption::Type,
) {
let id = chunk.stream_id;
let id = chunk.stream_id.clone();
let trailer = chunk.trailer.take();
let mut inner = self.inner.lock();
let Some(descriptor) = inner.open_streams.get_mut(&id) else {
return;
Expand All @@ -313,19 +338,27 @@ impl IncomingStreamManager {
return;
}
inner.yield_chunk(&id, Bytes::from(chunk.content));

if let Some(trailer) = trailer {
Self::apply_trailer(&mut inner, trailer);
}
// TODO: also yield progress
}

/// Handles an incoming trailer packet.
pub fn handle_trailer(&self, trailer: proto::Trailer) {
let id = trailer.stream_id;
let mut inner = self.inner.lock();
Self::apply_trailer(&mut inner, trailer);
}

fn apply_trailer(inner: &mut ManagerInner, trailer: proto::Trailer) {
let id = trailer.stream_id;
let Some(descriptor) = inner.open_streams.get_mut(&id) else {
return;
};

if !match descriptor.progress.bytes_total {
Some(total) => descriptor.progress.bytes_processed >= total as u64,
Some(total) => descriptor.progress.bytes_processed >= total,
None => true,
} {
inner.close_stream_with_error(&id, StreamError::Incomplete);
Expand Down
143 changes: 139 additions & 4 deletions livekit/src/room/data_stream/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,16 @@ struct RawStreamOpenOptions {
header: proto::data_stream::Header,
destination_identities: Vec<ParticipantIdentity>,
packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
packet_coalescing_enabled: bool,
}

struct RawStream {
id: String,
progress: StreamProgress,
is_closed: bool,
pending_header: Option<proto::DataPacket>,
pending_chunk: Option<proto::data_stream::Chunk>,
packet_coalescing_enabled: bool,
/// Request channel for sending packets.
packet_tx: UnboundedRequestSender<proto::DataPacket, Result<(), EngineError>>,
}
Expand All @@ -151,19 +155,30 @@ impl RawStream {
async fn open(options: RawStreamOpenOptions) -> StreamResult<Self> {
let id = options.header.stream_id.to_string();
let bytes_total = options.header.total_length;

let packet = Self::create_header_packet(options.header, options.destination_identities);
Self::send_packet(&options.packet_tx, packet).await?;
let header_packet =
Self::create_header_packet(options.header, options.destination_identities);
let pending_header = if options.packet_coalescing_enabled {
Some(header_packet)
} else {
Self::send_packet(&options.packet_tx, header_packet).await?;
None
};

Ok(Self {
id,
progress: StreamProgress { bytes_total, ..Default::default() },
is_closed: false,
pending_header,
pending_chunk: None,
packet_coalescing_enabled: options.packet_coalescing_enabled,
packet_tx: options.packet_tx,
})
}

async fn write_chunk(&mut self, bytes: &[u8]) -> StreamResult<()> {
if self.packet_coalescing_enabled {
return self.write_chunk_with_coalescing(bytes).await;
}
let packet = Self::create_chunk_packet(&self.id, self.progress.chunk_index, bytes);
Self::send_packet(&self.packet_tx, packet).await?;
self.progress.bytes_processed += bytes.len() as u64;
Expand All @@ -175,6 +190,9 @@ impl RawStream {
if self.is_closed {
Err(StreamError::AlreadyClosed)?
}
if self.packet_coalescing_enabled {
return self.close_with_coalescing(reason).await;
}
let packet = Self::create_trailer_packet(&self.id, reason);
Self::send_packet(&self.packet_tx, packet).await?;
self.is_closed = true;
Expand Down Expand Up @@ -220,6 +238,15 @@ impl RawStream {
}
}

fn create_chunk_packet_from(chunk: proto::data_stream::Chunk) -> proto::DataPacket {
proto::DataPacket {
kind: proto::data_packet::Kind::Reliable.into(),
participant_identity: String::new(), // populate later
value: Some(livekit_protocol::data_packet::Value::StreamChunk(chunk)),
..Default::default()
}
}

fn create_trailer_packet(id: &str, reason: Option<&str>) -> proto::DataPacket {
let trailer = proto::data_stream::Trailer {
stream_id: id.to_string(),
Expand All @@ -233,6 +260,91 @@ impl RawStream {
..Default::default()
}
}

fn create_trailer(id: &str, reason: Option<&str>) -> proto::data_stream::Trailer {
proto::data_stream::Trailer {
stream_id: id.to_string(),
reason: reason.unwrap_or_default().to_owned(),
..Default::default()
}
}

async fn write_chunk_with_coalescing(&mut self, bytes: &[u8]) -> StreamResult<()> {
let mut offset = 0;

if let Some(mut header_packet) = self.pending_header.take() {
if let Some(livekit_protocol::data_packet::Value::StreamHeader(ref mut header)) =
header_packet.value
{
let remaining_header_capacity = CHUNK_SIZE.saturating_sub(header.content.len());
let inline_len = bytes.len().min(remaining_header_capacity);
if inline_len > 0 {
header.content.extend_from_slice(&bytes[..inline_len]);
self.progress.bytes_processed += inline_len as u64;
if self.progress.chunk_index == 0 {
self.progress.chunk_index += 1;
}
offset = inline_len;
}
}

// Keep a chance to emit a one-packet open/body/close on close().
if offset < bytes.len() {
Self::send_packet(&self.packet_tx, header_packet).await?;
} else {
self.pending_header = Some(header_packet);
return Ok(());
}
}

while offset < bytes.len() {
let end = (offset + CHUNK_SIZE).min(bytes.len());
let chunk = proto::data_stream::Chunk {
stream_id: self.id.clone(),
chunk_index: self.progress.chunk_index,
content: bytes[offset..end].to_vec(),
..Default::default()
};
self.progress.bytes_processed += (end - offset) as u64;
self.progress.chunk_index += 1;
offset = end;

if let Some(previous) = self.pending_chunk.replace(chunk) {
Self::send_packet(&self.packet_tx, Self::create_chunk_packet_from(previous))
.await?;
}
}

Ok(())
}

async fn close_with_coalescing(&mut self, reason: Option<&str>) -> StreamResult<()> {
let trailer = Self::create_trailer(&self.id, reason);

if let Some(mut header_packet) = self.pending_header.take() {
if let Some(livekit_protocol::data_packet::Value::StreamHeader(ref mut header)) =
header_packet.value
{
header.trailer = Some(trailer);
}
Self::send_packet(&self.packet_tx, header_packet).await?;
self.is_closed = true;
return Ok(());
}

if let Some(mut chunk) = self.pending_chunk.take() {
chunk.trailer = Some(trailer);
Self::send_packet(&self.packet_tx, Self::create_chunk_packet_from(chunk)).await?;
self.is_closed = true;
return Ok(());
}

// No body was buffered in this mode, fallback to standalone trailer packet.
let packet = Self::create_trailer_packet(&self.id, reason);
Self::send_packet(&self.packet_tx, packet).await?;
self.is_closed = true;
Ok(())
}
}

impl Drop for RawStream {
Expand All @@ -241,7 +353,23 @@ impl Drop for RawStream {
if self.is_closed {
return;
}
let packet = Self::create_trailer_packet(&self.id, None);
let packet = if self.packet_coalescing_enabled {
if let Some(mut header_packet) = self.pending_header.take() {
if let Some(livekit_protocol::data_packet::Value::StreamHeader(ref mut header)) =
header_packet.value
{
header.trailer = Some(Self::create_trailer(&self.id, None));
}
header_packet
} else if let Some(mut chunk) = self.pending_chunk.take() {
chunk.trailer = Some(Self::create_trailer(&self.id, None));
Self::create_chunk_packet_from(chunk)
} else {
Self::create_trailer_packet(&self.id, None)
}
} else {
Self::create_trailer_packet(&self.id, None)
};
let packet_tx = self.packet_tx.clone();
tokio::spawn(async move { Self::send_packet(&packet_tx, packet).await });
}
Expand All @@ -257,6 +385,7 @@ pub struct StreamByteOptions {
pub mime_type: Option<String>,
pub name: Option<String>,
pub total_length: Option<u64>,
pub(crate) packet_coalescing_enabled: bool,
}

/// Options used when opening an outgoing text data stream.
Expand All @@ -271,6 +400,7 @@ pub struct StreamTextOptions {
pub reply_to_stream_id: Option<String>,
pub attached_stream_ids: Vec<String>,
pub generated: Option<bool>,
pub(crate) packet_coalescing_enabled: bool,
}

#[derive(Clone)]
Expand Down Expand Up @@ -310,6 +440,7 @@ impl OutgoingStreamManager {
header: header.clone(),
destination_identities: options.destination_identities,
packet_tx: self.packet_tx.clone(),
packet_coalescing_enabled: options.packet_coalescing_enabled,
};
let writer = TextStreamWriter {
info: Arc::new(TextStreamInfo::from_headers(header, text_header)),
Expand Down Expand Up @@ -337,6 +468,7 @@ impl OutgoingStreamManager {
header: header.clone(),
destination_identities: options.destination_identities,
packet_tx: self.packet_tx.clone(),
packet_coalescing_enabled: options.packet_coalescing_enabled,
};
let writer = ByteStreamWriter {
info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
Expand Down Expand Up @@ -373,6 +505,7 @@ impl OutgoingStreamManager {
header: header.clone(),
destination_identities: options.destination_identities,
packet_tx: self.packet_tx.clone(),
packet_coalescing_enabled: options.packet_coalescing_enabled,
};
let writer = TextStreamWriter {
info: Arc::new(TextStreamInfo::from_headers(header, text_header)),
Expand Down Expand Up @@ -422,6 +555,7 @@ impl OutgoingStreamManager {
header: header.clone(),
destination_identities: options.destination_identities,
packet_tx: self.packet_tx.clone(),
packet_coalescing_enabled: options.packet_coalescing_enabled,
};
let writer = ByteStreamWriter {
info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
Expand Down Expand Up @@ -465,6 +599,7 @@ impl OutgoingStreamManager {
header: header.clone(),
destination_identities: options.destination_identities,
packet_tx: self.packet_tx.clone(),
packet_coalescing_enabled: options.packet_coalescing_enabled,
};
let writer = ByteStreamWriter {
info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)),
Expand Down
Loading
Loading