Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gateway/gateway.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ app_address_ns_prefix = "_dstack-app-address"
app_address_ns_compat = true
workers = 32
external_port = 443
# Maximum concurrent connections per app. 0 means unlimited.
max_connections_per_app = 2000

[core.proxy.timeouts]
# Timeout for establishing a connection to the target app.
Expand Down
2 changes: 2 additions & 0 deletions gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct ProxyConfig {
pub workers: usize,
pub app_address_ns_prefix: String,
pub app_address_ns_compat: bool,
/// Maximum concurrent connections per app. 0 means unlimited.
pub max_connections_per_app: u64,
}

#[derive(Debug, Clone, Deserialize)]
Expand Down
36 changes: 33 additions & 3 deletions gateway/src/proxy/tls_passthough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
//
// SPDX-License-Identifier: Apache-2.0

use anyhow::{Context, Result};
use std::fmt::Debug;
use std::sync::atomic::Ordering;

use anyhow::{bail, Context, Result};
use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinSet, time::timeout};
use tracing::{debug, info};
use tracing::{debug, info, warn};

use crate::{
main_service::Proxy,
Expand Down Expand Up @@ -88,11 +90,38 @@ pub(crate) async fn proxy_with_sni(
proxy_to_app(state, inbound, buffer, &addr.app_id, addr.port).await
}

/// Refuse a new connection when an app is already at its connection cap.
///
/// Adds up the `counter` value of every address in `addresses` and compares
/// the sum with `max_connections`. A `max_connections` of `0` disables the
/// check entirely.
///
/// # Errors
///
/// Returns an error, after logging a warning tagged with `app_id`, when the
/// summed count is greater than or equal to `max_connections`.
fn check_connection_limit(
    addresses: &AddressGroup,
    max_connections: u64,
    app_id: &str,
) -> Result<()> {
    // Zero is the "unlimited" setting, so only a non-zero cap is enforced.
    if max_connections != 0 {
        let total: u64 = addresses
            .iter()
            .fold(0, |sum, address| sum + address.counter.load(Ordering::Relaxed));
        if total >= max_connections {
            warn!(
                app_id,
                total, max_connections, "app connection limit exceeded"
            );
            bail!("app connection limit exceeded: {total}/{max_connections}");
        }
    }
    Ok(())
}

/// Connect to multiple hosts simultaneously and return the first successful connection.
pub(crate) async fn connect_multiple_hosts(
addresses: AddressGroup,
port: u16,
max_connections: u64,
app_id: &str,
) -> Result<(TcpStream, EnteredCounter)> {
check_connection_limit(&addresses, max_connections, app_id)?;

let mut join_set = JoinSet::new();
for addr in addresses {
let counter = addr.counter.enter();
Expand Down Expand Up @@ -127,9 +156,10 @@ pub(crate) async fn proxy_to_app(
port: u16,
) -> Result<()> {
let addresses = state.lock().select_top_n_hosts(app_id)?;
let max_connections = state.config.proxy.max_connections_per_app;
let (mut outbound, _counter) = timeout(
state.config.proxy.timeouts.connect,
connect_multiple_hosts(addresses.clone(), port),
connect_multiple_hosts(addresses.clone(), port, max_connections, app_id),
)
.await
.with_context(|| format!("connecting timeout to app {app_id}: {addresses:?}:{port}"))?
Expand Down
3 changes: 2 additions & 1 deletion gateway/src/proxy/tls_terminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,10 @@ impl Proxy {
.with_context(|| format!("app {app_id} not found"))?;
debug!("selected top n hosts: {addresses:?}");
let tls_stream = self.tls_accept(inbound, buffer, h2).await?;
let max_connections = self.config.proxy.max_connections_per_app;
let (outbound, _counter) = timeout(
self.config.proxy.timeouts.connect,
connect_multiple_hosts(addresses, port),
connect_multiple_hosts(addresses, port, max_connections, app_id),
)
.await
.map_err(|_| anyhow!("connecting timeout"))?
Expand Down