Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions lib/bencher_json/src/runner/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ pub struct JsonNewRunJob {
/// Default poll timeout in seconds for job claiming long-poll.
pub const DEFAULT_POLL_TIMEOUT: u32 = 30;
/// Minimum poll timeout in seconds.
pub const MIN_POLL_TIMEOUT: u32 = 1;
pub const MIN_POLL_TIMEOUT: u32 = PollTimeout::MIN_VALUE;
/// Maximum poll timeout in seconds.
pub const MAX_POLL_TIMEOUT: u32 = 900;
pub const MAX_POLL_TIMEOUT: u32 = PollTimeout::MAX_VALUE;

pub use crate::{MAX_CMD_LEN, MAX_ENTRYPOINT_LEN, MAX_ENV_LEN, MAX_FILE_PATHS_LEN};

Expand Down
171 changes: 158 additions & 13 deletions lib/bencher_json/src/runner/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,59 @@
use bencher_valid::PollTimeout;
use serde::{Deserialize, Serialize};

use super::job::JsonIterationOutput;
use super::job::{JobUuid, JsonClaimedJob, JsonIterationOutput};

/// Messages sent from the runner to the server over the WebSocket channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum RunnerMessage {
/// Runner is idle, requesting a job.
Ready {
/// Maximum time to wait for a job (long-poll), in seconds (1-900)
#[serde(skip_serializing_if = "Option::is_none")]
poll_timeout: Option<PollTimeout>,
},
/// Job setup complete, benchmark execution starting.
Running,
/// Periodic heartbeat, keeps job alive and triggers billing.
Heartbeat,
/// Benchmark completed successfully.
Completed {
/// The job this result belongs to (enables retry on reconnect)
job: JobUuid,
/// Per-iteration results
results: Vec<JsonIterationOutput>,
},
/// Benchmark failed.
Failed {
/// The job this result belongs to (enables retry on reconnect)
job: JobUuid,
/// Per-iteration results collected before failure
results: Vec<JsonIterationOutput>,
/// Error description
error: String,
},
/// Acknowledge cancellation from server.
Canceled,
Canceled {
/// The job this cancellation belongs to (enables retry on reconnect)
job: JobUuid,
},
}

/// Messages sent from the server to the runner over the WebSocket channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum ServerMessage {
/// Acknowledge received message.
Ack,
Ack {
/// The job this acknowledgement is for (None for non-job messages like Ready)
#[serde(skip_serializing_if = "Option::is_none")]
job: Option<JobUuid>,
},
/// Server assigned a job (boxed because it's large).
Job(Box<JsonClaimedJob>),
/// Poll timeout expired, no job available.
NoJob,
/// Job was canceled, stop execution immediately.
Cancel,
}
Expand Down Expand Up @@ -68,6 +90,85 @@ mod tests {

use super::*;

fn test_job_uuid() -> JobUuid {
"550e8400-e29b-41d4-a716-446655440000".parse().unwrap()
}

#[test]
fn ready_no_timeout_roundtrip() {
let msg = RunnerMessage::Ready { poll_timeout: None };
let json = serde_json::to_string(&msg).unwrap();
assert_eq!(json, r#"{"event":"ready"}"#);
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
RunnerMessage::Ready { poll_timeout } => assert!(poll_timeout.is_none()),
other => panic!("Expected Ready, got {other:?}"),
}
}

#[test]
fn ready_with_timeout_roundtrip() {
let msg = RunnerMessage::Ready {
poll_timeout: Some(PollTimeout::try_from(30).unwrap()),
};
let json = serde_json::to_string(&msg).unwrap();
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
RunnerMessage::Ready { poll_timeout } => {
assert_eq!(u32::from(poll_timeout.unwrap()), 30);
},
other => panic!("Expected Ready, got {other:?}"),
}
}

#[test]
fn server_job_roundtrip() {
// Build a minimal JsonClaimedJob via JSON
let job_json = serde_json::json!({
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"spec": {
"uuid": "00000000-0000-0000-0000-000000000001",
"name": "test-spec",
"slug": "test-spec",
"architecture": "x86_64",
"cpu": 2,
"memory": 0x4000_0000,
"disk": 0x2_8000_0000i64,
"network": false,
"created": "2025-01-01T00:00:00Z",
"modified": "2025-01-01T00:00:00Z"
},
"config": {
"registry": "https://registry.bencher.dev",
"project": "11111111-2222-3333-4444-555555555555",
"digest": "sha256:a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3",
"timeout": 300
},
"oci_token": "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c",
"timeout": 300,
"created": "2025-01-01T00:00:00Z"
});
let claimed: JsonClaimedJob = serde_json::from_value(job_json).unwrap();
let msg = ServerMessage::Job(Box::new(claimed));
let json = serde_json::to_string(&msg).unwrap();
let deserialized: ServerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
ServerMessage::Job(job) => {
assert_eq!(job.uuid.to_string(), "550e8400-e29b-41d4-a716-446655440000");
},
other => panic!("Expected Job, got {other:?}"),
}
}

#[test]
fn server_no_job_roundtrip() {
let msg = ServerMessage::NoJob;
let json = serde_json::to_string(&msg).unwrap();
assert_eq!(json, r#"{"event":"no_job"}"#);
let deserialized: ServerMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(deserialized, ServerMessage::NoJob));
}

