diff --git a/src/api.rs b/src/api.rs index 06f7d41..d4a7d16 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,4 +1,4 @@ -use crate::jobs::{Job, JobProgress, JobQueue, JobStatus, Quadrant, VideoQuadrantSelection, WebDavConfig}; +use crate::jobs::{Job, JobProgress, JobQueue, JobStatus, LogEntry, Quadrant, VideoQuadrantSelection, WebDavConfig}; use crate::webdav::WebDavClient; use anyhow::Result; use axum::{ @@ -90,6 +90,7 @@ pub async fn run_server(port: u16, data_dir: &str) -> Result<()> { .route("/api/jobs/pending", get(get_pending_job)) .route("/api/jobs/claim", post(claim_job)) .route("/api/jobs/{id}/progress", patch(update_job_progress)) + .route("/api/jobs/{id}/logs", post(append_job_logs)) .route("/health", get(health_check)) // Static files for worker provisioning .route("/assets/worker", get(serve_worker_binary)) @@ -456,6 +457,49 @@ async fn update_job_progress( } } +#[derive(Debug, Deserialize)] +struct AppendLogsRequest { + logs: Vec<LogEntryRequest>, +} + +#[derive(Debug, Deserialize)] +struct LogEntryRequest { + timestamp: String, + level: String, + message: String, +} + +/// Append log entries to a job +async fn append_job_logs( + State(state): State<AppState>, + Path(id): Path<String>, + Json(req): Json<AppendLogsRequest>, +) -> Response { + let logs: Vec<LogEntry> = req.logs.into_iter().map(|l| { + LogEntry { + timestamp: chrono::DateTime::parse_from_rfc3339(&l.timestamp) + .map(|dt| dt.with_timezone(&chrono::Utc)) + .unwrap_or_else(|_| chrono::Utc::now()), + level: l.level, + message: l.message, + } + }).collect(); + + let queue = state.queue.lock().unwrap(); + match queue.append_job_logs(&id, logs) { + Ok(_) => { + StatusCode::OK.into_response() + } + Err(e) => { + error!("Failed to append job logs: {}", e); + AppError { + status: StatusCode::NOT_FOUND, + message: format!("Job not found: {}", e), + }.into_response() + } + } +} + async fn serve_worker_binary() -> Response { // Serve the Linux worker binary from ./assets/worker-linux let path = "./assets/worker-linux"; diff --git a/src/jobs.rs b/src/jobs.rs index 
c39ea46..a13bf84 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -2,6 +2,7 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fs; +use std::sync::{Arc, Mutex}; use tokio::time::{interval, Duration}; use tracing::{info, warn, error}; use urlencoding::encode; @@ -70,6 +71,13 @@ pub struct JobProgress { pub stage: Option<String>, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogEntry { + pub timestamp: DateTime<Utc>, + pub level: String, + pub message: String, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Job { pub id: String, @@ -85,6 +93,8 @@ pub struct Job { pub webdav_config: WebDavConfig, #[serde(default)] pub progress: Option<JobProgress>, + #[serde(default)] + pub logs: Vec<LogEntry>, } pub struct JobQueue { @@ -147,6 +157,7 @@ impl JobQueue { worker_id: None, webdav_config, progress: None, + logs: Vec::new(), }; jobs.push(job.clone()); @@ -240,6 +251,113 @@ impl JobQueue { Ok(job) } + + /// Append log entries to a job + pub fn append_job_logs(&self, job_id: &str, new_logs: Vec<LogEntry>) -> Result<Job> { + let mut jobs = self.load_jobs()?; + let job = jobs + .iter_mut() + .find(|j| j.id == job_id) + .ok_or_else(|| anyhow::anyhow!("Job not found: {}", job_id))?; + + job.logs.extend(new_logs); + // Keep only the last 1000 log entries to prevent unbounded growth + if job.logs.len() > 1000 { + job.logs = job.logs.split_off(job.logs.len() - 1000); + } + let job = job.clone(); + self.save_jobs(&jobs)?; + + Ok(job) + } +} + +/// A logger that buffers log entries and sends them to the server periodically +#[derive(Clone)] +pub struct RemoteLogger { + queue_url: String, + job_id: String, + buffer: Arc<Mutex<Vec<LogEntry>>>, +} + +impl RemoteLogger { + pub fn new(queue_url: String, job_id: String) -> Self { + Self { + queue_url, + job_id, + buffer: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Add a log entry to the buffer + pub fn log(&self, level: &str, message: String) { + let entry = LogEntry { + timestamp: Utc::now(), + level: level.to_string(), + 
message, + }; + if let Ok(mut buffer) = self.buffer.lock() { + buffer.push(entry); + } + } + + /// Flush buffered logs to the server + pub async fn flush(&self) { + let logs = { + let mut buffer = match self.buffer.lock() { + Ok(b) => b, + Err(_) => return, + }; + std::mem::take(&mut *buffer) + }; + + if logs.is_empty() { + return; + } + + let client = reqwest::Client::new(); + let url = format!("{}/jobs/{}/logs", self.queue_url, self.job_id); + + #[derive(Serialize)] + struct LogsPayload { + logs: Vec<LogPayload>, + } + + #[derive(Serialize)] + struct LogPayload { + timestamp: String, + level: String, + message: String, + } + + let payload = LogsPayload { + logs: logs.iter().map(|l| LogPayload { + timestamp: l.timestamp.to_rfc3339(), + level: l.level.clone(), + message: l.message.clone(), + }).collect(), + }; + + // Fire and forget - don't block on this + let _ = client.post(&url).json(&payload).send().await; + } + + /// Helper macros-like methods + pub fn info(&self, msg: impl Into<String>) { + self.log("INFO", msg.into()); + } + + pub fn warn(&self, msg: impl Into<String>) { + self.log("WARN", msg.into()); + } + + pub fn error(&self, msg: impl Into<String>) { + self.log("ERROR", msg.into()); + } + + pub fn debug(&self, msg: impl Into<String>) { + self.log("DEBUG", msg.into()); + } } pub async fn run_worker(queue_url: String) -> Result<()> { @@ -304,50 +422,88 @@ async fn claim_job(queue_url: &str, worker_id: &str) -> Result<Option<Job>> { } async fn process_job(job: Job) -> Result<()> { - info!("=== PROCESSING JOB START ==="); - info!("Job ID: {}", job.id); - info!("Video path: {}", job.video_path); - info!("Output path: {}", job.output_path); - info!("Selection: {:?}", job.selection); - info!("WebDAV URL: {}", job.webdav_config.url); - info!("Queue URL: {:?}", job.webdav_config.queue_url); + // Create remote logger if we have a queue URL + let rlog = job.webdav_config.queue_url.as_ref().map(|url| { + RemoteLogger::new(url.clone(), job.id.clone()) + }); + + // Helper macro to log to both local and remote + 
macro_rules! log_both { + (info, $($arg:tt)*) => {{ + let msg = format!($($arg)*); + info!("{}", msg); + if let Some(ref logger) = rlog { + logger.info(&msg); + } + }}; + (warn, $($arg:tt)*) => {{ + let msg = format!($($arg)*); + warn!("{}", msg); + if let Some(ref logger) = rlog { + logger.warn(&msg); + } + }}; + (error, $($arg:tt)*) => {{ + let msg = format!($($arg)*); + error!("{}", msg); + if let Some(ref logger) = rlog { + logger.error(&msg); + } + }}; + } + + log_both!(info, "=== PROCESSING JOB START ==="); + log_both!(info, "Job ID: {}", job.id); + log_both!(info, "Video path: {}", job.video_path); + log_both!(info, "Output path: {}", job.output_path); + log_both!(info, "Selection: {:?}", job.selection); + + // Flush initial logs + if let Some(ref logger) = rlog { + logger.flush().await; + } let worker_id = format!("worker-{}", uuid::Uuid::new_v4().simple()); let temp_dir = format!("/tmp/worker-{}", worker_id); - info!("Creating temp dir: {}", temp_dir); + log_both!(info, "Creating temp dir: {}", temp_dir); fs::create_dir_all(&temp_dir)?; // Build input URL with auth for direct FFmpeg streaming let video_url = build_webdav_download_url(&job.webdav_config, &job.video_path); - info!("Video URL for FFmpeg: {}", video_url); + log_both!(info, "Video URL for FFmpeg: {}", video_url); // Background image path (downloaded by cloud-init to /root) let bg_image_path = "/root/gpc-bg.png"; - info!("Background image path: {}", bg_image_path); + log_both!(info, "Background image path: {}", bg_image_path); // Check if background image exists if std::path::Path::new(bg_image_path).exists() { - info!("Background image exists at {}", bg_image_path); + log_both!(info, "Background image exists at {}", bg_image_path); } else { - error!("Background image NOT FOUND at {}", bg_image_path); + log_both!(error, "Background image NOT FOUND at {}", bg_image_path); // Try to list /root to see what's there if let Ok(entries) = std::fs::read_dir("/root") { - info!("Contents of /root:"); + 
log_both!(info, "Contents of /root:"); for entry in entries { if let Ok(e) = entry { - info!(" - {:?}", e.path()); + log_both!(info, " - {:?}", e.path()); } } } } + // Flush logs before starting FFmpeg + if let Some(ref logger) = rlog { + logger.flush().await; + } + // Build FFmpeg filter complex based on quadrant selection let filter_complex = build_filter_complex(&job.selection)?; - info!("FFmpeg filter: {}", filter_complex); + log_both!(info, "FFmpeg filter: {}", filter_complex); // Local output path for FFmpeg let local_output_path = format!("{}/output.mp4", temp_dir); - info!("Local output path: {}", local_output_path); + log_both!(info, "Local output path: {}", local_output_path); // Report initial progress if let Some(queue_url) = &job.webdav_config.queue_url { @@ -357,7 +513,7 @@ async fn process_job(job: Job) -> Result<()> { }).await; } - info!("Starting FFmpeg (output to local file)..."); + log_both!(info, "Starting FFmpeg (output to local file)..."); // Run FFmpeg command with progress parsing // Use -progress pipe:1 to get machine-readable progress on stdout @@ -383,6 +539,7 @@ async fn process_job(job: Job) -> Result<()> { let stdout = child.stdout.take(); let queue_url_clone = job.webdav_config.queue_url.clone(); let job_id_clone = job.id.clone(); + let rlog_clone = rlog.clone(); // Spawn a task to read and parse progress let progress_handle = tokio::spawn(async move { @@ -396,6 +553,13 @@ async fn process_job(job: Job) -> Result<()> { let mut current_speed: Option<String> = None; let mut total_duration: Option<String> = None; let mut last_report = std::time::Instant::now(); + let mut last_log_flush = std::time::Instant::now(); + let mut progress_count = 0u32; + + info!("Starting to read FFmpeg progress from stdout..."); + if let Some(ref logger) = rlog_clone { + logger.info("Starting to read FFmpeg progress from stdout..."); + } while let Ok(Some(line)) = lines.next_line().await { // Parse FFmpeg progress output format: @@ -420,9 +584,16 @@ async fn process_job(job: 
Job) -> Result<()> { } else if let Some(value) = line.strip_prefix("duration=") { total_duration = Some(value.trim().to_string()); } else if line.starts_with("progress=") { + progress_count += 1; // End of a progress block - report to server (throttled) if last_report.elapsed() >= std::time::Duration::from_secs(2) { if let Some(queue_url) = &queue_url_clone { + let msg = format!("Progress update #{}: frame={:?}, time={:?}, speed={:?}", + progress_count, current_frame, current_time, current_speed); + info!("{}", msg); + if let Some(ref logger) = rlog_clone { + logger.info(&msg); + } let progress = JobProgress { frame: current_frame, total_frames: None, @@ -432,29 +603,71 @@ async fn process_job(job: Job) -> Result<()> { percent: None, // Could calculate from time/duration stage: Some("Encoding".to_string()), }; - let _ = update_job_progress_remote(queue_url, &job_id_clone, progress).await; + match update_job_progress_remote(queue_url, &job_id_clone, progress).await { + Ok(_) => info!("Progress update sent successfully"), + Err(e) => error!("Failed to send progress update: {}", e), + } } last_report = std::time::Instant::now(); } + + // Flush logs every 30 seconds during encoding + if last_log_flush.elapsed() >= std::time::Duration::from_secs(30) { + if let Some(ref logger) = rlog_clone { + logger.flush().await; + } + last_log_flush = std::time::Instant::now(); + } } } + info!("Finished reading FFmpeg progress. Total progress blocks: {}", progress_count); + if let Some(ref logger) = rlog_clone { + logger.info(format!("Finished reading FFmpeg progress. 
Total progress blocks: {}", progress_count)); + logger.flush().await; + } + } else { + warn!("No stdout available from FFmpeg process"); + if let Some(ref logger) = rlog_clone { + logger.warn("No stdout available from FFmpeg process"); + } } }); + // Read stderr in a separate task + let stderr_handle = { + let stderr = child.stderr.take(); + tokio::spawn(async move { + if let Some(stderr) = stderr { + use tokio::io::AsyncReadExt; + let mut buf = String::new(); + let mut reader = tokio::io::BufReader::new(stderr); + let _ = reader.read_to_string(&mut buf).await; + buf + } else { + String::new() + } + }) + }; + // Wait for FFmpeg to complete - let output = child.wait_with_output().await?; + let status = child.wait().await?; - // Wait for progress parsing to finish + // Wait for progress parsing and stderr reading to finish let _ = progress_handle.await; + let stderr = stderr_handle.await.unwrap_or_default(); - let stderr = String::from_utf8_lossy(&output.stderr); - info!("FFmpeg exit status: {}", output.status); + log_both!(info, "FFmpeg exit status: {}", status); if !stderr.is_empty() { - info!("FFmpeg stderr: {}", stderr); + log_both!(info, "FFmpeg stderr: {}", stderr); } - if output.status.success() { - info!("FFmpeg processing successful!"); + // Flush logs after FFmpeg completes + if let Some(ref logger) = rlog { + logger.flush().await; + } + + if status.success() { + log_both!(info, "FFmpeg processing successful!"); // Report upload stage if let Some(queue_url) = &job.webdav_config.queue_url { @@ -466,14 +679,14 @@ async fn process_job(job: Job) -> Result<()> { // Check output file size match fs::metadata(&local_output_path) { - Ok(meta) => info!("Output file size: {} bytes", meta.len()), - Err(e) => error!("Failed to stat output file: {}", e), + Ok(meta) => log_both!(info, "Output file size: {} bytes", meta.len()), + Err(e) => log_both!(error, "Failed to stat output file: {}", e), } // Now upload to WebDAV - info!("Reading output file for upload..."); + 
log_both!(info, "Reading output file for upload..."); let output_data = fs::read(&local_output_path)?; - info!("Read {} bytes, uploading to WebDAV...", output_data.len()); + log_both!(info, "Read {} bytes, uploading to WebDAV...", output_data.len()); let dav_client = WebDavClient::new(&job.webdav_config)?; @@ -482,48 +695,59 @@ async fn process_job(job: Job) -> Result<()> { if let Some(folder_end) = job.output_path.rfind('/') { let folder = &job.output_path[..folder_end]; if !folder.is_empty() { - info!("Ensuring folder exists: {}", folder); + log_both!(info, "Ensuring folder exists: {}", folder); if let Err(e) = dav_client.ensure_folder_exists(folder).await { - warn!("Could not create folder {}: {} (may already exist)", folder, e); + log_both!(warn, "Could not create folder {}: {} (may already exist)", folder, e); } } } - info!("Uploading to: {}", job.output_path); + log_both!(info, "Uploading to: {}", job.output_path); match dav_client.upload_file(&job.output_path, output_data).await { Ok(_) => { - info!("Upload successful!"); - info!("Job {} completed successfully", job.id); + log_both!(info, "Upload successful!"); + log_both!(info, "Job {} completed successfully", job.id); // Update job to completed via queue URL if let Some(queue_url) = &job.webdav_config.queue_url { - info!("Updating job status to completed at: {}", queue_url); + log_both!(info, "Updating job status to completed at: {}", queue_url); match update_job_status_remote(queue_url, &job.id, JobStatus::Completed, None).await { - Ok(_) => info!("Status update successful"), - Err(e) => error!("Status update failed: {}", e), + Ok(_) => log_both!(info, "Status update successful"), + Err(e) => log_both!(error, "Status update failed: {}", e), } } } Err(e) => { - error!("Upload FAILED: {}", e); + log_both!(error, "Upload FAILED: {}", e); if let Some(queue_url) = &job.webdav_config.queue_url { let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Failed, None).await; } } } } else { - error!("FFmpeg 
FAILED with exit code: {}", output.status); + log_both!(error, "FFmpeg FAILED with exit code: {}", status); if let Some(queue_url) = &job.webdav_config.queue_url { let _ = update_job_status_remote(queue_url, &job.id, JobStatus::Failed, None).await; } } + // Final flush before cleanup + if let Some(ref logger) = rlog { + logger.flush().await; + } + // Cleanup temp directory - info!("Cleaning up temp dir: {}", temp_dir); + log_both!(info, "Cleaning up temp dir: {}", temp_dir); let _ = fs::remove_dir_all(&temp_dir); - info!("=== PROCESSING JOB END ==="); + log_both!(info, "=== PROCESSING JOB END ==="); + + // Final flush + if let Some(ref logger) = rlog { + logger.flush().await; + } + Ok(()) } diff --git a/templates/index.html b/templates/index.html index bd1c060..2617b83 100644 --- a/templates/index.html +++ b/templates/index.html @@ -296,6 +296,63 @@ font-weight: 500; } + .job-logs-toggle { + background: none; + border: none; + color: #4a9eff; + cursor: pointer; + font-size: 12px; + padding: 4px 8px; + margin-top: 8px; + } + + .job-logs-toggle:hover { + text-decoration: underline; + } + + .job-logs { + margin-top: 10px; + background: #0a0a14; + border-radius: 6px; + padding: 12px; + max-height: 300px; + overflow-y: auto; + font-family: 'Consolas', 'Monaco', monospace; + font-size: 11px; + line-height: 1.5; + } + + .job-logs.hidden { + display: none; + } + + .log-entry { + display: flex; + gap: 8px; + } + + .log-timestamp { + color: #666; + flex-shrink: 0; + } + + .log-level { + flex-shrink: 0; + padding: 0 4px; + border-radius: 2px; + font-weight: 500; + } + + .log-level-INFO { color: #4a9eff; } + .log-level-WARN { color: #ffd700; } + .log-level-ERROR { color: #ef4444; } + .log-level-DEBUG { color: #888; } + + .log-message { + color: #ccc; + word-break: break-all; + } + .loading { display: inline-block; width: 16px; @@ -669,21 +726,60 @@

${video.name}

${job.selection.presentation}/${job.selection.slides} - ${job.status === 'Processing' && job.progress ? ` + ${job.status === 'Processing' ? `
-
+
- ${job.progress.stage || 'Processing'} - ${job.progress.time || ''} ${job.progress.speed ? '@ ' + job.progress.speed : ''} + ${job.progress?.stage || 'Waiting for progress...'} + ${job.progress?.time || ''} ${job.progress?.speed ? '@ ' + job.progress.speed : ''}
` : ''} + ${job.logs && job.logs.length > 0 ? ` + + + ` : ''} `).join(''); } + function toggleLogs(jobId) { + const logsEl = document.getElementById(`logs-${jobId}`); + const toggleEl = logsEl.previousElementSibling; + if (logsEl.classList.contains('hidden')) { + logsEl.classList.remove('hidden'); + toggleEl.textContent = toggleEl.textContent.replace('Show', 'Hide'); + // Scroll to bottom to show latest logs + logsEl.scrollTop = logsEl.scrollHeight; + } else { + logsEl.classList.add('hidden'); + toggleEl.textContent = toggleEl.textContent.replace('Hide', 'Show'); + } + } + + function formatLogTimestamp(timestamp) { + const date = new Date(timestamp); + return date.toLocaleTimeString('en-US', { hour12: false, hour: '2-digit', minute: '2-digit', second: '2-digit' }); + } + + function escapeHtml(str) { + const div = document.createElement('div'); + div.textContent = str; + return div.innerHTML; + } + function formatBytes(bytes) { if (bytes === 0) return '0 Bytes'; const k = 1024;