Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions livekit-ffi-node-bindings/src/proto/room_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ proto2.util.setEnumType(ContinualGatheringPolicy, "livekit.proto.ContinualGather
]);

/**
*
* Room
*
*
* @generated from enum livekit.proto.ConnectionQuality
*/
export enum ConnectionQuality {
Expand Down Expand Up @@ -2471,6 +2475,11 @@ export class RoomOptions extends Message<RoomOptions> {
*/
encryption?: E2eeOptions;

/**
* @generated from field: repeated string registered_rpc_methods = 8;
*/
registeredRpcMethods: string[] = [];

constructor(data?: PartialMessage<RoomOptions>) {
super();
proto2.util.initPartial(data, this);
Expand All @@ -2486,6 +2495,7 @@ export class RoomOptions extends Message<RoomOptions> {
{ no: 5, name: "rtc_config", kind: "message", T: RtcConfig, opt: true },
{ no: 6, name: "join_retries", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true },
{ no: 7, name: "encryption", kind: "message", T: E2eeOptions, opt: true },
{ no: 8, name: "registered_rpc_methods", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RoomOptions {
Expand Down
2 changes: 1 addition & 1 deletion livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,12 @@ message RoomOptions {
optional RtcConfig rtc_config = 5; // allow to setup a custom RtcConfiguration
optional uint32 join_retries = 6;
optional E2eeOptions encryption = 7;
repeated string registered_rpc_methods = 8;
}

//
// Room
//

enum ConnectionQuality {
QUALITY_POOR = 0;
QUALITY_GOOD = 1;
Expand Down
17 changes: 16 additions & 1 deletion livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,27 @@ struct FfiSipDtmfPacket {
impl FfiRoom {
pub fn connect(
server: &'static FfiServer,
connect: proto::ConnectRequest,
mut connect: proto::ConnectRequest,
) -> proto::ConnectResponse {
let async_id = server.resolve_async_id(connect.request_async_id);

let req = connect.clone();

// TODO: move this conversion
let rpc_handler_registry =
std::mem::take(&mut connect.options.registered_rpc_methods).into_iter().fold(
livekit::participant::RpcHandlerRegistry::default(),
|mut registry, name| {
registry.register(name, async |invocation| {
// TODO: delegate to handler (probably via static), error if called early.
todo!()
});
registry
},
);

let mut options: RoomOptions = connect.options.into();
options.rpc_handlers = rpc_handler_registry;

{
let config = server.config.lock();
Expand Down
7 changes: 4 additions & 3 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ pub use self::{
};
pub use crate::rtc_engine::SimulateScenario;
use crate::{
participant::ConnectionQuality,
participant::{ConnectionQuality, RpcHandlerRegistry},
prelude::*,
registered_audio_filter_plugins,
rtc_engine::{
EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, RtcEngine,
SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD, RtcEngine, SessionStats
},
};

Expand Down Expand Up @@ -368,6 +367,7 @@ pub struct RoomOptions {
pub rtc_config: RtcConfiguration,
pub join_retries: u32,
pub sdk_options: RoomSdkOptions,
pub rpc_handlers: RpcHandlerRegistry
}

impl Default for RoomOptions {
Expand All @@ -388,6 +388,7 @@ impl Default for RoomOptions {
},
join_retries: 3,
sdk_options: RoomSdkOptions::default(),
rpc_handlers: RpcHandlerRegistry::default()
}
}
}
Expand Down
20 changes: 7 additions & 13 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,10 @@ use super::{
ParticipantTrackPermission,
};
use crate::{
data_stream::{
ChatMessage, DataPacket, RoomSession, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription, data_stream::{
ByteStreamInfo, ByteStreamWriter, StreamByteOptions, StreamResult, StreamTextOptions,
TextStreamInfo, TextStreamWriter,
},
e2ee::EncryptionType,
options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions},
prelude::*,
room::participant::rpc::{RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES},
rtc_engine::{EngineError, RtcEngine},
ChatMessage, DataPacket, RoomSession, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription,
}, e2ee::EncryptionType, options::{self, TrackPublishOptions, compute_video_encodings, video_layers_from_encodings}, participant::RpcHandlerRegistry, prelude::*, room::participant::rpc::{MAX_PAYLOAD_BYTES, RpcError, RpcErrorCode, RpcInvocationData}, rtc_engine::{EngineError, RtcEngine}
};
use chrono::Utc;
use libwebrtc::{native::create_random_uuid, rtp_parameters::RtpEncodingParameters};
Expand Down Expand Up @@ -68,15 +62,15 @@ struct LocalEvents {
struct RpcState {
pending_acks: HashMap<String, oneshot::Sender<()>>,
pending_responses: HashMap<String, oneshot::Sender<Result<String, RpcError>>>,
handlers: HashMap<String, RpcHandler>,
handlers: RpcHandlerRegistry
}

impl RpcState {
fn new() -> Self {
Self {
pending_acks: HashMap::new(),
pending_responses: HashMap::new(),
handlers: HashMap::new(),
handlers: Default::default()
}
}
}
Expand Down Expand Up @@ -867,11 +861,11 @@ impl LocalParticipant {
+ Sync
+ 'static,
) {
self.local.rpc_state.lock().handlers.insert(method, Arc::new(handler));
self.local.rpc_state.lock().handlers.register(method, handler);
}

pub fn unregister_rpc_method(&self, method: String) {
self.local.rpc_state.lock().handlers.remove(&method);
self.local.rpc_state.lock().handlers.unregister(&method);
}

pub(crate) fn handle_incoming_rpc_ack(&self, request_id: String) {
Expand Down Expand Up @@ -925,7 +919,7 @@ impl LocalParticipant {
let response = if version != 1 {
Err(RpcError::built_in(RpcErrorCode::UnsupportedVersion, None))
} else {
let handler = self.local.rpc_state.lock().handlers.get(&method).cloned();
let handler = self.local.rpc_state.lock().handlers.get(&method);

match handler {
Some(handler) => {
Expand Down
53 changes: 52 additions & 1 deletion livekit/src/room/participant/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.

use crate::room::participant::ParticipantIdentity;
use core::fmt;
use futures_util::future::BoxFuture;
use livekit_protocol::RpcError as RpcError_Proto;
use std::{error::Error, fmt::Display, time::Duration};
use std::{
collections::HashMap, error::Error, fmt::Display, future::Future, sync::Arc, time::Duration,
};

/// Parameters for performing an RPC call
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -166,3 +170,50 @@ pub(crate) fn truncate_bytes(s: &str, max_bytes: usize) -> String {
}
result
}

type RpcHandler =
Arc<dyn Fn(RpcInvocationData) -> BoxFuture<'static, Result<String, RpcError>> + Send + Sync>;

#[derive(Default, Clone)]
pub struct RpcHandlerRegistry {
inner: HashMap<String, RpcHandler>,
}

impl RpcHandlerRegistry {
pub fn register<H, Fut>(&mut self, method: impl Into<String>, handler: H)
where
H: for<'a> Fn(RpcInvocationData) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<String, RpcError>> + Send + 'static,
{
self.inner.insert(
method.into(),
Arc::new(move |invocation| Box::pin(handler(invocation)) as BoxFuture<_>),
);
}

pub fn unregister(&mut self, method: impl Into<String>) {
self.inner.remove(&method.into());
}

pub(crate) fn get(&self, method: &str) -> Option<RpcHandler> {
self.inner.get(method).cloned().into()
}
}

impl fmt::Debug for RpcHandlerRegistry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut dbg = f.debug_struct("RpcHandlerRegistry");
dbg.field("methods", &self.inner.keys().collect::<Vec<_>>());
dbg.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_register() {
RpcHandlerRegistry::default().register("my_method", async |_invocation| Ok("ok".into()));
}
}