Skip to content
119 changes: 111 additions & 8 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,16 @@ pub(super) struct ParquetOpener {
}

/// Represents a prepared access plan with optional row selection
struct PreparedAccessPlan {
pub(crate) struct PreparedAccessPlan {
/// Row group indexes to read
row_group_indexes: Vec<usize>,
pub(crate) row_group_indexes: Vec<usize>,
/// Optional row selection for filtering within row groups
row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
pub(crate) row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
}

impl PreparedAccessPlan {
/// Create a new prepared access plan from a ParquetAccessPlan
fn from_access_plan(
pub(crate) fn from_access_plan(
access_plan: ParquetAccessPlan,
rg_metadata: &[RowGroupMetaData],
) -> Result<Self> {
Expand All @@ -144,17 +144,23 @@ impl PreparedAccessPlan {
}

/// Reverse the access plan for reverse scanning
fn reverse(
pub(crate) fn reverse(
mut self,
file_metadata: &parquet::file::metadata::ParquetMetaData,
) -> Result<Self> {
// Get the row group indexes before reversing
let row_groups_to_scan = self.row_group_indexes.clone();

// Reverse the row group indexes
self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect();

// If we have a row selection, reverse it to match the new row group order
if let Some(row_selection) = self.row_selection {
self.row_selection =
Some(reverse_row_selection(&row_selection, file_metadata)?);
self.row_selection = Some(reverse_row_selection(
&row_selection,
file_metadata,
&row_groups_to_scan, // Pass the original (non-reversed) row group indexes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adriangb The main change is to pass `row_group_indexes` to the reverse logic instead of using all row group indexes, because I saw the following logic:

pub fn into_overall_row_selection(

)?);
}

Ok(self)
Expand Down Expand Up @@ -964,7 +970,7 @@ mod test {
use std::sync::Arc;

use super::{ConstantColumns, constant_columns_from_stats};
use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use bytes::{BufMut, BytesMut};
use datafusion_common::{
Expand Down Expand Up @@ -1851,4 +1857,101 @@ mod test {
"Reverse scan should reverse row group order while maintaining correct RowSelection for each group"
);
}

/// Reverse scanning must keep every `RowSelection` attached to its own row
/// group even when the access plan skips a row group in the middle of the file.
#[tokio::test]
async fn test_reverse_scan_with_non_contiguous_row_groups() {
    use crate::ParquetAccessPlan;
    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    use parquet::file::properties::WriterProperties;

    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

    // Four two-row batches, written with a max row group size of 2 so that
    // the file is laid out as four row groups (RG0..RG3).
    let batches = vec![
        record_batch!(("a", Int32, vec![Some(1), Some(2)])).unwrap(),
        record_batch!(("a", Int32, vec![Some(3), Some(4)])).unwrap(),
        record_batch!(("a", Int32, vec![Some(5), Some(6)])).unwrap(),
        record_batch!(("a", Int32, vec![Some(7), Some(8)])).unwrap(),
    ];
    let schema = batches[0].schema();

    let writer_props = WriterProperties::builder()
        .set_max_row_group_size(2)
        .build();

    let data_len = write_parquet_batches(
        Arc::clone(&store),
        "test.parquet",
        batches,
        Some(writer_props),
    )
    .await;

    // The crux of the test: RG1 is skipped, so the scanned row groups
    // [0, 2, 3] are NOT contiguous.
    let mut access_plan = ParquetAccessPlan::new(vec![
        RowGroupAccess::Scan, // RG0
        RowGroupAccess::Skip, // RG1 is left out entirely
        RowGroupAccess::Scan, // RG2
        RowGroupAccess::Scan, // RG3
    ]);

    // Every scanned row group keeps its first row and drops its second:
    // RG0 -> 1, RG2 -> 5, RG3 -> 7. RG1 is skipped and needs no selection.
    for row_group in [0, 2, 3] {
        access_plan.scan_selection(
            row_group,
            RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(1)]),
        );
    }

    let file = PartitionedFile::new(
        "test.parquet".to_string(),
        u64::try_from(data_len).unwrap(),
    )
    .with_extensions(Arc::new(access_plan));

    // Builds an opener that differs only in the scan direction.
    let opener_for = |reverse_scan: bool| {
        ParquetOpenerBuilder::new()
            .with_store(Arc::clone(&store))
            .with_schema(Arc::clone(&schema))
            .with_projection_indices(&[0])
            .with_reverse_row_groups(reverse_scan)
            .build()
    };

    // Forward direction: RG0(1), RG2(5), RG3(7); nothing from RG1.
    let forward_stream = opener_for(false)
        .open(file.clone())
        .unwrap()
        .await
        .unwrap();
    let forward_values = collect_int32_values(forward_stream).await;

    assert_eq!(
        forward_values,
        vec![1, 5, 7],
        "Forward scan with non-contiguous row groups"
    );

    // Reverse direction: RG3(7), RG2(5), RG0(1). This is the regression
    // check: a RowSelection mapped onto the wrong row groups would surface
    // here as wrong values.
    let reverse_stream = opener_for(true).open(file).unwrap().await.unwrap();
    let reverse_values = collect_int32_values(reverse_stream).await;

    assert_eq!(
        reverse_values,
        vec![7, 5, 1],
        "Reverse scan with non-contiguous row groups should correctly map RowSelection"
    );
}
}
Loading