Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/stackable-certs/src/ca/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ where
//
// The root profile doesn't add the AuthorityKeyIdentifier extension.
// We manually add it below by using the 160-bit SHA-1 hash of the
// subject pulic key. This conforms to one of the outlined methods for
// subject public key. This conforms to one of the outlined methods for
// generating key identifiers outlined in RFC 5280, section 4.2.1.2.
//
// Prepare extensions so we can avoid clones.
Expand Down
8 changes: 8 additions & 0 deletions crates/stackable-webhook/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- BREAKING: Add support for gracefully shutting down `WebhookServer` and `TlsServer` ([#1144]).
Both `WebhookServer::run` and `TlsServer::run` now require passing a shutdown signal, which is any
`Future<Output = ()>`.

[#1144]: https://github.com/stackabletech/operator-rs/pull/1144

## [0.8.1] - 2026-01-07

### Fixed
Expand Down
90 changes: 35 additions & 55 deletions crates/stackable-webhook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use ::x509_cert::Certificate;
use axum::{Router, routing::get};
use futures_util::{FutureExt as _, TryFutureExt, select};
use futures_util::TryFutureExt;
use k8s_openapi::ByteString;
use snafu::{ResultExt, Snafu};
use stackable_telemetry::AxumTraceLayer;
use tokio::{
signal::unix::{SignalKind, signal},
sync::mpsc,
try_join,
};
use tokio::{sync::mpsc, try_join};
use tower::ServiceBuilder;
use webhooks::{Webhook, WebhookError};
use x509_cert::der::{EncodePem, pem::LineEnding};
Expand Down Expand Up @@ -59,6 +55,7 @@ pub enum WebhookServerError {
///
/// ```
/// use stackable_webhook::{WebhookServer, WebhookServerOptions, webhooks::Webhook};
/// use tokio::time::{Duration, sleep};
///
/// # async fn docs() {
/// let mut webhooks: Vec<Box<dyn Webhook>> = vec![];
Expand All @@ -69,8 +66,9 @@ pub enum WebhookServerError {
/// webhook_service_name: "my-operator".to_owned(),
/// };
/// let webhook_server = WebhookServer::new(webhooks, webhook_options).await.unwrap();
/// let shutdown_signal = sleep(Duration::from_millis(100));
///
/// webhook_server.run().await.unwrap();
/// webhook_server.run(shutdown_signal).await.unwrap();
/// # }
/// ```
pub struct WebhookServer {
Expand Down Expand Up @@ -154,52 +152,16 @@ impl WebhookServer {
})
}

/// Runs the Webhook server and sets up signal handlers for shutting down.
/// Runs the [`WebhookServer`] and handles underlying certificate rotations of the [`TlsServer`].
///
/// This does not implement graceful shutdown of the underlying server. Additionally, the server
/// is never started in cases where no [`Webhook`] is registered. Callers of this function need
/// to ensure to choose the correct joining mechanism for their use-case to for example avoid
/// unexpected shutdowns of the whole Kubernetes controller.
pub async fn run(self) -> Result<()> {
let future_server = self.run_server();
let future_signal = async {
let mut sigint = signal(SignalKind::interrupt()).expect("create SIGINT listener");
let mut sigterm = signal(SignalKind::terminate()).expect("create SIGTERM listener");

tracing::debug!("created unix signal handlers");

select! {
signal = sigint.recv().fuse() => {
if signal.is_some() {
tracing::debug!( "received SIGINT");
}
},
signal = sigterm.recv().fuse() => {
if signal.is_some() {
tracing::debug!( "received SIGTERM");
}
},
};
};

// select requires Future + Unpin
tokio::pin!(future_server);
tokio::pin!(future_signal);

tokio::select! {
res = &mut future_server => {
// If the server future errors, propagate the error
res?;
}
_ = &mut future_signal => {
tracing::info!("shutdown signal received, stopping webhook server");
}
}

Ok(())
}

async fn run_server(self) -> Result<()> {
/// Note that the server is never started if no [`Webhook`] is registered. Callers of this
/// function need to choose the correct joining mechanism for their use case to, for example,
/// avoid unexpected shutdowns of the whole Kubernetes controller.
pub async fn run<F>(self, shutdown_signal: F) -> Result<()>
where
F: Future<Output = ()>,
{
tracing::debug!("run webhook server");

let Self {
Expand All @@ -217,13 +179,29 @@ impl WebhookServer {
}

let tls_server = tls_server
.run()
.run(shutdown_signal)
.map_err(|err| WebhookServerError::RunTlsServer { source: err });

let cert_update_loop = async {
while let Some(cert) = cert_rx.recv().await {
// Once the shutdown signal is triggered, the TlsServer above should be dropped, as its `run`
// associated function consumes `self`. This in turn means that when the receiver is polled,
// it will return `Poll::Ready(None)`, which will cause this while loop to break and the
// future to complete.
while let Some(certificate) = cert_rx.recv().await {
// NOTE (@Techassi): There are currently NO semantic conventions for X509 certificates
// and as such, these are pretty much made up and potentially not ideal.
#[rustfmt::skip]
tracing::info!(
x509.not_before = certificate.tbs_certificate.validity.not_before.to_string(),
x509.not_after = certificate.tbs_certificate.validity.not_after.to_string(),
x509.serial_number = certificate.tbs_certificate.serial_number.to_string(),
x509.subject = certificate.tbs_certificate.subject.to_string(),
x509.issuer = certificate.tbs_certificate.issuer.to_string(),
"rotate certificate for registered webhooks"
);

// The caBundle needs to be provided as a base64-encoded PEM envelope.
let ca_bundle = cert
let ca_bundle = certificate
.to_pem(LineEnding::LF)
.context(EncodeCertificateAuthorityAsPemSnafu)?;
let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec());
Expand All @@ -243,6 +221,8 @@ impl WebhookServer {
Ok(())
};

// This returns either when one of the two futures completes with Err(_) or when both complete
// with Ok(_). Both futures complete with Ok(_) when a shutdown signal is received.
try_join!(cert_update_loop, tls_server).map(|_| ())
}
}
56 changes: 40 additions & 16 deletions crates/stackable-webhook/src/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use axum::{
extract::{ConnectInfo, Request},
middleware::AddExtension,
};
use futures_util::FutureExt as _;
use hyper::{body::Incoming, service::service_fn};
use hyper_util::rt::{TokioExecutor, TokioIo};
use opentelemetry::trace::{FutureExt, SpanKind};
use opentelemetry::trace::{FutureExt as _, SpanKind};
use opentelemetry_semantic_conventions as semconv;
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_shared::time::Duration;
Expand Down Expand Up @@ -138,17 +139,27 @@ impl TlsServer {
/// router.
///
/// It also starts a background task to rotate the certificate as needed.
pub async fn run(self) -> Result<()> {
///
/// The `shutdown_signal` can be used to notify the [`TlsServer`] to
/// shut down gracefully.
pub async fn run<F>(self, shutdown_signal: F) -> Result<()>
where
F: Future<Output = ()>,
{
let Self {
cert_resolver,
socket_addr,
config,
router,
} = self;

let start = tokio::time::Instant::now() + *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL;
let mut interval = tokio::time::interval_at(start, *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL);

let tls_acceptor = TlsAcceptor::from(Arc::new(self.config));
let tcp_listener =
TcpListener::bind(self.socket_addr)
.await
.context(BindTcpListenerSnafu {
socket_addr: self.socket_addr,
})?;
let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let tcp_listener = TcpListener::bind(socket_addr)
.await
.context(BindTcpListenerSnafu { socket_addr })?;

// To be able to extract the connect info from incoming requests, it is
// required to turn the router into a Tower service which is capable of
Expand All @@ -161,24 +172,35 @@ impl TlsServer {
// - https://github.com/tokio-rs/axum/discussions/2397
// - https://github.com/tokio-rs/axum/blob/b02ce307371a973039018a13fa012af14775948c/examples/serve-with-hyper/src/main.rs#L98

let mut router = self
.router
.into_make_service_with_connect_info::<SocketAddr>();
let mut router = router.into_make_service_with_connect_info::<SocketAddr>();

// Fuse the future so that it only yields `Poll::Ready` once. The future
// additionally needs to be pinned to be able to be used in the select!
// macro below.
let shutdown_signal = shutdown_signal.fuse();
tokio::pin!(shutdown_signal);

loop {
let tls_acceptor = tls_acceptor.clone();

// Wait for either a new TCP connection or the certificate rotation interval tick
tokio::select! {
// We opt for a biased execution of arms to make sure we always check if the
// certificate needs rotation based on the interval. This ensures, we always use
// a valid certificate for the TLS connection.
// We opt for a biased execution of arms to make sure we always check if a
// shutdown signal was received or the certificate needs rotation based on the
// interval. This ensures that we always use a valid certificate for the TLS connection.
biased;

// As soon as this future becomes `Poll::Ready`, break out of the loop which cancels
// the certificate rotation interval and stops accepting new TCP connections.
_ = &mut shutdown_signal => {
tracing::trace!("received shutdown signal");
break;
}

// This is cancellation-safe. If this branch is cancelled, the tick is NOT consumed.
// As such, we will not miss rotating the certificate.
_ = interval.tick() => {
self.cert_resolver
cert_resolver
.rotate_certificate()
.await
.context(RotateCertificateSnafu)?
Expand Down Expand Up @@ -210,6 +232,8 @@ impl TlsServer {
}
};
}

Ok(())
}

async fn handle_request(
Expand Down
20 changes: 11 additions & 9 deletions crates/stackable-webhook/src/webhooks/conversion_webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use snafu::{ResultExt, Snafu};
use tokio::sync::oneshot;
use tracing::instrument;

use super::{Webhook, WebhookError};
use crate::WebhookServerOptions;
use crate::{Webhook, WebhookError, WebhookServerOptions};

#[derive(Debug, Snafu)]
pub enum ConversionWebhookError {
Expand Down Expand Up @@ -51,6 +50,7 @@ pub enum ConversionWebhookError {
/// WebhookServer,
/// webhooks::{ConversionWebhook, ConversionWebhookOptions},
/// };
/// use tokio::time::{Duration, sleep};
///
/// # async fn docs() {
/// // The Kubernetes client
Expand All @@ -75,7 +75,9 @@ pub enum ConversionWebhookError {
/// let webhook_server = WebhookServer::new(vec![Box::new(conversion_webhook)], webhook_options)
/// .await
/// .unwrap();
/// webhook_server.run().await.unwrap();
/// let shutdown_signal = sleep(Duration::from_millis(100));
///
/// webhook_server.run(shutdown_signal).await.unwrap();
/// # }
/// ```
pub struct ConversionWebhook<H> {
Expand Down Expand Up @@ -135,7 +137,7 @@ impl<H> ConversionWebhook<H> {
}

#[instrument(
skip(self, crd, crd_api, new_ca_bundle),
skip(self, crd, crd_api, ca_bundle),
fields(
name = crd.name_any(),
kind = &crd.spec.names.kind
Expand All @@ -145,7 +147,7 @@ impl<H> ConversionWebhook<H> {
&self,
mut crd: CustomResourceDefinition,
crd_api: &Api<CustomResourceDefinition>,
new_ca_bundle: &ByteString,
ca_bundle: ByteString,
options: &WebhookServerOptions,
) -> Result<(), WebhookError> {
let crd_kind = &crd.spec.names.kind;
Expand Down Expand Up @@ -175,7 +177,7 @@ impl<H> ConversionWebhook<H> {
port: Some(options.socket_addr.port().into()),
}),
// Here, ByteString takes care of encoding the provided content as base64.
ca_bundle: Some(new_ca_bundle.to_owned()),
ca_bundle: Some(ca_bundle),
url: None,
}),
}),
Expand Down Expand Up @@ -244,15 +246,15 @@ where
self.options.disable_crd_maintenance
}

#[instrument(skip(self, new_ca_bundle))]
#[instrument(skip(self, ca_bundle))]
async fn handle_certificate_rotation(
&mut self,
new_ca_bundle: &ByteString,
ca_bundle: &ByteString,
options: &WebhookServerOptions,
) -> Result<(), WebhookError> {
let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
for (crd, _) in &self.crds_and_handlers {
self.reconcile_crd(crd.clone(), &crd_api, new_ca_bundle, options)
self.reconcile_crd(crd.clone(), &crd_api, ca_bundle.to_owned(), options)
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/stackable-webhook/src/webhooks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub trait Webhook {
/// Webhooks are informed about new certificates by this function and can react accordingly.
async fn handle_certificate_rotation(
&mut self,
new_ca_bundle: &ByteString,
ca_bundle: &ByteString,
options: &WebhookServerOptions,
) -> Result<(), WebhookError>;
}
Expand Down
Loading
Loading