Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion protocols/rendezvous/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
## 0.18.0

- Port [8fde2dc](https://github.com/libp2p/rust-libp2p/commit/8fde2dc0fae8b433f97c6cdf9ee24f59d51a359c) and make unrequired pub functions private
See [PR 6364](https://github.com/libp2p/rust-libp2p/pull/6364).

- refactor: `Codec` no longer requires `#[async_trait]`
See [PR 6292](https://github.com/libp2p/rust-libp2p/pull/6292)

- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

## 0.17.1

- Add limits for the per-peer and total number of store registrations, and the stored client cookies.
See [GHSA-cqfx-gf56-8x59](https://github.com/libp2p/rust-libp2p/security/advisories/GHSA-cqfx-gf56-8x59) and [GHSA-v5hw-cv9c-rpg7](https://github.com/libp2p/rust-libp2p/security/advisories/GHSA-v5hw-cv9c-rpg7)

## 0.17.0

- Emit `ToSwarm::NewExternalAddrOfPeer` for newly discovered peers.
See [PR 5138](https://github.com/libp2p/rust-libp2p/pull/5138).
- Log error instead of panicking when sending response to channel fails
See [PR 6002](https://github.com/libp2p/rust-libp2p/pull/6002).

<!-- Update to libp2p-swarm v0.47.0 -->

## 0.16.0

Expand Down
1 change: 1 addition & 0 deletions protocols/rendezvous/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ asynchronous-codec = { workspace = true }
bimap = "0.6.3"
futures = { workspace = true, features = ["std"] }
futures-timer = "3.0.3"
hashlink = { workspace = true }
web-time = { workspace = true }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
Expand Down
122 changes: 62 additions & 60 deletions protocols/rendezvous/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{

use bimap::BiMap;
use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered};
use hashlink::LruCache;
use libp2p_core::{Endpoint, Multiaddr, transport::PortUse};
use libp2p_identity::PeerId;
use libp2p_request_response::ProtocolSupport;
Expand All @@ -40,6 +41,13 @@ use crate::{
codec::{Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl},
};

/// Default maximum active registrations per peer.
pub const MAX_REGISTRATION_PEER: usize = 32;
/// Default maximum active registrations in total.
pub const MAX_REGISTRATIONS_TOTAL: usize = 10_000;
/// Default size of the cache that stores client cookies.
pub const COOKIES_CACHE_SIZE: usize = 10_000;

pub struct Behaviour {
inner: libp2p_request_response::Behaviour<crate::codec::Codec>,

Expand All @@ -49,6 +57,9 @@ pub struct Behaviour {
pub struct Config {
min_ttl: Ttl,
max_ttl: Ttl,
max_registrations_per_peer: usize,
max_registrations_total: usize,
max_cookies: usize,
}

impl Config {
Expand All @@ -61,13 +72,31 @@ impl Config {
self.max_ttl = max_ttl;
self
}

pub fn with_max_registration_per_peer(mut self, max: usize) -> Self {
self.max_registrations_per_peer = max;
self
}

pub fn with_max_registration_total(mut self, max: usize) -> Self {
self.max_registrations_total = max;
self
}

pub fn with_max_stored_cookies(mut self, max: usize) -> Self {
self.max_cookies = max;
self
}
}

impl Default for Config {
fn default() -> Self {
Self {
min_ttl: MIN_TTL,
max_ttl: MAX_TTL,
max_registrations_per_peer: MAX_REGISTRATION_PEER,
max_registrations_total: MAX_REGISTRATIONS_TOTAL,
max_cookies: COOKIES_CACHE_SIZE,
}
}
}
Expand Down Expand Up @@ -279,9 +308,7 @@ fn handle_request(

Some((event, Some(response)))
}
Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => {
let error = ErrorCode::InvalidTtl;

Err(error) => {
let response = Message::RegisterResponse(Err(error));

let event = Event::PeerNotRegistered {
Expand Down Expand Up @@ -351,69 +378,53 @@ impl RegistrationId {
#[derive(Debug, PartialEq)]
struct ExpiredRegistration(Registration);

pub struct Registrations {
struct Registrations {
config: Config,
registrations_for_peer: BiMap<(PeerId, Namespace), RegistrationId>,
registrations: HashMap<RegistrationId, Registration>,
cookies: HashMap<Cookie, HashSet<RegistrationId>>,
min_ttl: Ttl,
max_ttl: Ttl,
cookies: LruCache<Cookie, HashSet<RegistrationId>>,
next_expiry: FuturesUnordered<BoxFuture<'static, RegistrationId>>,
}

#[derive(Debug, thiserror::Error)]
pub enum TtlOutOfRange {
#[error("Requested TTL ({requested}s) is too long; max {bound}s")]
TooLong { bound: Ttl, requested: Ttl },
#[error("Requested TTL ({requested}s) is too short; min {bound}s")]
TooShort { bound: Ttl, requested: Ttl },
}

impl Default for Registrations {
fn default() -> Self {
Registrations::with_config(Config::default())
}
}

impl Registrations {
pub fn with_config(config: Config) -> Self {
fn with_config(config: Config) -> Self {
Self {
registrations_for_peer: Default::default(),
registrations: Default::default(),
min_ttl: config.min_ttl,
max_ttl: config.max_ttl,
cookies: Default::default(),
cookies: LruCache::new(config.max_cookies),
config,
next_expiry: FuturesUnordered::from_iter(vec![futures::future::pending().boxed()]),
}
}

pub fn add(
&mut self,
new_registration: NewRegistration,
) -> Result<Registration, TtlOutOfRange> {
fn add(&mut self, new_registration: NewRegistration) -> Result<Registration, ErrorCode> {
let ttl = new_registration.effective_ttl();
if ttl > self.max_ttl {
return Err(TtlOutOfRange::TooLong {
bound: self.max_ttl,
requested: ttl,
});
}
if ttl < self.min_ttl {
return Err(TtlOutOfRange::TooShort {
bound: self.min_ttl,
requested: ttl,
});
if ttl > self.config.max_ttl || ttl < self.config.min_ttl {
return Err(ErrorCode::InvalidTtl);
}

let namespace = new_registration.namespace;
let registration_id = RegistrationId::new();
let peer = new_registration.record.peer_id();

if let Some(old_registration) = self
if self
.registrations_for_peer
.get_by_left(&(new_registration.record.peer_id(), namespace.clone()))
.left_values()
.filter(|(p, _)| p == &peer)
.count()
>= self.config.max_registrations_per_peer
|| self.registrations_for_peer.len() > self.config.max_registrations_total
{
self.registrations.remove(old_registration);
return Err(ErrorCode::Unavailable);
}

let namespace = new_registration.namespace;
let registration_id = RegistrationId::new();

self.registrations_for_peer.insert(
(new_registration.record.peer_id(), namespace.clone()),
registration_id,
Expand All @@ -436,7 +447,7 @@ impl Registrations {
Ok(registration)
}

pub fn remove(&mut self, namespace: Namespace, peer_id: PeerId) {
fn remove(&mut self, namespace: Namespace, peer_id: PeerId) {
let reggo_to_remove = self
.registrations_for_peer
.remove_by_left(&(peer_id, namespace));
Expand All @@ -446,7 +457,7 @@ impl Registrations {
}
}

pub fn get(
fn get(
&mut self,
discover_namespace: Option<Namespace>,
cookie: Option<Cookie>,
Expand Down Expand Up @@ -536,9 +547,8 @@ impl Registrations {
}
}

#[derive(Debug, thiserror::Error, Eq, PartialEq)]
#[error("The provided cookie is not valid for a DISCOVER request for the given namespace")]
pub struct CookieNamespaceMismatch;
#[derive(Debug, Eq, PartialEq)]
struct CookieNamespaceMismatch;

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -646,10 +656,8 @@ mod tests {

#[tokio::test]
async fn given_two_registration_ttls_one_expires_one_lives() {
let mut registrations = Registrations::with_config(Config {
min_ttl: 0,
max_ttl: 4,
});
let mut registrations =
Registrations::with_config(Config::default().with_min_ttl(0).with_max_ttl(4));

let start_time = SystemTime::now();

Expand Down Expand Up @@ -682,10 +690,8 @@ mod tests {

#[tokio::test]
async fn given_peer_unregisters_before_expiry_do_not_emit_registration_expired() {
let mut registrations = Registrations::with_config(Config {
min_ttl: 1,
max_ttl: 10,
});
let mut registrations =
Registrations::with_config(Config::default().with_min_ttl(1).with_max_ttl(10));
let dummy_registration = new_dummy_registration_with_ttl("foo", 2);
let namespace = dummy_registration.namespace.clone();
let peer_id = dummy_registration.record.peer_id();
Expand All @@ -704,10 +710,8 @@ mod tests {
#[tokio::test]
async fn given_all_registrations_expired_then_successfully_handle_new_registration_and_expiry()
{
let mut registrations = Registrations::with_config(Config {
min_ttl: 0,
max_ttl: 10,
});
let mut registrations =
Registrations::with_config(Config::default().with_min_ttl(0).with_max_ttl(10));
let dummy_registration = new_dummy_registration_with_ttl("foo", 1);

registrations.add(dummy_registration.clone()).unwrap();
Expand All @@ -721,10 +725,8 @@ mod tests {

#[tokio::test]
async fn cookies_are_cleaned_up_if_registrations_expire() {
let mut registrations = Registrations::with_config(Config {
min_ttl: 1,
max_ttl: 10,
});
let mut registrations =
Registrations::with_config(Config::default().with_min_ttl(1).with_max_ttl(10));

registrations
.add(new_dummy_registration_with_ttl("foo", 2))
Expand Down
Loading