From 6af521e602fe22801a9f56801a747ab5a0fecc26 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Mon, 15 Dec 2025 14:41:07 +0100 Subject: [PATCH 1/4] fix(memory): compress only when size exceeds threshold --- src/run/uploader/upload.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/run/uploader/upload.rs b/src/run/uploader/upload.rs index 70ee50c4..0a7c725a 100644 --- a/src/run/uploader/upload.rs +++ b/src/run/uploader/upload.rs @@ -57,7 +57,7 @@ async fn create_profile_archive( ) -> Result { let time_start = std::time::Instant::now(); let profile_archive = match executor_name { - ExecutorName::Memory | ExecutorName::Valgrind => { + ExecutorName::Valgrind => { debug!("Creating compressed tar archive for Valgrind"); let enc = GzipEncoder::new(Vec::new()); let mut tar = Builder::new(enc); @@ -68,7 +68,7 @@ async fn create_profile_archive( let data = gzip_encoder.into_inner(); ProfileArchive::new_compressed_in_memory(data) } - ExecutorName::WallTime => { + ExecutorName::Memory | ExecutorName::WallTime => { // Check folder size to decide on compression let folder_size_bytes = calculate_folder_size(&execution_context.profile_folder).await?; From d244c5aec58f3bcca04b46786e84695159a42a91 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Mon, 15 Dec 2025 17:14:00 +0100 Subject: [PATCH 2/4] fix(memtrack): convert events in thread to avoid blocking at the end This lead to slowdowns of 10+ minutes in CI when dealing with millions of memtrack events. --- crates/memtrack/src/main.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/memtrack/src/main.rs b/crates/memtrack/src/main.rs index 7d3d96ba..fefdb490 100644 --- a/crates/memtrack/src/main.rs +++ b/crates/memtrack/src/main.rs @@ -3,7 +3,7 @@ use clap::Parser; use ipc_channel::ipc::{self}; use log::{debug, info}; use memtrack::{Event, MemtrackIpcMessage, Tracker, handle_ipc_message}; -use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact}; +use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent}; use std::path::PathBuf; use std::process::Command; use std::sync::atomic::{AtomicBool, Ordering}; @@ -53,9 +53,7 @@ fn main() -> Result<()> { let (root_pid, events, status) = track_command(&command, ipc_server).context("Failed to track command")?; - let result = MemtrackArtifact { - events: events.into_iter().map(|event| event.into()).collect(), - }; + let result = MemtrackArtifact { events }; result.save_with_pid_to(&out_dir, root_pid as libc::pid_t)?; std::process::exit(status.code().unwrap_or(1)); @@ -66,7 +64,7 @@ fn main() -> Result<()> { fn track_command( cmd_string: &str, ipc_server_name: Option, -) -> anyhow::Result<(u32, Vec, std::process::ExitStatus)> { +) -> anyhow::Result<(u32, Vec, std::process::ExitStatus)> { let events = Arc::new(Mutex::new(Vec::new())); let tracker = Tracker::new()?; @@ -116,7 +114,7 @@ fn track_command( continue; }; - e.push(event); + e.push(event.into()); } }); From 78e2e133397f945b391044b0b824b7587700762b Mon Sep 17 00:00:00 2001 From: not-matthias Date: Tue, 16 Dec 2025 15:03:24 +0100 Subject: [PATCH 3/4] fix(memtrack): collect events in thread to avoid mutex overhead --- crates/memtrack/src/main.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/crates/memtrack/src/main.rs b/crates/memtrack/src/main.rs index fefdb490..e3ead412 100644 --- a/crates/memtrack/src/main.rs +++ b/crates/memtrack/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result, anyhow}; use clap::Parser; use ipc_channel::ipc::{self}; use log::{debug, info}; -use memtrack::{Event, MemtrackIpcMessage, Tracker, handle_ipc_message}; +use memtrack::{MemtrackIpcMessage, Tracker, handle_ipc_message}; use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent}; use std::path::PathBuf; use std::process::Command; @@ -65,7 +65,6 @@ fn track_command( cmd_string: &str, ipc_server_name: Option, ) -> anyhow::Result<(u32, Vec, std::process::ExitStatus)> { - let events = Arc::new(Mutex::new(Vec::new())); let tracker = Tracker::new()?; let tracker_arc = Arc::new(Mutex::new(tracker)); @@ -97,10 +96,10 @@ fn track_command( info!("Spawned child with pid {root_pid}"); // Spawn event processing thread - let events_clone = events.clone(); let process_events = Arc::new(AtomicBool::new(true)); let process_events_clone = process_events.clone(); let processing_thread = thread::spawn(move || { + let mut events = Vec::new(); loop { if !process_events_clone.load(Ordering::Relaxed) { break; @@ -110,12 +109,9 @@ fn track_command( continue; }; - let Ok(mut e) = events_clone.lock() else { - continue; - }; - - e.push(event.into()); + events.push(event.into()); } + events }); // Wait for the command to complete @@ -124,14 +120,12 @@ fn track_command( info!("Waiting for the event processing thread to finish"); process_events.store(false, Ordering::Relaxed); - processing_thread + let events = processing_thread .join() .map_err(|_| anyhow::anyhow!("Failed to join event thread"))?; // IPC thread will exit when channel closes drop(ipc_handle); - info!("Unwrapping and returning events"); - let events = Arc::try_unwrap(events).map_err(|_| anyhow::anyhow!("Failed to unwrap events"))?; - Ok((root_pid as u32, events.into_inner()?, status)) + Ok((root_pid as u32, events, status)) } From 12452f080928484828963327c8bfd4089ea97366 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Tue, 16 Dec 2025 18:39:20 +0100 Subject: [PATCH 4/4] feat(memtrack): serialize events serially to allow streamed decoding --- .../runner-shared/src/artifacts/memtrack.rs | 77 ++++++++++++++++++- crates/runner-shared/src/artifacts/mod.rs | 10 ++- 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/crates/runner-shared/src/artifacts/memtrack.rs b/crates/runner-shared/src/artifacts/memtrack.rs index 5be15242..484dd234 100644 --- a/crates/runner-shared/src/artifacts/memtrack.rs +++ b/crates/runner-shared/src/artifacts/memtrack.rs @@ -1,13 +1,32 @@ use libc::pid_t; use serde::{Deserialize, Serialize}; +use std::io::{Read, Write}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MemtrackArtifact { pub events: Vec, } -impl super::ArtifactExt for MemtrackArtifact {} +impl super::ArtifactExt for MemtrackArtifact { + fn encode_to_writer(&self, writer: W) -> anyhow::Result<()> { + // This is required for `decode_streamed`: We can't stream the deserialization of + // the whole artifact, so we have to encode them one by one. + let mut serializer = rmp_serde::Serializer::new(writer); + for event in &self.events { + event.serialize(&mut serializer)?; + } + Ok(()) + } +} + +impl MemtrackArtifact { + pub fn decode_streamed(reader: R) -> anyhow::Result> { + Ok(MemtrackEventStream { + deserializer: rmp_serde::Deserializer::new(reader), + }) + } +} -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub struct MemtrackEvent { pub pid: pid_t, pub tid: pid_t, @@ -17,7 +36,7 @@ pub struct MemtrackEvent { pub kind: MemtrackEventKind, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "type")] pub enum MemtrackEventKind { Malloc { size: u64 }, @@ -29,3 +48,55 @@ pub enum MemtrackEventKind { Munmap { size: u64 }, Brk { size: u64 }, } + +pub struct MemtrackEventStream { + deserializer: rmp_serde::Deserializer>, +} + +impl Iterator for MemtrackEventStream { + type Item = MemtrackEvent; + + fn next(&mut self) -> Option { + MemtrackEvent::deserialize(&mut self.deserializer).ok() + } +} + +#[cfg(test)] +mod tests { + use crate::artifacts::ArtifactExt; + + use super::*; + use std::io::Cursor; + + #[test] + fn test_decode_streamed() -> anyhow::Result<()> { + let events = vec![ + MemtrackEvent { + pid: 1, + tid: 11, + timestamp: 100, + addr: 0x10, + kind: MemtrackEventKind::Malloc { size: 64 }, + }, + MemtrackEvent { + pid: 1, + tid: 12, + timestamp: 200, + addr: 0x20, + kind: MemtrackEventKind::Free, + }, + ]; + + let artifact = MemtrackArtifact { + events: events.clone(), + }; + let mut buf = Vec::new(); + artifact.encode_to_writer(&mut buf)?; + + let stream = MemtrackArtifact::decode_streamed(Cursor::new(buf))?; + let collected: Vec<_> = stream.collect(); + assert_eq!(collected, events); + + Ok(()) + } +} diff --git a/crates/runner-shared/src/artifacts/mod.rs b/crates/runner-shared/src/artifacts/mod.rs index a2d015bf..1b3cb7f5 100644 --- a/crates/runner-shared/src/artifacts/mod.rs +++ b/crates/runner-shared/src/artifacts/mod.rs @@ -17,14 +17,20 @@ where std::any::type_name::().rsplit("::").next().unwrap() } + fn encode_to_writer(&self, mut writer: W) -> anyhow::Result<()> { + let encoded = rmp_serde::to_vec_named(self)?; + writer.write_all(&encoded)?; + Ok(()) + } + fn save_file_to>( &self, folder: P, filename: &str, ) -> anyhow::Result<()> { std::fs::create_dir_all(folder.as_ref())?; - let data = rmp_serde::to_vec_named(self)?; - std::fs::write(folder.as_ref().join(filename), data)?; + let file = std::fs::File::create(folder.as_ref().join(filename))?; + self.encode_to_writer(file)?; debug!("Saved {} result to {:?}", Self::name(), folder.as_ref()); Ok(())