Skip to content
1 change: 1 addition & 0 deletions components/controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rusqlite = { version = "0.32.1", features = ["bundled"], optional = true}
serde = "1.0.219"
reqwest = "0.12.22"
async-trait = "0.1.57"
tracing = "0.1"

[dev-dependencies]
witness = { path = "../witness" }
Expand Down
2 changes: 2 additions & 0 deletions components/watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ thiserror = "1.0.63"
regex = "1.10.6"
tokio = { version = "1", features = ["full"] }
reqwest = "0.12.22"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[dev-dependencies]
keri-controller = { path = "../controller" }
Expand Down
6 changes: 5 additions & 1 deletion components/watcher/src/http_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,9 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
"/query/tel",
actix_web::web::post().to(http_handlers::process_tel_query_redb),
)
.route("info", actix_web::web::get().to(http_handlers::info));
.route("info", actix_web::web::get().to(http_handlers::info))
.route(
"/health",
actix_web::web::get().to(http_handlers::health_redb),
);
}
2 changes: 1 addition & 1 deletion components/watcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub use crate::{
watcher::{config::WatcherConfig, Watcher},
watcher::{config::WatcherConfig, health::WitnessHealthTracker, poller::WitnessPoller, Watcher},
watcher_listener::WatcherListener,
};

