diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index 829e313d2381e..06a40112b6ee8 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -20,10 +20,13 @@ use datafusion_common::{Result, internal_datafusion_err}; use std::fmt::Display; +use std::future::Future; use std::hash::{Hash, Hasher}; +use std::pin::Pin; use std::{cmp::Ordering, sync::Arc, sync::atomic}; mod pool; +mod reclaimer; #[cfg(feature = "arrow_buffer_pool")] pub mod arrow; @@ -36,6 +39,7 @@ pub use datafusion_common::{ human_readable_count, human_readable_duration, human_readable_size, units, }; pub use pool::*; +pub use reclaimer::MemoryReclaimer; /// Tracks and potentially limits memory use across operators during execution. /// @@ -209,6 +213,17 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display { /// On error the `allocation` will not be increased in size fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>; + /// Async variant of [`Self::try_grow`]. Default delegates to the + /// sync version; reclaim-aware pools (e.g. [`TrackConsumersPool`]) + /// override to invoke registered [`MemoryReclaimer`]s on OOM. + fn try_grow_async<'a>( + &'a self, + reservation: &'a MemoryReservation, + additional: usize, + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { + Box::pin(async move { self.try_grow(reservation, additional) }) + } + /// Return the total amount of memory reserved fn reserved(&self) -> usize; @@ -249,6 +264,9 @@ pub struct MemoryConsumer { name: String, can_spill: bool, id: usize, + /// Reclaimer collected by reclaim-aware pools at register time. Not + /// part of consumer identity (excluded from `Eq`/`Hash`). 
+ reclaimer: Option<Arc<dyn MemoryReclaimer>>, } impl PartialEq for MemoryConsumer { @@ -287,20 +305,39 @@ impl MemoryConsumer { name: name.into(), can_spill: false, id: Self::new_unique_id(), + reclaimer: None, } } - /// Returns a clone of this [`MemoryConsumer`] with a new unique id, - /// which can be registered with a [`MemoryPool`], - /// This new consumer is separate from the original. + /// Clone this [`MemoryConsumer`] with a new unique id. + /// + /// Drops any attached reclaimer: it is bound to the original operator's + /// state and would target the wrong owner under a new id (and bypass + /// the id-keyed requestor-self-skip in `try_grow_async`). pub fn clone_with_new_id(&self) -> Self { Self { name: self.name.clone(), can_spill: self.can_spill, id: Self::new_unique_id(), + reclaimer: None, + } + } + + /// Attach a [`MemoryReclaimer`] and mark this consumer spill-capable. + /// Pools without reclaim support ignore the reclaimer. + pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self { + Self { + can_spill: true, + reclaimer: Some(reclaimer), + ..self } } + /// Returns the attached [`MemoryReclaimer`], if any. + pub fn reclaimer(&self) -> Option<&Arc<dyn MemoryReclaimer>> { + self.reclaimer.as_ref() + } + /// Return the unique id of this [`MemoryConsumer`] pub fn id(&self) -> usize { self.id @@ -461,6 +498,17 @@ impl MemoryReservation { Ok(()) } + /// Async variant of [`Self::try_grow`]. On a reclaim-aware pool, + /// triggers registered [`MemoryReclaimer`]s before surfacing OOM. + pub async fn try_grow_async(&self, capacity: usize) -> Result<()> { + self.registration + .pool + .try_grow_async(self, capacity) + .await?; + self.size.fetch_add(capacity, atomic::Ordering::Relaxed); + Ok(()) + } + /// Splits off `capacity` bytes from this [`MemoryReservation`] /// into a new [`MemoryReservation`] with the same /// [`MemoryConsumer`]. 
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index aac95b9d6a81f..1ee609ac3f879 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -16,13 +16,17 @@ // under the License. use crate::memory_pool::{ - MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size, + MemoryConsumer, MemoryLimit, MemoryPool, MemoryReclaimer, MemoryReservation, + human_readable_size, }; use datafusion_common::HashMap; use datafusion_common::{DataFusionError, Result, resources_datafusion_err}; use log::debug; use parking_lot::Mutex; use std::fmt::{Display, Formatter}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; use std::{ num::NonZeroUsize, sync::atomic::{AtomicUsize, Ordering}, @@ -324,6 +328,7 @@ struct TrackedConsumer { can_spill: bool, reserved: AtomicUsize, peak: AtomicUsize, + reclaimer: Option<Arc<dyn MemoryReclaimer>>, } impl TrackedConsumer { @@ -533,6 +538,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> { can_spill: consumer.can_spill(), reserved: Default::default(), peak: Default::default(), + reclaimer: consumer.reclaimer().cloned(), }, ); @@ -593,6 +599,69 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> { Ok(()) } + fn try_grow_async<'a>( + &'a self, + reservation: &'a MemoryReservation, + additional: usize, + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { + Box::pin(async move { + // Fast path. + let initial_err = match self.try_grow(reservation, additional) { + Ok(()) => return Ok(()), + Err(e) => e, + }; + + // Snapshot reclaimers. Skip the requestor (self-reclaim risks + // deadlock) and zero-byte consumers. 
+ let requestor_id = reservation.consumer().id(); + let mut candidates = { + let guard = self.tracked_consumers.lock(); + guard + .iter() + .filter_map(|(cid, tc)| { + let reclaimer = tc.reclaimer.as_ref()?; + if *cid == requestor_id || tc.reserved() == 0 { + return None; + } + Some((tc.reserved(), Arc::clone(reclaimer))) + }) + .collect::<Vec<_>>() + }; + // Order: priority desc, then reservation size desc. + candidates.sort_by(|(left_reserved, left), (right_reserved, right)| { + right + .priority() + .cmp(&left.priority()) + .then_with(|| right_reserved.cmp(left_reserved)) + }); + + // Reclaim, retry after each, exit on success. + for (_, reclaimer) in candidates { + if let Err(e) = reclaimer.reclaim(additional).await { + debug!("memory reclaimer returned error: {e}"); + continue; + } + if self.try_grow(reservation, additional).is_ok() { + return Ok(()); + } + } + + // Fall through to the inner pool's own reclaim path, if any. + // Default impl just re-runs try_grow — cheap. + self.inner + .try_grow_async(reservation, additional) + .await + .map_err(|_| initial_err)?; + self.tracked_consumers + .lock() + .entry(reservation.consumer().id()) + .and_modify(|tracked_consumer| { + tracked_consumer.grow(additional); + }); + Ok(()) + }) + } + fn reserved(&self) -> usize { self.inner.reserved() } diff --git a/datafusion/execution/src/memory_pool/reclaimer.rs b/datafusion/execution/src/memory_pool/reclaimer.rs new file mode 100644 index 0000000000000..106a511701826 --- /dev/null +++ b/datafusion/execution/src/memory_pool/reclaimer.rs @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Operator hook used by a [`MemoryPool`] to free memory before an +//! allocation fails. +//! +//! [`MemoryPool`]: super::MemoryPool + +use datafusion_common::Result; +use std::fmt::Debug; + +/// Hook attached to a [`MemoryConsumer`] via +/// [`MemoryConsumer::with_reclaimer`]. On +/// [`MemoryPool::try_grow_async`] failure the pool walks registered +/// reclaimers in descending [`Self::priority`] and asks each to free bytes. +/// +/// Implementations MUST: +/// +/// - Spill data **before** shrinking the reservation, so reported bytes +/// are recoverable. +/// - Release bytes via [`MemoryReservation::shrink`] / +/// [`MemoryReservation::free`] and return at most `target`. +/// - Not call `try_grow*` on the pool — risks reentrancy/deadlock. +/// - Not capture `Arc<MemoryReservation>` / `Arc<dyn MemoryPool>` +/// (creates a cycle that blocks `unregister`); use channels or `Weak`. +/// +/// [`MemoryConsumer`]: super::MemoryConsumer +/// [`MemoryConsumer::with_reclaimer`]: super::MemoryConsumer::with_reclaimer +/// [`MemoryPool::try_grow_async`]: super::MemoryPool::try_grow_async +/// [`MemoryReservation::shrink`]: super::MemoryReservation::shrink +/// [`MemoryReservation::free`]: super::MemoryReservation::free +#[async_trait::async_trait] +pub trait MemoryReclaimer: Send + Sync + Debug { + /// Upper bound on bytes this reclaimer can free. `None` = unknown. + fn reclaimable_bytes(&self) -> Option<usize> { + None + } + + /// Free up to `target` bytes; return the amount actually released. + /// See trait-level contract. 
+ async fn reclaim(&self, target: usize) -> Result<usize>; + + /// Higher priorities reclaim first. Negative = last resort. + fn priority(&self) -> i32 { + 0 + } +} diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6c02af8dec6d3..d264eefa3bf6e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -65,7 +65,9 @@ use datafusion_common::{ unwrap_or_internal_err, }; use datafusion_execution::TaskContext; -use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::memory_pool::{ + MemoryConsumer, MemoryReclaimer, MemoryReservation, +}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExpr; @@ -74,6 +76,26 @@ use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; +/// Reclaimer for an [`ExternalSorter`] partition. Hands a oneshot off to +/// the partition's stream loop (the sorter's sole owner), which spills and +/// replies with the freed byte count. +#[derive(Debug)] +struct ExternalSorterReclaimer { + tx: tokio::sync::mpsc::Sender<tokio::sync::oneshot::Sender<usize>>, +} + +#[async_trait::async_trait] +impl MemoryReclaimer for ExternalSorterReclaimer { + async fn reclaim(&self, _target: usize) -> Result<usize> { + let (resp_tx, resp_rx) = tokio::sync::oneshot::channel(); + // Stream loop terminated, or response dropped: report 0. + if self.tx.send(resp_tx).await.is_err() { + return Ok(0); + } + Ok(resp_rx.await.unwrap_or(0)) + } +} + struct ExternalSorterMetrics { /// metrics baseline: BaselineMetrics, @@ -279,11 +301,16 @@ impl ExternalSorter { spill_compression: SpillCompression, metrics: &ExecutionPlanMetricsSet, runtime: Arc<RuntimeEnv>, + // Reclaimer attached to this partition's `MemoryConsumer`. 
+ reclaimer: Option<Arc<dyn MemoryReclaimer>>, ) -> Result<Self> { let metrics = ExternalSorterMetrics::new(metrics, partition_id); - let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]")) - .with_can_spill(true) - .register(&runtime.memory_pool); + let mut consumer = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]")) + .with_can_spill(true); + if let Some(r) = reclaimer { + consumer = consumer.with_reclaimer(r); + } + let reservation = consumer.register(&runtime.memory_pool); let merge_reservation = MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]")) @@ -492,6 +519,10 @@ impl ExternalSorter { while let Some(batch) = sorted_stream.next().await { let batch = batch?; let sorted_size = get_reserved_bytes_for_record_batch(&batch)?; + // Sync `try_grow`, not `try_grow_async`: we are already in the + // spill path (freeing memory). A recursive reclaim here can + // close a cycle between two sorters that are each waiting on + // the other's spill to complete. if self.reservation.try_grow(sorted_size).is_err() { // Although the reservation is not enough, the batch is // already in memory, so it's okay to combine it with previously @@ -736,14 +767,16 @@ impl ExternalSorter { ) -> Result<()> { let size = get_reserved_bytes_for_record_batch(input)?; - match self.reservation.try_grow(size) { + match self.reservation.try_grow_async(size).await { Ok(_) => Ok(()), Err(e) => { if self.in_mem_batches.is_empty() { return Err(Self::err_with_oom_context(e)); } - // Spill and try again. + // Sibling reclaim was already attempted by `try_grow_async` + // (which skips this consumer). Spill our own buffer, retry + // sync — siblings won't free more on a second pass. self.sort_and_spill_in_mem_batches().await?; self.reservation .try_grow(size) @@ -1246,6 +1279,12 @@ impl ExecutionPlan for SortExec { ))) } (false, None) => { + // Spill-request channel; drained by the stream loop below. 
+ let (reclaim_tx, mut reclaim_rx) = + tokio::sync::mpsc::channel::<tokio::sync::oneshot::Sender<usize>>(4); + let reclaimer: Arc<dyn MemoryReclaimer> = + Arc::new(ExternalSorterReclaimer { tx: reclaim_tx }); + + let mut sorter = ExternalSorter::new( partition, input.schema(), @@ -1256,14 +1295,51 @@ impl ExecutionPlan for SortExec { context.session_config().spill_compression(), &self.metrics_set, context.runtime_env(), + Some(reclaimer), )?; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { - while let Some(batch) = input.next().await { - let batch = batch?; - sorter.insert_batch(batch).await?; + // State machine: spill or insert, never both. The + // freed-byte reply is sent only after the spill + // completes, so the pool sees recoverable bytes. + // `biased` ensures spill wins over insert under + // pressure. + // + // Cancellation: selecting reclaim drops the in-flight + // `input.next()`. Safe for cancellation-safe inputs + // (channel receivers, e.g. RepartitionExec); other + // inputs could drop a batch here. + loop { + tokio::select! { + biased; + Some(resp_tx) = reclaim_rx.recv() => { + // A reclaim can be dequeued just after a + // prior spill drained `in_mem_batches` + // (sibling sent during the spill's awaits; + // pool's zero-byte filter can transiently + // miss us via split reservations). Nothing + // local to free — reply 0 and keep going. + if sorter.in_mem_batches.is_empty() { + let _ = resp_tx.send(0); + continue; + } + let before = sorter.used(); + sorter.sort_and_spill_in_mem_batches().await?; + let after = sorter.used(); + let _ = resp_tx + .send(before.saturating_sub(after)); + } + next = input.next() => match next { + Some(batch) => { + sorter.insert_batch(batch?).await?; + } + None => break, + } + } } + // Late reclaim requests now resolve to Ok(0). 
+ drop(reclaim_rx); sorter.sort().await }) .try_flatten(), @@ -2766,6 +2842,7 @@ mod tests { SpillCompression::Uncompressed, &metrics_set, Arc::clone(&runtime), + None, )?; // Insert enough data to force spilling.