diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8f1a171dca..4f8fe86e45 100755 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2247,6 +2247,7 @@ dependencies = [ "crossterm", "log", "mock-anthropic-service", + "nix", "plugins", "pulldown-cmark", "runtime", diff --git a/rust/crates/runtime/src/conversation.rs b/rust/crates/runtime/src/conversation.rs index 9c36329a16..47a09fa016 100644 --- a/rust/crates/runtime/src/conversation.rs +++ b/rust/crates/runtime/src/conversation.rs @@ -1,5 +1,9 @@ use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use serde_json::{Map, Value}; use telemetry::SessionTracer; @@ -53,6 +57,41 @@ pub struct PromptCacheEvent { pub token_drop: u32, } +/// Shared flag used to request graceful interruption of a running turn. +/// +/// Cloning shares the underlying flag, mirroring +/// [`HookAbortSignal`](crate::hooks::HookAbortSignal). An input listener +/// (e.g. Esc or Ctrl+C handling in the CLI) sets the flag while the +/// conversation loop and the streaming API client poll it at safe points. +/// When the flag is observed, the turn winds down without treating the +/// stop as a failure: pending tool calls receive synthesized error +/// results so the session stays consistent, and [`TurnSummary`] reports +/// `interrupted: true`. +#[derive(Debug, Clone, Default)] +pub struct TurnInterruptSignal { + interrupted: Arc, +} + +impl TurnInterruptSignal { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + pub fn interrupt(&self) { + self.interrupted.store(true, Ordering::SeqCst); + } + + #[must_use] + pub fn is_interrupted(&self) -> bool { + self.interrupted.load(Ordering::SeqCst) + } + + pub fn reset(&self) { + self.interrupted.store(false, Ordering::SeqCst); + } +} + /// Minimal streaming API contract required by [`ConversationRuntime`]. pub trait ApiClient { fn stream(&mut self, request: ApiRequest) -> Result, RuntimeError>; @@ -118,6 +157,7 @@ pub struct TurnSummary { pub iterations: usize, pub usage: TokenUsage, pub auto_compaction: Option, + pub interrupted: bool, } /// Details about automatic session compaction applied during a turn. @@ -138,6 +178,7 @@ pub struct ConversationRuntime { hook_runner: HookRunner, auto_compaction_input_tokens_threshold: u32, hook_abort_signal: HookAbortSignal, + turn_interrupt_signal: TurnInterruptSignal, hook_progress_reporter: Option>, session_tracer: Option, } @@ -187,6 +228,7 @@ where hook_runner: HookRunner::from_feature_config(feature_config), auto_compaction_input_tokens_threshold: auto_compaction_threshold_from_env(), hook_abort_signal: HookAbortSignal::default(), + turn_interrupt_signal: TurnInterruptSignal::default(), hook_progress_reporter: None, session_tracer: None, } @@ -217,6 +259,15 @@ where self } + #[must_use] + pub fn with_turn_interrupt_signal( + mut self, + turn_interrupt_signal: TurnInterruptSignal, + ) -> Self { + self.turn_interrupt_signal = turn_interrupt_signal; + self + } + #[must_use] pub fn with_hook_progress_reporter( mut self, @@ -350,8 +401,14 @@ where let mut prompt_cache_events = Vec::new(); let mut iterations = 0; let mut auto_compaction = None; + let mut interrupted = false; loop { + if self.turn_interrupt_signal.is_interrupted() { + self.record_turn_interrupted(iterations, "before_request"); + interrupted = true; + break; + } iterations += 1; if iterations > self.max_iterations { let error = RuntimeError::new( @@ -368,6 +425,14 @@ where let events = match self.api_client.stream(request) { Ok(events) => events, Err(error) => { + if self.turn_interrupt_signal.is_interrupted() { + // The client aborted because the user interrupted the + // turn; any partial response is discarded and the stop + // is reported as an interruption rather than a failure. + self.record_turn_interrupted(iterations, "during_request"); + interrupted = true; + break; + } self.record_turn_failed(iterations, &error); return Err(error); } @@ -416,6 +481,25 @@ where } for (tool_use_id, tool_name, input) in pending_tool_uses { + if interrupted || self.turn_interrupt_signal.is_interrupted() { + // Every pending tool_use must still receive a tool_result + // so the session stays valid for the next request. + if !interrupted { + self.record_turn_interrupted(iterations, "before_tool"); + interrupted = true; + } + let result_message = ConversationMessage::tool_result( + tool_use_id, + tool_name, + "Interrupted by user before this tool could run.", + true, + ); + self.session + .push_message(result_message.clone()) + .map_err(|error| RuntimeError::new(error.to_string()))?; + tool_results.push(result_message); + continue; + } let pre_hook_result = self.run_pre_tool_use_hook(&tool_name, &input); let effective_input = pre_hook_result .updated_input() @@ -515,6 +599,10 @@ where self.record_tool_finished(iterations, &result_message); tool_results.push(result_message); } + + if interrupted { + break; + } } let summary = TurnSummary { @@ -524,8 +612,11 @@ where iterations, usage: self.usage_tracker.cumulative_usage(), auto_compaction, + interrupted, }; - self.record_turn_completed(&summary); + if !interrupted { + self.record_turn_completed(&summary); + } Ok(summary) } @@ -689,6 +780,17 @@ where session_tracer.record("turn_completed", attributes); } + fn record_turn_interrupted(&self, iteration: usize, phase: &str) { + let Some(session_tracer) = &self.session_tracer else { + return; + }; + + let mut attributes = Map::new(); + attributes.insert("iteration".to_string(), Value::from(iteration as u64)); + attributes.insert("phase".to_string(), Value::String(phase.to_string())); + session_tracer.record("turn_interrupted", attributes); + } + fn record_turn_failed(&self, iteration: usize, error: &RuntimeError) { let Some(session_tracer) = &self.session_tracer else { return; @@ -850,7 +952,8 @@ mod tests { use super::{ build_assistant_message, parse_auto_compaction_threshold, ApiClient, ApiRequest, AssistantEvent, AutoCompactionEvent, ConversationRuntime, PromptCacheEvent, RuntimeError, - StaticToolExecutor, ToolExecutor, DEFAULT_AUTO_COMPACTION_INPUT_TOKENS_THRESHOLD, + StaticToolExecutor, ToolExecutor, TurnInterruptSignal, + DEFAULT_AUTO_COMPACTION_INPUT_TOKENS_THRESHOLD, }; use crate::compact::CompactionConfig; use crate::config::{RuntimeFeatureConfig, RuntimeHookConfig}; @@ -1875,4 +1978,165 @@ mod tests { // then assert_eq!(error.to_string(), "upstream failed"); } + + #[test] + fn interrupt_before_first_request_skips_the_api_call() { + struct UnreachableApi; + + impl ApiClient for UnreachableApi { + fn stream( + &mut self, + _request: ApiRequest, + ) -> Result, RuntimeError> { + unreachable!("interrupted turn must not reach the API") + } + } + + // given + let interrupt = TurnInterruptSignal::new(); + interrupt.interrupt(); + let mut runtime = ConversationRuntime::new( + Session::new(), + UnreachableApi, + StaticToolExecutor::new(), + PermissionPolicy::new(PermissionMode::DangerFullAccess), + vec!["system".to_string()], + ) + .with_turn_interrupt_signal(interrupt); + + // when + let summary = runtime + .run_turn("hello", None) + .expect("interruption should not be reported as a failure"); + + // then + assert!(summary.interrupted); + assert_eq!(summary.iterations, 0); + assert!(summary.assistant_messages.is_empty()); + assert!(summary.tool_results.is_empty()); + assert_eq!(summary.auto_compaction, None); + assert_eq!(runtime.session().messages.len(), 1); + assert_eq!(runtime.session().messages[0].role, MessageRole::User); + } + + #[test] + fn interrupt_after_stream_synthesizes_results_for_pending_tools() { + struct ToolUseApi { + interrupt: TurnInterruptSignal, + } + + impl ApiClient for ToolUseApi { + fn stream( + &mut self, + _request: ApiRequest, + ) -> Result, RuntimeError> { + // Simulate the user pressing Esc while the response streams in. + self.interrupt.interrupt(); + Ok(vec![ + AssistantEvent::TextDelta("Running the tool.".to_string()), + AssistantEvent::ToolUse { + id: "tool-1".to_string(), + name: "add".to_string(), + input: "2,2".to_string(), + }, + AssistantEvent::MessageStop, + ]) + } + } + + // given + let interrupt = TurnInterruptSignal::new(); + let mut runtime = ConversationRuntime::new( + Session::new(), + ToolUseApi { + interrupt: interrupt.clone(), + }, + StaticToolExecutor::new() + .register("add", |_input| panic!("interrupted tool must not run")), + PermissionPolicy::new(PermissionMode::DangerFullAccess), + vec!["system".to_string()], + ) + .with_turn_interrupt_signal(interrupt); + + // when + let summary = runtime + .run_turn("what is 2 + 2?", None) + .expect("interruption should not be reported as a failure"); + + // then + assert!(summary.interrupted); + assert_eq!(summary.iterations, 1); + assert_eq!(summary.assistant_messages.len(), 1); + assert_eq!(summary.tool_results.len(), 1); + assert!(matches!( + &summary.tool_results[0].blocks[0], + ContentBlock::ToolResult { + tool_use_id, + is_error: true, + output, + .. + } if tool_use_id == "tool-1" && output.contains("Interrupted by user") + )); + // user text, assistant tool_use, synthesized tool_result + assert_eq!(runtime.session().messages.len(), 3); + assert!(matches!( + runtime.session().messages[2].blocks[0], + ContentBlock::ToolResult { is_error: true, .. } + )); + } + + #[test] + fn stream_error_during_interrupt_is_reported_as_interruption() { + struct AbortedApi { + interrupt: TurnInterruptSignal, + } + + impl ApiClient for AbortedApi { + fn stream( + &mut self, + _request: ApiRequest, + ) -> Result, RuntimeError> { + // Simulate the streaming client aborting the connection after + // observing the interrupt flag mid-stream. + self.interrupt.interrupt(); + Err(RuntimeError::new("request aborted")) + } + } + + // given + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-interrupt", sink.clone()); + let interrupt = TurnInterruptSignal::new(); + let mut runtime = ConversationRuntime::new( + Session::new(), + AbortedApi { + interrupt: interrupt.clone(), + }, + StaticToolExecutor::new(), + PermissionPolicy::new(PermissionMode::DangerFullAccess), + vec!["system".to_string()], + ) + .with_turn_interrupt_signal(interrupt) + .with_session_tracer(tracer); + + // when + let summary = runtime + .run_turn("hello", None) + .expect("interrupt-driven aborts should not surface as errors"); + + // then + assert!(summary.interrupted); + assert!(summary.assistant_messages.is_empty()); + let trace_names = sink + .events() + .iter() + .filter_map(|event| match event { + TelemetryEvent::SessionTrace(trace) => Some(trace.name.clone()), + _ => None, + }) + .collect::>(); + assert!(trace_names.iter().any(|name| name == "turn_interrupted")); + assert!(!trace_names.iter().any(|name| name == "turn_failed")); + assert!(!trace_names.iter().any(|name| name == "turn_completed")); + } } diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index 674d89251d..101aec7241 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -82,7 +82,7 @@ pub use config_validate::{ pub use conversation::{ auto_compaction_threshold_from_env, ApiClient, ApiRequest, AssistantEvent, AutoCompactionEvent, ConversationRuntime, PromptCacheEvent, RuntimeError, StaticToolExecutor, ToolError, - ToolExecutor, TurnSummary, + ToolExecutor, TurnInterruptSignal, TurnSummary, }; pub use file_ops::{ edit_file, edit_file_in_workspace, glob_search, glob_search_in_workspace, grep_search, diff --git a/rust/crates/runtime/src/worker_boot.rs b/rust/crates/runtime/src/worker_boot.rs index 6b4fbfbf3a..aa4fa28f4d 100644 --- a/rust/crates/runtime/src/worker_boot.rs +++ b/rust/crates/runtime/src/worker_boot.rs @@ -34,6 +34,9 @@ pub enum WorkerStatus { ToolPermissionRequired, ReadyForPrompt, Running, + /// The user stopped the turn mid-run (e.g. Esc or Ctrl+C); the worker + /// is back at the prompt with the session intact. + Interrupted, Finished, Failed, } @@ -46,6 +49,7 @@ impl std::fmt::Display for WorkerStatus { Self::ToolPermissionRequired => write!(f, "tool_permission_required"), Self::ReadyForPrompt => write!(f, "ready_for_prompt"), Self::Running => write!(f, "running"), + Self::Interrupted => write!(f, "interrupted"), Self::Finished => write!(f, "finished"), Self::Failed => write!(f, "failed"), } @@ -82,6 +86,7 @@ pub enum WorkerEventKind { PromptMisdelivery, PromptReplayArmed, Running, + Interrupted, Restarted, Finished, Failed, @@ -1461,6 +1466,23 @@ mod tests { use std::fs; use std::process::Command; + #[test] + fn interrupted_status_serializes_and_displays_as_snake_case() { + assert_eq!(WorkerStatus::Interrupted.to_string(), "interrupted"); + assert_eq!( + serde_json::to_string(&WorkerStatus::Interrupted).expect("should serialize"), + "\"interrupted\"" + ); + assert_eq!( + serde_json::from_str::("\"interrupted\"").expect("should deserialize"), + WorkerStatus::Interrupted + ); + assert_eq!( + serde_json::to_string(&WorkerEventKind::Interrupted).expect("should serialize"), + "\"interrupted\"" + ); + } + #[test] fn allowlisted_trust_prompt_auto_resolves_then_reaches_ready_state() { let registry = WorkerRegistry::new(); diff --git a/rust/crates/rusty-claude-cli/Cargo.toml b/rust/crates/rusty-claude-cli/Cargo.toml index d044176011..cec9764cf8 100644 --- a/rust/crates/rusty-claude-cli/Cargo.toml +++ b/rust/crates/rusty-claude-cli/Cargo.toml @@ -24,6 +24,8 @@ tokio = { version = "1", features = ["rt-multi-thread", "signal", "time"] } tools = { path = "../tools" } log = "0.4" +[target.'cfg(unix)'.dependencies] +nix = { version = "0.29", features = ["term", "poll"] } [lints] workspace = true diff --git a/rust/crates/rusty-claude-cli/src/interrupt.rs b/rust/crates/rusty-claude-cli/src/interrupt.rs new file mode 100644 index 0000000000..21aa7f4b70 --- /dev/null +++ b/rust/crates/rusty-claude-cli/src/interrupt.rs @@ -0,0 +1,280 @@ +//! Esc-key interruption for running turns. +//! +//! While a turn runs, the main thread is blocked inside the conversation +//! loop, so rustyline is not reading the keyboard. This module spawns a +//! listener thread that switches stdin to a minimal non-canonical mode +//! (line buffering and echo off; signal handling and output processing +//! stay enabled, unlike full raw mode) and watches for a lone Escape +//! byte. When one arrives it trips the shared [`TurnInterruptSignal`], +//! which the conversation loop and the streaming API client poll to wind +//! the turn down gracefully. +//! +//! Permission prompts read whole lines from stdin mid-turn, so stdin +//! ownership is coordinated through [`StdinPromptGate`]: while a prompt +//! holds a lease, the listener restores canonical mode and stops +//! consuming bytes. + +use std::io::IsTerminal; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, PoisonError}; +use std::thread::JoinHandle; + +use runtime::TurnInterruptSignal; + +/// Terminal-mode bookkeeping guarded by the gate mutex. `saved` holds the +/// canonical-mode termios while non-canonical mode is active. +#[derive(Default)] +struct TtyMode { + #[cfg(unix)] + saved: Option, +} + +#[derive(Default)] +struct GateState { + /// True while a permission prompt owns stdin. + busy: AtomicBool, + /// Tells the listener thread to exit at the end of the turn. + stop: AtomicBool, + /// Serializes stdin reads and terminal-mode flips. + mode: Mutex, +} + +/// Classification of one chunk of bytes read from the terminal. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum EscClassification { + /// A bare ESC byte: the user pressed the Escape key. + LoneEscape, + /// ESC followed by more bytes: the prefix of a terminal escape + /// sequence such as an arrow or function key. + EscapeSequence, + /// Anything else (regular type-ahead, control characters, ...). + Other, +} + +#[cfg_attr(not(unix), allow(dead_code))] +fn classify_escape_chunk(bytes: &[u8]) -> EscClassification { + match bytes { + [0x1b] => EscClassification::LoneEscape, + [0x1b, ..] => EscClassification::EscapeSequence, + _ => EscClassification::Other, + } +} + +/// Hands exclusive stdin ownership to interactive prompts while the +/// Esc listener is running. +#[derive(Clone)] +pub struct StdinPromptGate { + shared: Arc, +} + +impl StdinPromptGate { + /// Takes stdin away from the Esc listener for the lifetime of the + /// returned lease: the terminal returns to canonical mode so a + /// line-based prompt (e.g. the permission approval prompt) behaves + /// normally. Listening resumes when the lease is dropped. + pub fn lease(&self) -> StdinPromptLease<'_> { + self.shared.busy.store(true, Ordering::SeqCst); + // Wait for the listener's current poll cycle to release the + // terminal, then restore canonical mode for line input. + #[allow(unused_mut)] + let mut mode = self + .shared + .mode + .lock() + .unwrap_or_else(PoisonError::into_inner); + #[cfg(unix)] + tty::restore(&mut mode); + drop(mode); + StdinPromptLease { + shared: &self.shared, + } + } +} + +/// RAII lease returned by [`StdinPromptGate::lease`]. +pub struct StdinPromptLease<'a> { + shared: &'a GateState, +} + +impl Drop for StdinPromptLease<'_> { + fn drop(&mut self) { + self.shared.busy.store(false, Ordering::SeqCst); + } +} + +/// Background listener that maps a lone Esc keypress to +/// [`TurnInterruptSignal::interrupt`]. +pub struct EscapeInterruptMonitor { + shared: Arc, + join_handle: Option>, +} + +impl EscapeInterruptMonitor { + /// Spawns the listener for the duration of one turn. Returns `None` + /// when stdin is not an interactive terminal or the platform does not + /// support terminal-mode switching; Ctrl+C interruption still works + /// in those cases. + pub fn spawn(signal: TurnInterruptSignal) -> Option<(Self, StdinPromptGate)> { + if !std::io::stdin().is_terminal() { + return None; + } + + #[cfg(not(unix))] + { + let _ = signal; + None + } + + #[cfg(unix)] + { + let shared = Arc::new(GateState::default()); + let thread_shared = Arc::clone(&shared); + let join_handle = std::thread::spawn(move || listener_loop(&thread_shared, &signal)); + let gate = StdinPromptGate { + shared: Arc::clone(&shared), + }; + Some(( + Self { + shared, + join_handle: Some(join_handle), + }, + gate, + )) + } + } + + /// Stops the listener and restores the terminal before control + /// returns to the line editor. + pub fn stop(mut self) { + self.shared.stop.store(true, Ordering::SeqCst); + if let Some(join_handle) = self.join_handle.take() { + let _ = join_handle.join(); + } + } +} + +#[cfg(unix)] +fn listener_loop(shared: &GateState, signal: &TurnInterruptSignal) { + let mut buffer = [0u8; 64]; + + while !shared.stop.load(Ordering::SeqCst) { + if shared.busy.load(Ordering::SeqCst) { + std::thread::sleep(std::time::Duration::from_millis(25)); + continue; + } + + let mut mode = shared.mode.lock().unwrap_or_else(PoisonError::into_inner); + if shared.busy.load(Ordering::SeqCst) || shared.stop.load(Ordering::SeqCst) { + continue; + } + if !tty::enable_noncanonical(&mut mode) { + // Terminal refused the mode switch; Esc support is + // unavailable for this turn but Ctrl+C still works. + return; + } + if !tty::wait_readable(100) { + continue; + } + + let count = tty::read_pending(&mut buffer); + if classify_escape_chunk(&buffer[..count]) == EscClassification::LoneEscape { + // A slow terminal may split an escape sequence across reads; + // only treat ESC as the Esc key when nothing follows it. + if tty::wait_readable(50) { + let _ = tty::read_pending(&mut buffer); + continue; + } + signal.interrupt(); + } + // Other bytes are intentionally swallowed: type-ahead is not + // preserved while a turn is running. + } + + let mut mode = shared.mode.lock().unwrap_or_else(PoisonError::into_inner); + tty::restore(&mut mode); +} + +#[cfg(unix)] +mod tty { + use std::os::fd::{AsFd, AsRawFd}; + + use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; + use nix::sys::termios::{self, LocalFlags, SetArg, SpecialCharacterIndices}; + + use super::TtyMode; + + /// Disables line buffering and echo while leaving signal generation + /// (ISIG) and output post-processing (OPOST) untouched, so Ctrl+C and + /// streamed output keep working. No-op when already active. + pub(super) fn enable_noncanonical(mode: &mut TtyMode) -> bool { + if mode.saved.is_some() { + return true; + } + let stdin = std::io::stdin(); + let Ok(saved) = termios::tcgetattr(&stdin) else { + return false; + }; + let mut noncanonical = saved.clone(); + noncanonical.local_flags &= !(LocalFlags::ICANON | LocalFlags::ECHO); + noncanonical.control_chars[SpecialCharacterIndices::VMIN as usize] = 0; + noncanonical.control_chars[SpecialCharacterIndices::VTIME as usize] = 0; + if termios::tcsetattr(&stdin, SetArg::TCSANOW, &noncanonical).is_err() { + return false; + } + mode.saved = Some(saved); + true + } + + pub(super) fn restore(mode: &mut TtyMode) { + if let Some(saved) = mode.saved.take() { + let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &saved); + } + } + + pub(super) fn wait_readable(timeout_ms: u16) -> bool { + let stdin = std::io::stdin(); + let mut fds = [PollFd::new(stdin.as_fd(), PollFlags::POLLIN)]; + matches!(poll(&mut fds, PollTimeout::from(timeout_ms)), Ok(ready) if ready > 0) + && fds[0] + .revents() + .is_some_and(|revents| revents.contains(PollFlags::POLLIN)) + } + + pub(super) fn read_pending(buffer: &mut [u8]) -> usize { + nix::unistd::read(std::io::stdin().as_raw_fd(), buffer).unwrap_or(0) + } +} + +#[cfg(test)] +mod tests { + use super::{classify_escape_chunk, EscClassification}; + + #[test] + fn lone_escape_byte_is_the_esc_key() { + assert_eq!( + classify_escape_chunk(&[0x1b]), + EscClassification::LoneEscape + ); + } + + #[test] + fn escape_followed_by_more_bytes_is_a_sequence() { + // Up arrow: ESC [ A + assert_eq!( + classify_escape_chunk(&[0x1b, 0x5b, 0x41]), + EscClassification::EscapeSequence + ); + // Alt+f style: ESC f + assert_eq!( + classify_escape_chunk(&[0x1b, b'f']), + EscClassification::EscapeSequence + ); + } + + #[test] + fn regular_bytes_are_ignored() { + assert_eq!(classify_escape_chunk(b"hello"), EscClassification::Other); + assert_eq!(classify_escape_chunk(&[]), EscClassification::Other); + assert_eq!(classify_escape_chunk(&[0x03]), EscClassification::Other); + } +} diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 665ce632cf..2fffa2aea4 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -16,6 +16,7 @@ )] mod init; mod input; +mod interrupt; mod render; mod setup_wizard; @@ -7194,6 +7195,18 @@ impl BuiltRuntime { self } + fn with_turn_interrupt_signal(mut self, signal: runtime::TurnInterruptSignal) -> Self { + let mut runtime = self + .runtime + .take() + .expect("runtime should exist before installing turn interrupt signal"); + runtime + .api_client_mut() + .set_turn_interrupt_signal(signal.clone()); + self.runtime = Some(runtime.with_turn_interrupt_signal(signal)); + self + } + fn shutdown_plugins(&mut self) -> Result<(), Box> { if self.plugins_active { self.plugin_registry.shutdown()?; @@ -7576,7 +7589,10 @@ struct HookAbortMonitor { } impl HookAbortMonitor { - fn spawn(abort_signal: runtime::HookAbortSignal) -> Self { + fn spawn( + abort_signal: runtime::HookAbortSignal, + turn_interrupt_signal: runtime::TurnInterruptSignal, + ) -> Self { Self::spawn_with_waiter(abort_signal, move |stop_rx, abort_signal| { let Ok(runtime) = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -7593,7 +7609,11 @@ impl HookAbortMonitor { tokio::select! { result = tokio::signal::ctrl_c() => { if result.is_ok() { + // Ctrl+C stops the whole turn, not just running + // hooks: the conversation loop and the streaming + // client both poll the turn interrupt signal. abort_signal.abort(); + turn_interrupt_signal.interrupt(); } } _ = wait_for_stop => {} @@ -7698,7 +7718,7 @@ impl LiveCli { \x1b[2mDirectory\x1b[0m {}\n\ \x1b[2mSession\x1b[0m {}\n\ \x1b[2mAuto-save\x1b[0m {}\n\n\ - Type \x1b[1m/help\x1b[0m for commands · \x1b[1m/status\x1b[0m for live context · \x1b[2m/resume latest\x1b[0m jumps back to the newest session · \x1b[1m/diff\x1b[0m then \x1b[1m/commit\x1b[0m to ship · \x1b[2mTab\x1b[0m for workflow completions · \x1b[2mShift+Enter\x1b[0m for newline", + Type \x1b[1m/help\x1b[0m for commands · \x1b[1m/status\x1b[0m for live context · \x1b[2m/resume latest\x1b[0m jumps back to the newest session · \x1b[1m/diff\x1b[0m then \x1b[1m/commit\x1b[0m to ship · \x1b[2mTab\x1b[0m for workflow completions · \x1b[2mShift+Enter\x1b[0m for newline · \x1b[2mEsc\x1b[0m interrupts a running turn", self.model, self.permission_mode.as_str(), git_branch, @@ -7723,8 +7743,12 @@ impl LiveCli { fn prepare_turn_runtime( &self, emit_output: bool, - ) -> Result<(BuiltRuntime, HookAbortMonitor), Box> { + ) -> Result< + (BuiltRuntime, HookAbortMonitor, runtime::TurnInterruptSignal), + Box, + > { let hook_abort_signal = runtime::HookAbortSignal::new(); + let turn_interrupt_signal = runtime::TurnInterruptSignal::new(); let runtime = build_runtime( self.runtime.session().clone(), &self.session.id, @@ -7736,10 +7760,12 @@ impl LiveCli { self.permission_mode, None, )? - .with_hook_abort_signal(hook_abort_signal.clone()); - let hook_abort_monitor = HookAbortMonitor::spawn(hook_abort_signal); + .with_hook_abort_signal(hook_abort_signal.clone()) + .with_turn_interrupt_signal(turn_interrupt_signal.clone()); + let hook_abort_monitor = + HookAbortMonitor::spawn(hook_abort_signal, turn_interrupt_signal.clone()); - Ok((runtime, hook_abort_monitor)) + Ok((runtime, hook_abort_monitor, turn_interrupt_signal)) } fn replace_runtime(&mut self, runtime: BuiltRuntime) -> Result<(), Box> { @@ -7749,22 +7775,41 @@ impl LiveCli { } fn run_turn(&mut self, input: &str) -> Result<(), Box> { - let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(true)?; + let (mut runtime, hook_abort_monitor, turn_interrupt_signal) = + self.prepare_turn_runtime(true)?; + let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode); + let mut escape_monitor = None; + if let Some((monitor, stdin_gate)) = + interrupt::EscapeInterruptMonitor::spawn(turn_interrupt_signal) + { + permission_prompter = permission_prompter.with_stdin_gate(stdin_gate); + escape_monitor = Some(monitor); + } let mut spinner = Spinner::new(); let mut stdout = io::stdout(); spinner.tick( - "🦀 Thinking...", + if escape_monitor.is_some() { + "🦀 Thinking... (esc to interrupt)" + } else { + "🦀 Thinking..." + }, TerminalRenderer::new().color_theme(), &mut stdout, )?; - let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode); let result = runtime.run_turn(input, Some(&mut permission_prompter)); + if let Some(monitor) = escape_monitor { + monitor.stop(); + } hook_abort_monitor.stop(); match result { Ok(summary) => { self.replace_runtime(runtime)?; spinner.finish( - "✨ Done", + if summary.interrupted { + "⏹ Interrupted" + } else { + "✨ Done" + }, TerminalRenderer::new().color_theme(), &mut stdout, )?; @@ -7895,7 +7940,7 @@ impl LiveCli { *self.runtime.session_mut() = result.compacted_session.clone(); // Build a new runtime with the compacted session and retry - let (mut new_runtime, hook_abort_monitor) = + let (mut new_runtime, hook_abort_monitor, _turn_interrupt_signal) = self.prepare_turn_runtime(true)?; drop(hook_abort_monitor); @@ -7985,7 +8030,8 @@ impl LiveCli { } fn run_prompt_compact(&mut self, input: &str) -> Result<(), Box> { - let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(false)?; + let (mut runtime, hook_abort_monitor, _turn_interrupt_signal) = + self.prepare_turn_runtime(false)?; let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode); let result = runtime.run_turn(input, Some(&mut permission_prompter)); hook_abort_monitor.stop(); @@ -7998,7 +8044,8 @@ impl LiveCli { } fn run_prompt_compact_json(&mut self, input: &str) -> Result<(), Box> { - let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(false)?; + let (mut runtime, hook_abort_monitor, _turn_interrupt_signal) = + self.prepare_turn_runtime(false)?; let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode); let result = runtime.run_turn(input, Some(&mut permission_prompter)); hook_abort_monitor.stop(); @@ -8023,7 +8070,8 @@ impl LiveCli { } fn run_prompt_json(&mut self, input: &str) -> Result<(), Box> { - let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(false)?; + let (mut runtime, hook_abort_monitor, _turn_interrupt_signal) = + self.prepare_turn_runtime(false)?; let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode); let result = runtime.run_turn(input, Some(&mut permission_prompter)); hook_abort_monitor.stop(); @@ -9495,6 +9543,7 @@ fn render_repl_help() -> String { " Ctrl-R Reverse-search prompt history".to_string(), " Tab Complete commands, modes, and recent sessions".to_string(), " Ctrl-C Clear input (or exit on empty prompt)".to_string(), + " Esc Interrupt the running turn".to_string(), " Shift+Enter/Ctrl+J Insert a newline".to_string(), " Auto-save .claw/sessions//.jsonl" .to_string(), @@ -12478,11 +12527,20 @@ impl runtime::HookProgressReporter for CliHookProgressReporter { struct CliPermissionPrompter { current_mode: PermissionMode, + stdin_gate: Option, } impl CliPermissionPrompter { fn new(current_mode: PermissionMode) -> Self { - Self { current_mode } + Self { + current_mode, + stdin_gate: None, + } + } + + fn with_stdin_gate(mut self, stdin_gate: interrupt::StdinPromptGate) -> Self { + self.stdin_gate = Some(stdin_gate); + self } } @@ -12491,6 +12549,12 @@ impl runtime::PermissionPrompter for CliPermissionPrompter { &mut self, request: &runtime::PermissionRequest, ) -> runtime::PermissionPromptDecision { + // Take stdin back from the Esc listener (restoring canonical + // mode) for the duration of this line-based prompt. + let _stdin_lease = self + .stdin_gate + .as_ref() + .map(interrupt::StdinPromptGate::lease); println!(); println!("Permission approval required"); println!(" Tool {}", request.tool_name); @@ -12542,6 +12606,7 @@ struct AnthropicRuntimeClient { tool_registry: GlobalToolRegistry, progress_reporter: Option, reasoning_effort: Option, + turn_interrupt_signal: Option, } impl AnthropicRuntimeClient { @@ -12607,12 +12672,17 @@ impl AnthropicRuntimeClient { tool_registry, progress_reporter, reasoning_effort: None, + turn_interrupt_signal: None, }) } fn set_reasoning_effort(&mut self, effort: Option) { self.reasoning_effort = effort; } + + fn set_turn_interrupt_signal(&mut self, signal: runtime::TurnInterruptSignal) { + self.turn_interrupt_signal = Some(signal); + } } fn resolve_cli_auth_source() -> Result> { @@ -12646,30 +12716,50 @@ impl ApiClient for AnthropicRuntimeClient { }; self.runtime.block_on(async { - // When resuming after tool execution, apply a stall timeout on the - // first stream event. If the model does not respond within the - // deadline we drop the stalled connection and re-send the request as - // a continuation nudge (one retry only). - let max_attempts: usize = if is_post_tool { 2 } else { 1 }; - - for attempt in 1..=max_attempts { - let result = self - .consume_stream(&message_request, is_post_tool && attempt == 1) - .await; - match result { - Ok(events) => return Ok(events), - Err(error) - if error.to_string().contains("post-tool stall") - && attempt < max_attempts => - { - // Stalled after tool completion — nudge the model by - // re-sending the same request. + let request_loop = async { + // When resuming after tool execution, apply a stall timeout on the + // first stream event. If the model does not respond within the + // deadline we drop the stalled connection and re-send the request as + // a continuation nudge (one retry only). + let max_attempts: usize = if is_post_tool { 2 } else { 1 }; + + for attempt in 1..=max_attempts { + let result = self + .consume_stream(&message_request, is_post_tool && attempt == 1) + .await; + match result { + Ok(events) => return Ok(events), + Err(error) + if error.to_string().contains("post-tool stall") + && attempt < max_attempts => + { + // Stalled after tool completion — nudge the model by + // re-sending the same request. + } + Err(error) => return Err(error), } - Err(error) => return Err(error), } - } - Err(RuntimeError::new("post-tool continuation nudge exhausted")) + Err(RuntimeError::new("post-tool continuation nudge exhausted")) + }; + + let Some(interrupt) = self.turn_interrupt_signal.clone() else { + return request_loop.await; + }; + + tokio::select! { + result = request_loop => result, + () = async { + while !interrupt.is_interrupted() { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + } => { + // Dropping the request future aborts the in-flight HTTP + // stream. The conversation loop sees the interrupt flag and + // reports this as an interruption rather than a failure. + Err(RuntimeError::new("request interrupted by user")) + } + } }) } }