Expand Down
31 changes: 22 additions & 9 deletions components/watcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use keri_core::{
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use url::Url;
use tracing::info;
use watcher::{transport::HttpTelTransport, WatcherConfig, WatcherListener};

#[serde_as]
#[derive(Deserialize)]
pub struct Config {
db_path: PathBuf,
Expand All @@ -36,6 +38,12 @@ pub struct Config {
escrow_config: EscrowConfig,

tel_storage_path: PathBuf,

/// Interval in seconds between background witness polling cycles.
/// Defaults to 30 seconds. Set to 0 to disable.
#[serde_as(as = "Option<DurationSeconds>")]
#[serde(default)]
poll_interval: Option<Duration>,
}

#[serde_as]
Expand Down Expand Up @@ -119,8 +127,15 @@ const ENV_PREFIX: &str = "WATCHER_";

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();

let args = Args::parse();
println!("Using config file: {:?}", args.config_file);
info!(config_file = %args.config_file, "Loading configuration");

let cfg = Figment::new()
.merge(Yaml::file(args.config_file.clone()))
Expand All @@ -137,6 +152,7 @@ async fn main() -> anyhow::Result<()> {
tel_transport: Box::new(HttpTelTransport),
escrow_config: cfg.escrow_config,
tel_storage_path: cfg.tel_storage_path,
poll_interval: cfg.poll_interval.unwrap_or(Duration::from_secs(30)),
})?;

// Resolve oobi to know how to find witness
Expand All @@ -150,14 +166,11 @@ async fn main() -> anyhow::Result<()> {
url: cfg.public_url.clone(),
};

println!(
"Watcher {} is listening on port {}",
watcher_id.to_str(),
cfg.http_port,
);
println!(
"Watcher's oobi: {}",
serde_json::to_string(&watcher_loc_scheme).unwrap()
info!(
watcher_id = %watcher_id.to_str(),
port = cfg.http_port,
oobi = %serde_json::to_string(&watcher_loc_scheme).unwrap(),
"Watcher started",
);

watcher_listener
Expand Down
12 changes: 11 additions & 1 deletion components/watcher/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use keri_core::oobi::{LocationScheme, Scheme};
use teliox::query::SignedTelQuery;

Expand All @@ -18,6 +20,14 @@ pub enum TransportError {
InvalidResponse,
}

/// Builds the HTTP client used for outgoing requests.
///
/// The client is configured with a 10 second connect timeout and a
/// 30 second overall request timeout.
///
/// # Panics
///
/// Panics with "Failed to build HTTP client" if the builder reports an error.
fn http_client() -> reqwest::Client {
    let connect_limit = Duration::from_secs(10);
    let request_limit = Duration::from_secs(30);

    let builder = reqwest::Client::builder()
        .connect_timeout(connect_limit)
        .timeout(request_limit);

    builder.build().expect("Failed to build HTTP client")
}

pub struct HttpTelTransport;

#[async_trait::async_trait]
Expand All @@ -31,7 +41,7 @@ impl WatcherTelTransport for HttpTelTransport {
Scheme::Http => location.url.join("query/tel").unwrap(),
Scheme::Tcp => todo!(),
};
let resp = reqwest::Client::new()
let resp = http_client()
.post(url)
.body(qry.to_cesr().unwrap())
.send()
Expand Down
6 changes: 5 additions & 1 deletion components/watcher/src/watcher/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};

use keri_core::{
processor::escrow::EscrowConfig,
Expand All @@ -15,6 +15,9 @@ pub struct WatcherConfig {
pub tel_transport: Box<dyn WatcherTelTransport + Send + Sync>,
pub tel_storage_path: PathBuf,
pub escrow_config: EscrowConfig,
/// Interval between background witness polling cycles.
/// Set to Duration::ZERO to disable polling.
pub poll_interval: Duration,
}

impl Default for WatcherConfig {
Expand All @@ -27,6 +30,7 @@ impl Default for WatcherConfig {
tel_transport: Box::new(HttpTelTransport),
tel_storage_path: PathBuf::from("tel_storage"),
escrow_config: EscrowConfig::default(),
poll_interval: Duration::from_secs(30),
}
}
}
156 changes: 156 additions & 0 deletions components/watcher/src/watcher/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use std::{
collections::HashMap,
sync::RwLock,
time::{Duration, Instant},
};

use keri_core::prefix::IdentifierPrefix;
use serde::Serialize;

/// Per-witness health record.
///
/// The derived `Default` is the "never contacted" state: all counters are
/// zero, the average is `0.0`, and no timestamps or error are recorded.
#[derive(Debug, Clone, Default, Serialize)]
pub struct WitnessHealth {
    /// Total number of successful queries.
    pub successes: u64,
    /// Total number of failed queries.
    pub failures: u64,
    /// Consecutive failures (resets on success).
    pub consecutive_failures: u64,
    /// Exponential moving average of the response time, in milliseconds.
    pub avg_response_ms: f64,
    /// Moment of the last successful contact, if there was one.
    ///
    /// Marked `#[serde(skip)]`, so it never appears in serialized output.
    #[serde(skip)]
    pub last_success: Option<Instant>,
    /// Moment of the last failure, if there was one.
    ///
    /// Marked `#[serde(skip)]`, so it never appears in serialized output.
    #[serde(skip)]
    pub last_failure: Option<Instant>,
    /// Last error message.
    pub last_error: Option<String>,
}

impl WitnessHealth {
    /// Registers a successful query that took `response_time`.
    ///
    /// Increments the success counter, clears the consecutive-failure streak,
    /// stamps `last_success` with the current instant and folds the sample
    /// into `avg_response_ms`.
    pub fn record_success(&mut self, response_time: Duration) {
        let sample_ms = response_time.as_secs_f64() * 1000.0;

        self.successes += 1;
        self.last_success = Some(Instant::now());
        self.consecutive_failures = 0;

        // The first sample seeds the average; every later sample is blended in
        // as an exponential moving average with alpha = 0.2.
        self.avg_response_ms = match self.successes {
            1 => sample_ms,
            _ => self.avg_response_ms * 0.8 + sample_ms * 0.2,
        };
    }

    /// Registers a failed query together with its error message.
    ///
    /// Increments both the total and the consecutive failure counters, stamps
    /// `last_failure` with the current instant and stores `error` as
    /// `last_error`.
    pub fn record_failure(&mut self, error: String) {
        self.failures += 1;
        self.consecutive_failures += 1;
        self.last_error = Some(error);
        self.last_failure = Some(Instant::now());
    }

    /// Reports whether the witness still counts as responsive.
    ///
    /// A witness is healthy while it has fewer than three consecutive failures.
    pub fn is_healthy(&self) -> bool {
        matches!(self.consecutive_failures, 0..=2)
    }
}

/// Aggregated health status for the watcher's view of a specific AID.
///
/// Built by `WitnessHealthTracker::get_aid_health` from the list of witness
/// identifiers supplied by the caller.
#[derive(Debug, Clone, Serialize)]
pub struct AidHealthStatus {
/// String form of the AID this status describes.
pub prefix: String,
/// Number of witness identifiers that were evaluated.
pub total_witnesses: usize,
/// Number of evaluated witnesses that count as healthy; witnesses with no
/// recorded history are counted as healthy.
pub healthy_witnesses: usize,
/// `true` only when at least one witness was evaluated and none of them is
/// healthy.
pub degraded: bool,
}

/// Tracks health statistics for all witnesses the watcher interacts with.
///
/// The records sit behind an `RwLock`, so every method takes `&self`.
///
/// # Panics
///
/// Every method unwraps the lock result and therefore panics if the lock has
/// been poisoned by a panic in another thread while it was held for writing.
#[derive(Debug, Default)]
pub struct WitnessHealthTracker {
    /// Per-witness health records keyed by witness identifier string.
    records: RwLock<HashMap<String, WitnessHealth>>,
}

impl WitnessHealthTracker {
    /// Creates a tracker with no recorded witnesses.
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a successful response from a witness.
    ///
    /// A record is created for the witness on first contact.
    pub fn record_success(&self, witness_id: &IdentifierPrefix, response_time: Duration) {
        // Build the key before taking the lock to keep the critical section short.
        let key = witness_id.to_string();
        let mut records = self.records.write().unwrap();
        records
            .entry(key)
            .or_default()
            .record_success(response_time);
    }

    /// Record a failed response from a witness.
    ///
    /// A record is created for the witness on first contact.
    pub fn record_failure(&self, witness_id: &IdentifierPrefix, error: String) {
        let key = witness_id.to_string();
        let mut records = self.records.write().unwrap();
        records.entry(key).or_default().record_failure(error);
    }

    /// Check if a specific witness is considered healthy.
    ///
    /// Witnesses without any recorded history are assumed healthy.
    pub fn is_healthy(&self, witness_id: &IdentifierPrefix) -> bool {
        let key = witness_id.to_string();
        let records = self.records.read().unwrap();
        Self::key_is_healthy(&records, &key)
    }

    /// Get health snapshot for all tracked witnesses.
    ///
    /// Returns a clone of the internal map; later updates to the tracker are
    /// not reflected in the returned value.
    pub fn get_all_health(&self) -> HashMap<String, WitnessHealth> {
        let records = self.records.read().unwrap();
        records.clone()
    }

    /// Get health status for witnesses of a specific AID.
    ///
    /// `witness_ids` is evaluated as given. The result is flagged `degraded`
    /// only when the list is non-empty and none of its witnesses is healthy.
    pub fn get_aid_health(
        &self,
        aid: &IdentifierPrefix,
        witness_ids: &[IdentifierPrefix],
    ) -> AidHealthStatus {
        let records = self.records.read().unwrap();
        let healthy_witnesses = witness_ids
            .iter()
            .filter(|witness| Self::key_is_healthy(&records, &witness.to_string()))
            .count();

        AidHealthStatus {
            prefix: aid.to_string(),
            total_witnesses: witness_ids.len(),
            healthy_witnesses,
            degraded: healthy_witnesses == 0 && !witness_ids.is_empty(),
        }
    }

    /// Single source of truth for the health rule: a witness with no record
    /// is treated as healthy, otherwise its own record decides.
    fn key_is_healthy(records: &HashMap<String, WitnessHealth>, key: &str) -> bool {
        records.get(key).map_or(true, WitnessHealth::is_healthy)
    }
}
Loading
Loading