Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions crates/memtrack/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use anyhow::{Context, Result, anyhow};
use clap::Parser;
use ipc_channel::ipc::{self};
use log::{debug, info};
use memtrack::{Event, MemtrackIpcMessage, Tracker, handle_ipc_message};
use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact};
use memtrack::{MemtrackIpcMessage, Tracker, handle_ipc_message};
use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent};
use std::path::PathBuf;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -53,9 +53,7 @@ fn main() -> Result<()> {

let (root_pid, events, status) =
track_command(&command, ipc_server).context("Failed to track command")?;
let result = MemtrackArtifact {
events: events.into_iter().map(|event| event.into()).collect(),
};
let result = MemtrackArtifact { events };
result.save_with_pid_to(&out_dir, root_pid as libc::pid_t)?;

std::process::exit(status.code().unwrap_or(1));
Expand All @@ -66,8 +64,7 @@ fn main() -> Result<()> {
fn track_command(
cmd_string: &str,
ipc_server_name: Option<String>,
) -> anyhow::Result<(u32, Vec<Event>, std::process::ExitStatus)> {
let events = Arc::new(Mutex::new(Vec::new()));
) -> anyhow::Result<(u32, Vec<MemtrackEvent>, std::process::ExitStatus)> {
let tracker = Tracker::new()?;

let tracker_arc = Arc::new(Mutex::new(tracker));
Expand Down Expand Up @@ -99,10 +96,10 @@ fn track_command(
info!("Spawned child with pid {root_pid}");

// Spawn event processing thread
let events_clone = events.clone();
let process_events = Arc::new(AtomicBool::new(true));
let process_events_clone = process_events.clone();
let processing_thread = thread::spawn(move || {
let mut events = Vec::new();
loop {
if !process_events_clone.load(Ordering::Relaxed) {
break;
Expand All @@ -112,12 +109,9 @@ fn track_command(
continue;
};

let Ok(mut e) = events_clone.lock() else {
continue;
};

e.push(event);
events.push(event.into());
}
events
});

// Wait for the command to complete
Expand All @@ -126,14 +120,12 @@ fn track_command(

info!("Waiting for the event processing thread to finish");
process_events.store(false, Ordering::Relaxed);
processing_thread
let events = processing_thread
.join()
.map_err(|_| anyhow::anyhow!("Failed to join event thread"))?;

// IPC thread will exit when channel closes
drop(ipc_handle);

info!("Unwrapping and returning events");
let events = Arc::try_unwrap(events).map_err(|_| anyhow::anyhow!("Failed to unwrap events"))?;
Ok((root_pid as u32, events.into_inner()?, status))
Ok((root_pid as u32, events, status))
}
77 changes: 74 additions & 3 deletions crates/runner-shared/src/artifacts/memtrack.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
use libc::pid_t;
use serde::{Deserialize, Serialize};
use std::io::{Read, Write};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemtrackArtifact {
pub events: Vec<MemtrackEvent>,
}
impl super::ArtifactExt for MemtrackArtifact {}
impl super::ArtifactExt for MemtrackArtifact {
fn encode_to_writer<W: Write>(&self, writer: W) -> anyhow::Result<()> {
// This is required for `decode_streamed`: We can't stream the deserialization of
// the whole artifact, so we have to encode them one by one.
let mut serializer = rmp_serde::Serializer::new(writer);
for event in &self.events {
event.serialize(&mut serializer)?;
}
Ok(())
}
}

impl MemtrackArtifact {
pub fn decode_streamed<R: std::io::Read>(reader: R) -> anyhow::Result<MemtrackEventStream<R>> {
Ok(MemtrackEventStream {
deserializer: rmp_serde::Deserializer::new(reader),
})
}
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct MemtrackEvent {
pub pid: pid_t,
pub tid: pid_t,
Expand All @@ -17,7 +36,7 @@ pub struct MemtrackEvent {
pub kind: MemtrackEventKind,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum MemtrackEventKind {
Malloc { size: u64 },
Expand All @@ -29,3 +48,55 @@ pub enum MemtrackEventKind {
Munmap { size: u64 },
Brk { size: u64 },
}

pub struct MemtrackEventStream<R: Read> {
deserializer: rmp_serde::Deserializer<rmp_serde::decode::ReadReader<R>>,
}

impl<R: Read> Iterator for MemtrackEventStream<R> {
type Item = MemtrackEvent;

fn next(&mut self) -> Option<Self::Item> {
MemtrackEvent::deserialize(&mut self.deserializer).ok()
}
}

#[cfg(test)]
mod tests {
use crate::artifacts::ArtifactExt;

use super::*;
use std::io::Cursor;

#[test]
fn test_decode_streamed() -> anyhow::Result<()> {
let events = vec![
MemtrackEvent {
pid: 1,
tid: 11,
timestamp: 100,
addr: 0x10,
kind: MemtrackEventKind::Malloc { size: 64 },
},
MemtrackEvent {
pid: 1,
tid: 12,
timestamp: 200,
addr: 0x20,
kind: MemtrackEventKind::Free,
},
];

let artifact = MemtrackArtifact {
events: events.clone(),
};
let mut buf = Vec::new();
artifact.encode_to_writer(&mut buf)?;

let stream = MemtrackArtifact::decode_streamed(Cursor::new(buf))?;
let collected: Vec<_> = stream.collect();
assert_eq!(collected, events);

Ok(())
}
}
10 changes: 8 additions & 2 deletions crates/runner-shared/src/artifacts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ where
std::any::type_name::<Self>().rsplit("::").next().unwrap()
}

fn encode_to_writer<W: std::io::Write>(&self, mut writer: W) -> anyhow::Result<()> {
let encoded = rmp_serde::to_vec_named(self)?;
writer.write_all(&encoded)?;
Ok(())
}

fn save_file_to<P: AsRef<std::path::Path>>(
&self,
folder: P,
filename: &str,
) -> anyhow::Result<()> {
std::fs::create_dir_all(folder.as_ref())?;
let data = rmp_serde::to_vec_named(self)?;
std::fs::write(folder.as_ref().join(filename), data)?;
let file = std::fs::File::create(folder.as_ref().join(filename))?;
self.encode_to_writer(file)?;

debug!("Saved {} result to {:?}", Self::name(), folder.as_ref());
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/run/uploader/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn create_profile_archive(
) -> Result<ProfileArchive> {
let time_start = std::time::Instant::now();
let profile_archive = match executor_name {
ExecutorName::Memory | ExecutorName::Valgrind => {
ExecutorName::Valgrind => {
debug!("Creating compressed tar archive for Valgrind");
let enc = GzipEncoder::new(Vec::new());
let mut tar = Builder::new(enc);
Expand All @@ -68,7 +68,7 @@ async fn create_profile_archive(
let data = gzip_encoder.into_inner();
ProfileArchive::new_compressed_in_memory(data)
}
ExecutorName::WallTime => {
ExecutorName::Memory | ExecutorName::WallTime => {
// Check folder size to decide on compression
let folder_size_bytes =
calculate_folder_size(&execution_context.profile_folder).await?;
Expand Down
Loading