Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ console-subscriber = "0.1"
env_logger = "0.11"
futures = "0.3"
futures-util = { version = "0.3", default-features = false }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] }
lazy_static = "1.4"
log = "0.4"
parking_lot = "0.12"
Expand Down
6 changes: 5 additions & 1 deletion livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ fn create_join_request_param(
sdk: proto::client_info::Sdk::Rust as i32,
version: options.sdk_options.sdk_version.clone().unwrap_or_default(),
protocol: PROTOCOL_VERSION as i32,
client_protocol: proto::RPC_GZIP_CLIENT_PROTOCOL,
os: std::env::consts::OS.to_string(),
..Default::default()
};
Expand Down Expand Up @@ -644,13 +645,16 @@ fn get_livekit_url(
create_join_request_param(options, reconnect, reconnect_reason, participant_sid);
lk_url.query_pairs_mut().append_pair("join_request", &join_request_param);
} else {
let client_protocol = proto::RPC_GZIP_CLIENT_PROTOCOL.to_string();
// For v0 path (dual PC mode): use URL query parameters
lk_url
.query_pairs_mut()
.append_pair("sdk", options.sdk_options.sdk.as_str())
.append_pair("protocol", PROTOCOL_VERSION.to_string().as_str())
.append_pair("auto_subscribe", if options.auto_subscribe { "1" } else { "0" })
.append_pair("adaptive_stream", if options.adaptive_stream { "1" } else { "0" });
.append_pair("adaptive_stream", if options.adaptive_stream { "1" } else { "0" })
// `client_protocol=1` indicates support for gzip RPC compression.
.append_pair("client_protocol", &client_protocol);

if let Some(sdk_version) = &options.sdk_options.sdk_version {
lk_url.query_pairs_mut().append_pair("version", sdk_version.as_str());
Expand Down
21 changes: 21 additions & 0 deletions livekit-api/src/signal_client/signal_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ enum InternalMessage {
signal: proto::signal_request::Message,
response_chn: oneshot::Sender<SignalResult<()>>,
},
RawBytes {
data: Vec<u8>,
response_chn: oneshot::Sender<SignalResult<()>>,
},
Pong {
ping_data: Vec<u8>,
},
Expand Down Expand Up @@ -348,6 +352,15 @@ impl SignalStream {
recv.await.map_err(|_| SignalError::SendError)?
}

/// Send raw bytes to the websocket (for JoinRequest, etc.)
/// It also waits for the message to be sent
pub async fn send_raw(&self, data: Vec<u8>) -> SignalResult<()> {
let (send, recv) = oneshot::channel();
let msg = InternalMessage::RawBytes { data, response_chn: send };
let _ = self.internal_tx.send(msg).await;
recv.await.map_err(|_| SignalError::SendError)?
}

/// This task is used to send messages to the websocket
/// It is also responsible for closing the connection
async fn write_task(
Expand All @@ -366,6 +379,14 @@ impl SignalStream {

let _ = response_chn.send(Ok(()));
}
InternalMessage::RawBytes { data, response_chn } => {
if let Err(err) = ws_writer.send(Message::Binary(data)).await {
let _ = response_chn.send(Err(err.into()));
break;
}

let _ = response_chn.send(Ok(()));
}
InternalMessage::Pong { ping_data } => {
if let Err(err) = ws_writer.send(Message::Pong(ping_data)).await {
log::error!("failed to send pong message: {:?}", err);
Expand Down
3 changes: 3 additions & 0 deletions livekit-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub mod enum_dispatch;
pub mod observer;
pub mod promise;

/// `client_protocol=1` indicates support for RPC `compressed_payload` using gzip.
pub const RPC_GZIP_CLIENT_PROTOCOL: i32 = 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are you thinking about adding additional features here?

Would those get added as separate consts ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is one option, so that we can put clear comments on the previous supported features and make them compatible ?
what do you think ? any better option here ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion, for the existing protocol we are missing documentation about what exactly each version represents so I'm generally in favour of anything that improves upon that situation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely agree. probably the documentation should be done in both the proto and sdks.


include!("livekit.rs");

#[cfg(feature = "serde")]
Expand Down
73 changes: 9 additions & 64 deletions livekit-protocol/src/livekit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,9 @@ pub struct RpcRequest {
pub response_timeout_ms: u32,
#[prost(uint32, tag="5")]
pub version: u32,
/// Compressed payload data. When set, this field is used instead of `payload`.
#[prost(bytes="vec", tag="6")]
pub compressed_payload: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand All @@ -958,7 +961,7 @@ pub struct RpcAck {
pub struct RpcResponse {
#[prost(string, tag="1")]
pub request_id: ::prost::alloc::string::String,
#[prost(oneof="rpc_response::Value", tags="2, 3")]
#[prost(oneof="rpc_response::Value", tags="2, 3, 4")]
pub value: ::core::option::Option<rpc_response::Value>,
}
/// Nested message and enum types in `RpcResponse`.
Expand All @@ -970,6 +973,9 @@ pub mod rpc_response {
Payload(::prost::alloc::string::String),
#[prost(message, tag="3")]
Error(super::RpcError),
/// Compressed payload data. When set, this field is used instead of `payload`.
#[prost(bytes, tag="4")]
CompressedPayload(::prost::alloc::vec::Vec<u8>),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -4243,7 +4249,7 @@ pub struct JobState {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct WorkerMessage {
#[prost(oneof="worker_message::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9")]
#[prost(oneof="worker_message::Message", tags="1, 2, 3, 4, 5, 6, 7")]
pub message: ::core::option::Option<worker_message::Message>,
}
/// Nested message and enum types in `WorkerMessage`.
Expand All @@ -4269,17 +4275,13 @@ pub mod worker_message {
SimulateJob(super::SimulateJobRequest),
#[prost(message, tag="7")]
MigrateJob(super::MigrateJobRequest),
#[prost(message, tag="8")]
TextResponse(super::TextMessageResponse),
#[prost(message, tag="9")]
PushText(super::PushTextRequest),
}
}
/// from Server to Worker
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ServerMessage {
#[prost(oneof="server_message::Message", tags="1, 2, 3, 5, 4, 6")]
#[prost(oneof="server_message::Message", tags="1, 2, 3, 5, 4")]
pub message: ::core::option::Option<server_message::Message>,
}
/// Nested message and enum types in `ServerMessage`.
Expand All @@ -4299,8 +4301,6 @@ pub mod server_message {
Termination(super::JobTermination),
#[prost(message, tag="4")]
Pong(super::WorkerPong),
#[prost(message, tag="6")]
TextRequest(super::TextMessageRequest),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -4430,61 +4430,6 @@ pub struct JobTermination {
#[prost(string, tag="1")]
pub job_id: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AgentSessionState {
#[prost(uint64, tag="1")]
pub version: u64,
#[prost(oneof="agent_session_state::Data", tags="2, 3")]
pub data: ::core::option::Option<agent_session_state::Data>,
}
/// Nested message and enum types in `AgentSessionState`.
pub mod agent_session_state {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Data {
#[prost(bytes, tag="2")]
Snapshot(::prost::alloc::vec::Vec<u8>),
#[prost(bytes, tag="3")]
Delta(::prost::alloc::vec::Vec<u8>),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TextMessageRequest {
#[prost(string, tag="1")]
pub message_id: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub session_id: ::prost::alloc::string::String,
#[prost(string, tag="3")]
pub agent_name: ::prost::alloc::string::String,
#[prost(string, tag="4")]
pub metadata: ::prost::alloc::string::String,
#[prost(message, optional, tag="5")]
pub session_state: ::core::option::Option<AgentSessionState>,
#[prost(string, tag="6")]
pub text: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PushTextRequest {
/// The message_id of the TextMessageRequest that this push is for
#[prost(string, tag="1")]
pub message_id: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub content: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TextMessageResponse {
/// Indicate the request is completed
#[prost(string, tag="1")]
pub message_id: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
pub session_state: ::core::option::Option<AgentSessionState>,
#[prost(string, tag="3")]
pub error: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum JobType {
Expand Down
Loading
Loading