#[test]
fn running_roundtrip() {
let msg = RunnerMessage::Running;
Expand All @@ -86,7 +187,9 @@ mod tests {

#[test]
fn completed_roundtrip() {
let job_uuid = test_job_uuid();
let msg = RunnerMessage::Completed {
job: job_uuid,
results: vec![JsonIterationOutput {
exit_code: 0,
stdout: Some("hello".into()),
Expand All @@ -97,7 +200,8 @@ mod tests {
let json = serde_json::to_string(&msg).unwrap();
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
RunnerMessage::Completed { results } => {
RunnerMessage::Completed { job, results } => {
assert_eq!(job, job_uuid);
assert_eq!(results.len(), 1);
assert_eq!(results[0].exit_code, 0);
assert_eq!(results[0].stdout.as_deref(), Some("hello"));
Expand All @@ -109,13 +213,16 @@ mod tests {

#[test]
fn completed_minimal_roundtrip() {
let job_uuid = test_job_uuid();
let msg = RunnerMessage::Completed {
job: job_uuid,
results: Vec::new(),
};
let json = serde_json::to_string(&msg).unwrap();
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
RunnerMessage::Completed { results } => {
RunnerMessage::Completed { job, results } => {
assert_eq!(job, job_uuid);
assert!(results.is_empty());
},
other => panic!("Expected Completed, got {other:?}"),
Expand All @@ -124,7 +231,9 @@ mod tests {

#[test]
fn failed_roundtrip() {
let job_uuid = test_job_uuid();
let msg = RunnerMessage::Failed {
job: job_uuid,
results: vec![JsonIterationOutput {
exit_code: 1,
stdout: Some("out".into()),
Expand All @@ -136,7 +245,12 @@ mod tests {
let json = serde_json::to_string(&msg).unwrap();
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
RunnerMessage::Failed { results, error } => {
RunnerMessage::Failed {
job,
results,
error,
} => {
assert_eq!(job, job_uuid);
assert_eq!(results.len(), 1);
assert_eq!(results[0].exit_code, 1);
assert_eq!(error, "something broke");
Expand All @@ -147,14 +261,21 @@ mod tests {

#[test]
fn failed_no_results_roundtrip() {
let job_uuid = test_job_uuid();
let msg = RunnerMessage::Failed {
job: job_uuid,
results: Vec::new(),
error: "startup failure".into(),
};
let json = serde_json::to_string(&msg).unwrap();
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
RunnerMessage::Failed { results, error } => {
RunnerMessage::Failed {
job,
results,
error,
} => {
assert_eq!(job, job_uuid);
assert!(results.is_empty());
assert_eq!(error, "startup failure");
},
Expand All @@ -164,18 +285,39 @@ mod tests {

#[test]
fn canceled_roundtrip() {
let msg = RunnerMessage::Canceled;
let job_uuid = test_job_uuid();
let msg = RunnerMessage::Canceled { job: job_uuid };
let json = serde_json::to_string(&msg).unwrap();
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(deserialized, RunnerMessage::Canceled));
match deserialized {
RunnerMessage::Canceled { job } => {
assert_eq!(job, job_uuid);
},
other => panic!("Expected Canceled, got {other:?}"),
}
}

#[test]
fn server_ack_roundtrip() {
let msg = ServerMessage::Ack;
fn server_ack_no_job_roundtrip() {
let msg = ServerMessage::Ack { job: None };
let json = serde_json::to_string(&msg).unwrap();
assert_eq!(json, r#"{"event":"ack"}"#);
let deserialized: ServerMessage = serde_json::from_str(&json).unwrap();
assert!(matches!(deserialized, ServerMessage::Ack));
assert!(matches!(deserialized, ServerMessage::Ack { job: None }));
}

#[test]
fn server_ack_with_job_roundtrip() {
let job_uuid = test_job_uuid();
let msg = ServerMessage::Ack {
job: Some(job_uuid),
};
let json = serde_json::to_string(&msg).unwrap();
let deserialized: ServerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
ServerMessage::Ack { job } => assert_eq!(job, Some(job_uuid)),
other => panic!("Expected Ack, got {other:?}"),
}
}

#[test]
Expand Down Expand Up @@ -219,12 +361,14 @@ mod tests {

#[test]
fn completed_with_file_output_roundtrip() {
let job_uuid = test_job_uuid();
let mut output = BTreeMap::new();
output.insert(
Utf8PathBuf::from("/tmp/results.json"),
r#"{"metric": 42}"#.to_owned(),
);
let msg = RunnerMessage::Completed {
job: job_uuid,
results: vec![JsonIterationOutput {
exit_code: 0,
stdout: None,
Expand All @@ -235,7 +379,8 @@ mod tests {
let json = serde_json::to_string(&msg).unwrap();
let deserialized: RunnerMessage = serde_json::from_str(&json).unwrap();
match deserialized {
RunnerMessage::Completed { results } => {
RunnerMessage::Completed { job, results } => {
assert_eq!(job, job_uuid);
assert_eq!(results.len(), 1);
let output = results[0].output.as_ref().unwrap();
assert_eq!(
Expand Down
2 changes: 2 additions & 0 deletions lib/bencher_valid/src/plus/poll_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ impl FromStr for PollTimeout {
impl PollTimeout {
pub const MIN: Self = Self(MIN_POLL_TIMEOUT);
pub const MAX: Self = Self(MAX_POLL_TIMEOUT);
pub const MIN_VALUE: u32 = MIN_POLL_TIMEOUT;
pub const MAX_VALUE: u32 = MAX_POLL_TIMEOUT;
}

impl<'de> Deserialize<'de> for PollTimeout {
Expand Down
2 changes: 2 additions & 0 deletions plus/api_runners/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ bencher_json = { workspace = true, features = ["server", "schema", "db", "test-c
bencher_schema.workspace = true
camino.workspace = true
diesel.workspace = true
futures.workspace = true
futures-concurrency.workspace = true
http.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "test-util"] }
tokio-tungstenite.workspace = true

[lints]
workspace = true
Loading
Loading