diff --git a/crates/stackable-certs/src/ca/mod.rs b/crates/stackable-certs/src/ca/mod.rs index c3bb57099..7f7160f09 100644 --- a/crates/stackable-certs/src/ca/mod.rs +++ b/crates/stackable-certs/src/ca/mod.rs @@ -247,7 +247,7 @@ where // // The root profile doesn't add the AuthorityKeyIdentifier extension. // We manually add it below by using the 160-bit SHA-1 hash of the - // subject pulic key. This conforms to one of the outlined methods for + // subject public key. This conforms to one of the outlined methods for // generating key identifiers outlined in RFC 5280, section 4.2.1.2. // // Prepare extensions so we can avoid clones. diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index cd9d460c1..a8fe9fe2e 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- BREAKING: Add support to gracefully shutdown `WebhookServer` and `TlsServer` ([#1144]). + Both `WebhookServer::run` and `TlsServer::run` now require passing a shutdown signal, which is any + `Future`. + +[#1144]: https://github.com/stackabletech/operator-rs/pull/1144 + ## [0.8.1] - 2026-01-07 ### Fixed diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 43b16d816..d74a69fbd 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -15,15 +15,11 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use ::x509_cert::Certificate; use axum::{Router, routing::get}; -use futures_util::{FutureExt as _, TryFutureExt, select}; +use futures_util::TryFutureExt; use k8s_openapi::ByteString; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; -use tokio::{ - signal::unix::{SignalKind, signal}, - sync::mpsc, - try_join, -}; +use tokio::{sync::mpsc, try_join}; use tower::ServiceBuilder; use webhooks::{Webhook, WebhookError}; use x509_cert::der::{EncodePem, pem::LineEnding}; @@ -59,6 +55,7 @@ pub enum WebhookServerError { /// /// ``` /// use stackable_webhook::{WebhookServer, WebhookServerOptions, webhooks::Webhook}; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// let mut webhooks: Vec> = vec![]; @@ -69,8 +66,9 @@ pub enum WebhookServerError { /// webhook_service_name: "my-operator".to_owned(), /// }; /// let webhook_server = WebhookServer::new(webhooks, webhook_options).await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); /// -/// webhook_server.run().await.unwrap(); +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// ``` pub struct WebhookServer { @@ -154,52 +152,16 @@ impl WebhookServer { }) } - /// Runs the Webhook server and sets up signal handlers for shutting down. + /// Runs the [`WebhookServer`] and handles underlying certificate rotations of the [`TlsServer`]. /// - /// This does not implement graceful shutdown of the underlying server. Additionally, the server - /// is never started in cases where no [`Webhook`] is registered. Callers of this function need - /// to ensure to choose the correct joining mechanism for their use-case to for example avoid - /// unexpected shutdowns of the whole Kubernetes controller. - pub async fn run(self) -> Result<()> { - let future_server = self.run_server(); - let future_signal = async { - let mut sigint = signal(SignalKind::interrupt()).expect("create SIGINT listener"); - let mut sigterm = signal(SignalKind::terminate()).expect("create SIGTERM listener"); - - tracing::debug!("created unix signal handlers"); - - select! { - signal = sigint.recv().fuse() => { - if signal.is_some() { - tracing::debug!( "received SIGINT"); - } - }, - signal = sigterm.recv().fuse() => { - if signal.is_some() { - tracing::debug!( "received SIGTERM"); - } - }, - }; - }; - - // select requires Future + Unpin - tokio::pin!(future_server); - tokio::pin!(future_signal); - - tokio::select! { - res = &mut future_server => { - // If the server future errors, propagate the error - res?; - } - _ = &mut future_signal => { - tracing::info!("shutdown signal received, stopping webhook server"); - } - } - - Ok(()) - } - - async fn run_server(self) -> Result<()> { + /// It should be noted that the server is never started in cases where no [`Webhook`] is + /// registered. Callers of this function need to ensure to choose the correct joining mechanism + /// for their use-case to for example avoid unexpected shutdowns of the whole Kubernetes + /// controller. + pub async fn run(self, shutdown_signal: F) -> Result<()> + where + F: Future, + { tracing::debug!("run webhook server"); let Self { @@ -217,13 +179,29 @@ impl WebhookServer { } let tls_server = tls_server - .run() + .run(shutdown_signal) .map_err(|err| WebhookServerError::RunTlsServer { source: err }); let cert_update_loop = async { - while let Some(cert) = cert_rx.recv().await { + // Once the shutdown signal is triggered, the TlsServer above should be dropped as the + // run associated function consumes self. This in turn means that when the receiver is + // polled, it will return `Ok(Ready(None))`, which will cause this while loop to break + // and the future to complete. + while let Some(certificate) = cert_rx.recv().await { + // NOTE (@Techassi): There are currently NO semantic conventions for X509 certificates + // and as such, these are pretty much made up and potentially not ideal. + #[rustfmt::skip] + tracing::info!( + x509.not_before = certificate.tbs_certificate.validity.not_before.to_string(), + x509.not_after = certificate.tbs_certificate.validity.not_after.to_string(), + x509.serial_number = certificate.tbs_certificate.serial_number.to_string(), + x509.subject = certificate.tbs_certificate.subject.to_string(), + x509.issuer = certificate.tbs_certificate.issuer.to_string(), + "rotate certificate for registered webhooks" + ); + // The caBundle needs to be provided as a base64-encoded PEM envelope. - let ca_bundle = cert + let ca_bundle = certificate .to_pem(LineEnding::LF) .context(EncodeCertificateAuthorityAsPemSnafu)?; let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec()); @@ -243,6 +221,8 @@ impl WebhookServer { Ok(()) }; + // This either returns if one of the two futures complete with Err(_) or when both complete + // with Ok(_). Both futures complete with Ok(_) when a shutdown signal is received. try_join!(cert_update_loop, tls_server).map(|_| ()) } } diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index fa493159d..3118392fe 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -7,9 +7,10 @@ use axum::{ extract::{ConnectInfo, Request}, middleware::AddExtension, }; +use futures_util::FutureExt as _; use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; -use opentelemetry::trace::{FutureExt, SpanKind}; +use opentelemetry::trace::{FutureExt as _, SpanKind}; use opentelemetry_semantic_conventions as semconv; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_shared::time::Duration; @@ -138,17 +139,27 @@ impl TlsServer { /// router. /// /// It also starts a background task to rotate the certificate as needed. - pub async fn run(self) -> Result<()> { + /// + /// The `shutdown_signal` can be used to notify the [`TlsServer`] to + /// gracefully shutdown. + pub async fn run(self, shutdown_signal: F) -> Result<()> + where + F: Future, + { + let Self { + cert_resolver, + socket_addr, + config, + router, + } = self; + let start = tokio::time::Instant::now() + *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL; let mut interval = tokio::time::interval_at(start, *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL); - let tls_acceptor = TlsAcceptor::from(Arc::new(self.config)); - let tcp_listener = - TcpListener::bind(self.socket_addr) - .await - .context(BindTcpListenerSnafu { - socket_addr: self.socket_addr, - })?; + let tls_acceptor = TlsAcceptor::from(Arc::new(config)); + let tcp_listener = TcpListener::bind(socket_addr) + .await + .context(BindTcpListenerSnafu { socket_addr })?; // To be able to extract the connect info from incoming requests, it is // required to turn the router into a Tower service which is capable of @@ -161,24 +172,35 @@ impl TlsServer { // - https://github.com/tokio-rs/axum/discussions/2397 // - https://github.com/tokio-rs/axum/blob/b02ce307371a973039018a13fa012af14775948c/examples/serve-with-hyper/src/main.rs#L98 - let mut router = self - .router - .into_make_service_with_connect_info::(); + let mut router = router.into_make_service_with_connect_info::(); + + // Fuse the future so that it only yields `Poll::Ready` once. The future + // additionally needs to be pinned to be able to be used in the select! + // macro below. + let shutdown_signal = shutdown_signal.fuse(); + tokio::pin!(shutdown_signal); loop { let tls_acceptor = tls_acceptor.clone(); // Wait for either a new TCP connection or the certificate rotation interval tick tokio::select! { - // We opt for a biased execution of arms to make sure we always check if the - // certificate needs rotation based on the interval. This ensures, we always use - // a valid certificate for the TLS connection. + // We opt for a biased execution of arms to make sure we always check if a + // shutdown signal was received or the certificate needs rotation based on the + // interval. This ensures, we always use a valid certificate for the TLS connection. biased; + // As soon as this future because `Poll::Ready`, break out of the loop which cancels + // the certification rotation interval and stops accepting new TCP connections. + _ = &mut shutdown_signal => { + tracing::trace!("received shutdown signal"); + break; + } + // This is cancellation-safe. If this branch is cancelled, the tick is NOT consumed. // As such, we will not miss rotating the certificate. _ = interval.tick() => { - self.cert_resolver + cert_resolver .rotate_certificate() .await .context(RotateCertificateSnafu)? @@ -210,6 +232,8 @@ impl TlsServer { } }; } + + Ok(()) } async fn handle_request( diff --git a/crates/stackable-webhook/src/webhooks/conversion_webhook.rs b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs index c0f11b7d0..64cd58f2d 100644 --- a/crates/stackable-webhook/src/webhooks/conversion_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs @@ -22,8 +22,7 @@ use snafu::{ResultExt, Snafu}; use tokio::sync::oneshot; use tracing::instrument; -use super::{Webhook, WebhookError}; -use crate::WebhookServerOptions; +use crate::{Webhook, WebhookError, WebhookServerOptions}; #[derive(Debug, Snafu)] pub enum ConversionWebhookError { @@ -51,6 +50,7 @@ pub enum ConversionWebhookError { /// WebhookServer, /// webhooks::{ConversionWebhook, ConversionWebhookOptions}, /// }; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// // The Kubernetes client @@ -75,7 +75,9 @@ pub enum ConversionWebhookError { /// let webhook_server = WebhookServer::new(vec![Box::new(conversion_webhook)], webhook_options) /// .await /// .unwrap(); -/// webhook_server.run().await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); +/// +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// ``` pub struct ConversionWebhook { @@ -135,7 +137,7 @@ impl ConversionWebhook { } #[instrument( - skip(self, crd, crd_api, new_ca_bundle), + skip(self, crd, crd_api, ca_bundle), fields( name = crd.name_any(), kind = &crd.spec.names.kind @@ -145,7 +147,7 @@ impl ConversionWebhook { &self, mut crd: CustomResourceDefinition, crd_api: &Api, - new_ca_bundle: &ByteString, + ca_bundle: ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError> { let crd_kind = &crd.spec.names.kind; @@ -175,7 +177,7 @@ impl ConversionWebhook { port: Some(options.socket_addr.port().into()), }), // Here, ByteString takes care of encoding the provided content as base64. - ca_bundle: Some(new_ca_bundle.to_owned()), + ca_bundle: Some(ca_bundle), url: None, }), }), @@ -244,15 +246,15 @@ where self.options.disable_crd_maintenance } - #[instrument(skip(self, new_ca_bundle))] + #[instrument(skip(self, ca_bundle))] async fn handle_certificate_rotation( &mut self, - new_ca_bundle: &ByteString, + ca_bundle: &ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError> { let crd_api: Api = Api::all(self.client.clone()); for (crd, _) in &self.crds_and_handlers { - self.reconcile_crd(crd.clone(), &crd_api, new_ca_bundle, options) + self.reconcile_crd(crd.clone(), &crd_api, ca_bundle.to_owned(), options) .await?; } diff --git a/crates/stackable-webhook/src/webhooks/mod.rs b/crates/stackable-webhook/src/webhooks/mod.rs index 23cbca7ef..ac165efac 100644 --- a/crates/stackable-webhook/src/webhooks/mod.rs +++ b/crates/stackable-webhook/src/webhooks/mod.rs @@ -48,7 +48,7 @@ pub trait Webhook { /// Webhooks are informed about new certificates by this function and can react accordingly. async fn handle_certificate_rotation( &mut self, - new_ca_bundle: &ByteString, + ca_bundle: &ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError>; } diff --git a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs index 1beb12cde..9ff9192fd 100644 --- a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs @@ -12,8 +12,7 @@ use serde::{Serialize, de::DeserializeOwned}; use snafu::{ResultExt, Snafu}; use tracing::instrument; -use super::{Webhook, WebhookError}; -use crate::{WebhookServerOptions, webhooks::create_webhook_client_config}; +use crate::{Webhook, WebhookError, WebhookServerOptions, webhooks::create_webhook_client_config}; #[derive(Debug, Snafu)] pub enum MutatingWebhookError { @@ -51,6 +50,7 @@ pub enum MutatingWebhookError { /// WebhookServer, /// webhooks::{MutatingWebhook, MutatingWebhookOptions}, /// }; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// // The Kubernetes client @@ -76,7 +76,9 @@ pub enum MutatingWebhookError { /// let webhook_server = WebhookServer::new(vec![mutating_webhook], webhook_options) /// .await /// .unwrap(); -/// webhook_server.run().await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); +/// +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// /// fn get_mutating_webhook_configuration() -> MutatingWebhookConfiguration { @@ -206,14 +208,15 @@ where self.options.disable_mwc_maintenance } - #[instrument(skip(self, new_ca_bundle))] + #[instrument(skip(self, ca_bundle))] async fn handle_certificate_rotation( &mut self, - new_ca_bundle: &ByteString, + ca_bundle: &ByteString, options: &WebhookServerOptions, ) -> Result<(), WebhookError> { let mut mutating_webhook_configuration = self.mutating_webhook_configuration.clone(); let mwc_name = mutating_webhook_configuration.name_any(); + tracing::info!( k8s.mutatingwebhookconfiguration.name = mwc_name, "reconciling mutating webhook configurations" @@ -222,7 +225,7 @@ where for webhook in mutating_webhook_configuration.webhooks.iter_mut().flatten() { // We know how we can be called (and with what certificate), so we can always set that webhook.client_config = - create_webhook_client_config(options, new_ca_bundle.to_owned(), self.http_path()); + create_webhook_client_config(options, ca_bundle.to_owned(), self.http_path()); } let mwc_api: Api = Api::all(self.client.clone());