Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
647 changes: 310 additions & 337 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ let
]
);

# We choose a minimal Rust channel to keep the Nix closure size smaller
rust = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;

defaultEnv = pkgs.buildEnv {
name = "opsqueue-env-default";
paths = [
Expand All @@ -39,7 +36,7 @@ let
pkgs.ruff

# For compiling the Rust parts
rust
pkgs.rustToolchain
pkgs.sqlx-cli

# Manage nix pins
Expand Down
8 changes: 4 additions & 4 deletions libs/opsqueue_python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ crate-type = ["cdylib"]
opsqueue = {path = "../../opsqueue/", default-features = false, features = ["client-logic"] }

# General-purpose datatypes:
uuid = {version = "1.11.0", features = [
uuid = {version = "1.19.0", features = [
"v7", # Temporally orderable UUIDs with random component
"fast-rng", # Use a faster (but still sufficiently random) RNG
]}
chrono = { version = "0.4.38"}
ux = "0.1.6"

# Concurrency:
tokio = {version = "1.38", features = ["macros", "rt-multi-thread"]}
tokio = {version = "1.49", features = ["macros", "rt-multi-thread"]}
futures = "0.3.30"

# Error handling:
anyhow = "1.0.86"
thiserror = "1.0.65"
thiserror = "2.0.17"

# Python FFI:
pyo3 = { version = "0.23.4", features = ["chrono"] }
Expand All @@ -36,4 +36,4 @@ once_cell = "1.21.3" # Only currently used for `unsync::OnceCell` as part of PyO

# Logging/tracing:
pyo3-log = "0.12.1"
tracing = { version = "0.1.41", features = ["log"] }
tracing = { version = "0.1.44", features = ["log"] }
11 changes: 8 additions & 3 deletions libs/opsqueue_python/opsqueue_python.nix
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ buildPythonPackage rec {
"./libs/opsqueue_python/Cargo.toml"
];

nativeBuildInputs = with rustPlatform; [
configurePhase = ''
rustc --version;
cargo --version
'';

nativeBuildInputs = [
perl
git
cargoSetupHook
maturinBuildHook
rustPlatform.cargoSetupHook
rustPlatform.maturinBuildHook
];

propagatedBuildInputs = [
Expand Down
21 changes: 18 additions & 3 deletions nix/overlay.nix
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
# Overlay for Nixpkgs which holds all opsqueue related packages.
final: prev:
let
pythonOverlay = import ./python-overlay.nix;

# This takes the tooling configuration (rust-analyzer, sources, etc.) from the
# toolchain file. We don't need all of these extra components for building Nix
# packages, but having these extra components does not affect the runtime
# closures of the built Nix packages, and since those packages also need to be
# available on CI anyways we'll only define a single toolchain that be reused
# for both packages and dev shells.
rustToolchain = final.rust-bin.fromRustupToolchainFile ../rust-toolchain.toml;

rustPlatform = final.makeRustPlatform {
cargo = rustToolchain;
rustc = rustToolchain;
};

pythonOverlay = import ./python-overlay.nix { inherit rustPlatform; };
in
{
opsqueue = final.callPackage ../opsqueue/opsqueue.nix { };
rustToolchain = rustToolchain;
opsqueue = final.callPackage ../opsqueue/opsqueue.nix { inherit rustPlatform; };

# The explicit choice is made not to override `python312`, as this will cause a rebuild of many
# packages when nixpkgs uses python 3.12 as default python environment.
# These packages should not be affected, e.g. cachix. This is because of a transitive
# dependency on the Python packages that we override.
# In our case cachix > ghc > shpinx > Python libraries.
pythonChannable = prev.python312.override { packageOverrides = pythonOverlay; };
pythonChannable = prev.python313.override { packageOverrides = pythonOverlay; };

}
5 changes: 4 additions & 1 deletion nix/python-overlay.nix
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{ rustPlatform }:
final: prev: {
opsqueue_python = final.callPackage ../libs/opsqueue_python/opsqueue_python.nix { };
opsqueue_python = final.callPackage ../libs/opsqueue_python/opsqueue_python.nix {
inherit rustPlatform;
};
}
46 changes: 23 additions & 23 deletions opsqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ edition = "2021"
description = "lightweight batch processing queue for heavy loads"
repository = "https://github.com/channable/opsqueue"
license = "MIT"
include=["opsqueue_example_database_schema.db"]

[lib]
name="opsqueue"
path="src/lib.rs"
include=["opsqueue_example_database_schema.db"]

[[bin]]
name="opsqueue"
Expand All @@ -19,12 +19,12 @@ required-features = ["server-logic"]
[dependencies]
# Datatypes and concurrency:
itertools = "0.14.0"
arc-swap = {version = "1.7.1", optional = true}
moka = { version = "0.12.8", features = ["sync"], optional = true }
arc-swap = {version = "1.8.0", optional = true}
moka = { version = "0.12.12", features = ["sync"], optional = true }
chrono = { version = "0.4.38", features = ["serde"]}
futures = "0.3.30"
tokio = { version = "1.38.0", features = ["macros", "signal", "rt-multi-thread"] }
uuid = {version = "1.11.0", features = [
tokio = { version = "1.49.0", features = ["macros", "signal", "rt-multi-thread"] }
uuid = {version = "1.19.0", features = [
"v7", # Timestamp-sortable UUIDs with random component
"fast-rng", # Use a faster (but still sufficiently random) RNG
"serde",
Expand All @@ -36,42 +36,42 @@ anyhow = "1.0.86"
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"], optional = true }
# Serialization:
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.124"
serde_json = "1.0.149"
ciborium = "0.2.2"
# Webservers/clients
http = "1.2.0"
object_store = {version = "0.11.1", features = ["gcp", "http"]}
http = "1.4.0"
object_store = {version = "0.13.0", features = ["gcp", "http"]}
snowflaked = {version = "1.0.3", features = ["sync"] }
tokio-tungstenite = {version = "0.24.0", optional = true}
axum = { version = "0.7.5", features = ["ws", "macros"], optional = true }
tokio-tungstenite = {version = "0.28.0", optional = true}
axum = { version = "0.8.8", features = ["ws", "macros"], optional = true }
reqwest = { version = "0.12.9", default-features = false, features = ["json", "rustls-tls"], optional = true }
url = {version = "2.5.2"}
tokio-util = { version = "0.7.11", features = ["io", "rt", "time"] }
tower-http = { version = "0.6.1", features = ["trace", "catch-panic"], optional = true }
url = {version = "2.5.8"}
tokio-util = { version = "0.7.18", features = ["io", "rt", "time"] }
tower-http = { version = "0.6.8", features = ["trace", "catch-panic"], optional = true }
# Logging and tracing:
tracing = {version = "0.1", features = ["log"] }
tracing-subscriber = {version = "0.3", features = ["std", "env-filter"] }
sentry = {version = "0.35", optional = true, default-features=false, features=["rustls", "reqwest"]}
sentry-tracing = {version = "0.35", optional = true}
sentry = {version = "0.46", optional = true, default-features=false, features=["rustls", "reqwest"]}
sentry-tracing = {version = "0.46", optional = true}
# Exporting traces to Opentelemetry:
axum-tracing-opentelemetry = {version = "0.24.0", optional = true }
axum-tracing-opentelemetry = {version = "0.32.2", optional = true }
opentelemetry = { version = "0.26", default-features = false, features = ["trace"] }
opentelemetry_sdk = { version = "0.26", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-http = { version = "0.26" }
opentelemetry-otlp = { version = "0.26", optional = true }
tracing-opentelemetry = {version = "0.27.0" }
opentelemetry-semantic-conventions = {version = "0.26.0", features = ["semconv_experimental"], optional = true}
moro-local = "0.4.0"
thiserror = "1.0.65"
thiserror = "2.0.17"
either = "1.13.0"
serde-error = "0.1.3"
backon = { version = "1.3.0", features = ["tokio-sleep"] }
rand = "0.8.5"
backon = { version = "1.6.0", features = ["tokio-sleep"] }
rand = "0.9.2"
rustc-hash = "2.0.0"
axum-prometheus = {version = "0.7.0", optional = true}
axum-prometheus = {version = "0.10.0", optional = true}

# Configuration:
clap = { version = "4.5.21", features = ["derive"] }
clap = { version = "4.5.54", features = ["derive"] }
humantime = "2.1.0"

dashmap = "6.1.0"
Expand All @@ -80,8 +80,8 @@ crossbeam-skiplist = "0.1.3"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dev-dependencies]
criterion = {version = "0.3", features = ["async_tokio"]}
insta = { version = "1.41.1" }
criterion = {version = "0.8", features = ["async_tokio"]}
insta = { version = "1.46.0" }
assert_matches = { version = "1.5.0" }

# [[bench]]
Expand Down
8 changes: 7 additions & 1 deletion opsqueue/opsqueue.nix
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# Testing options
checkType ? "debug",
doCheck ? true,
useNextest ? false, # Disabled for now. Re-enable as part of https://github.com/channable/opsqueue/issues/81
useNextest ? true,
perl,
git,
}:
Expand Down Expand Up @@ -51,6 +51,12 @@ rustPlatform.buildRustPackage {
chmod +w /build/opsqueue/Cargo.lock
'';

# Print Rust and Cargo versions so we are 100% certain we are using the right ones
configurePhase = ''
rustc --version
cargo --version
'';

env = {
DATABASE_URL = "sqlite:///build/opsqueue/opsqueue_example_database_schema.db";
};
Expand Down
8 changes: 4 additions & 4 deletions opsqueue/src/consumer/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl From<ServerToClientMessage> for ws::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ServerToClientMessage");

ws::Message::Binary(writer)
ws::Message::Binary(writer.into())
}
}

Expand All @@ -114,7 +114,7 @@ impl From<Envelope<ClientToServerMessage>> for ws::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ClientToServerMessage");

ws::Message::Binary(writer)
ws::Message::Binary(writer.into())
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ impl From<ServerToClientMessage> for tokio_tungstenite::tungstenite::Message {
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ServerToClientMessage");

tokio_tungstenite::tungstenite::Message::Binary(writer)
tokio_tungstenite::tungstenite::Message::Binary(writer.into())
}
}

Expand All @@ -157,6 +157,6 @@ impl From<Envelope<ClientToServerMessage>> for tokio_tungstenite::tungstenite::M
ciborium::into_writer(&val, &mut writer)
.expect("Failed to serialize ClientToServerMessage");

tokio_tungstenite::tungstenite::Message::Binary(writer)
tokio_tungstenite::tungstenite::Message::Binary(writer.into())
}
}
4 changes: 2 additions & 2 deletions opsqueue/src/consumer/dispatcher/metastate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ mod tests {
.collect();

// Increment in one order
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
for val in &vals {
sut.increment(key, val);
}
Expand All @@ -153,7 +153,7 @@ mod tests {
assert_eq!(too_highs.len(), n_groups);

// Decrement in a different order
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
for val in &vals {
sut.decrement(key, val);
}
Expand Down
3 changes: 2 additions & 1 deletion opsqueue/src/consumer/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use axum::extract::ws::{Message, WebSocket};
use futures::SinkExt;
use tokio::{
select,
sync::{
Expand Down Expand Up @@ -134,7 +135,7 @@ impl ConsumerConn {
}
}

async fn graceful_shutdown(self) {
async fn graceful_shutdown(mut self) {
const GRACEFUL_WEBSOCKET_CLOSE_TIMEOUT: Duration = Duration::from_millis(100);
select! {
_ = self.ws_stream.close() => {},
Expand Down
1 change: 1 addition & 0 deletions opsqueue/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::common::chunk;
use futures::stream::{self, TryStreamExt};
use object_store::path::Path;
use object_store::DynObjectStore;
use object_store::ObjectStoreExt;
use reqwest::Url;
use ux::u63;

Expand Down
4 changes: 2 additions & 2 deletions opsqueue/src/producer/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ impl ServerState {
)
.route("/submissions/count", get(submissions_count))
.route(
"/submissions/lookup_id_by_prefix/:prefix",
"/submissions/lookup_id_by_prefix/{prefix}",
get(lookup_submission_id_by_prefix),
)
.route("/submissions/:submission_id", get(submission_status))
.route("/submissions/{submission_id}", get(submission_status))
.route("/version", get(crate::server::version_endpoint)) // We're also exposing it here so the producer client can view it
.with_state(self)
}
Expand Down