Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/datasource-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ all-features = true

[dependencies]
arrow = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store", "parquet"] }
Expand Down
2 changes: 2 additions & 0 deletions datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod row_group_filter;
mod sort;
pub mod source;
mod supported_predicates;
mod virtual_column;
mod writer;

pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
Expand All @@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it
pub use row_filter::build_row_filter;
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use row_group_filter::RowGroupAccessPlanFilter;
pub use virtual_column::ParquetVirtualColumn;
pub use writer::plan_to_parquet;
704 changes: 684 additions & 20 deletions datafusion/datasource-parquet/src/opener.rs

Large diffs are not rendered by default.

89 changes: 87 additions & 2 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use crate::opener::ParquetMorselizer;
use crate::opener::build_pruning_predicates;
use crate::opener::build_virtual_columns_state;
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -553,6 +554,22 @@ impl FileSource for ParquetSource {
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

// Validate virtual columns (extension-type allowlist) and, when
// pushdown is enabled, reject predicates that reference them. Both
// checks depend only on morselizer-level state, so we pay their cost
// once per scan partition rather than per file.
//
// Gating predicate validation on `pushdown_filters` is deliberate:
// when pushdown is off the predicate stays above the scan as a
// `FilterExec` and resolves virtual columns there; the row-filter
// ban only applies to the pushdown path.
let virtual_state = build_virtual_columns_state(
self.table_schema.virtual_columns(),
self.table_schema.file_schema(),
self.predicate.as_ref(),
self.pushdown_filters(),
)?;

Ok(Box::new(ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
Expand Down Expand Up @@ -580,6 +597,7 @@ impl FileSource for ParquetSource {
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
virtual_state,
}))
}

Expand Down Expand Up @@ -678,7 +696,12 @@ impl FileSource for ParquetSource {
filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
let table_schema = self.table_schema.table_schema();
// Use the schema excluding virtual columns: virtual columns (e.g.
// Parquet `row_number`) are produced by the reader itself and cannot
// be referenced inside a RowFilter, so predicates that reference them
// must not be marked as pushed down — otherwise the scan would
// silently drop them and produce wrong results.
let pushable_schema = self.table_schema.schema_without_virtual_columns();
// Determine if based on configs we should push filters down.
// If either the table / scan itself or the config has pushdown enabled,
// we will push down the filters.
Expand All @@ -694,7 +717,7 @@ impl FileSource for ParquetSource {
let filters: Vec<PushedDownPredicate> = filters
.into_iter()
.map(|filter| {
if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
if can_expr_be_pushed_down_with_schemas(&filter, &pushable_schema) {
PushedDownPredicate::supported(filter)
} else {
PushedDownPredicate::unsupported(filter)
Expand Down Expand Up @@ -946,4 +969,66 @@ mod tests {
assert!(source.reverse_row_groups());
assert!(source.filter().is_some());
}

#[test]
fn test_try_pushdown_filters_rejects_virtual_column_refs() {
    // Virtual columns are produced by the reader and cannot be referenced
    // inside a RowFilter. `try_pushdown_filters` must report such filters
    // as `PushedDown::No` so the FilterExec above the scan stays in
    // place — otherwise the scan would silently drop the predicate and
    // produce wrong results.
    use arrow::datatypes::{DataType, Field, FieldRef, Schema};
    use datafusion_common::config::ConfigOptions;
    use datafusion_datasource::TableSchema;
    use datafusion_expr::{col, lit as logical_lit};
    use datafusion_physical_expr::planner::logical2physical;
    use datafusion_physical_plan::filter_pushdown::PushedDown;
    use parquet::arrow::RowNumber;

    // One real file column plus one virtual `row_number` column carrying
    // the arrow-rs RowNumber extension type.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));
    let virtual_field: FieldRef = Arc::new(
        Field::new("row_number", DataType::Int64, false)
            .with_extension_type(RowNumber),
    );
    let table_schema = TableSchema::from_file_schema(file_schema)
        .with_virtual_columns(vec![virtual_field]);

    let source = ParquetSource::new(table_schema).with_pushdown_filters(true);

    // Plan all predicates against the full (file + virtual) schema, the
    // same schema the optimizer would use above the scan.
    let full_schema = source.table_schema.table_schema();
    let physical = |expr| logical2physical(&expr, full_schema);

    let file_only = physical(col("value").eq(logical_lit(1i64)));
    let virtual_only = physical(col("row_number").eq(logical_lit(2i64)));
    let mixed = physical(
        col("row_number")
            .eq(logical_lit(2i64))
            .or(col("value").eq(logical_lit(4i64))),
    );

    let config = ConfigOptions::default();
    let prop = source
        .try_pushdown_filters(vec![file_only, virtual_only, mixed], &config)
        .expect("try_pushdown_filters must not error");

    assert_eq!(prop.filters.len(), 3);
    assert!(
        matches!(prop.filters[0], PushedDown::Yes),
        "file-column filter should be pushable"
    );
    assert!(
        matches!(prop.filters[1], PushedDown::No),
        "filter referencing only a virtual column must not be pushed down"
    );
    assert!(
        matches!(prop.filters[2], PushedDown::No),
        "filter mixing a virtual column with a file column must not be pushed down (row filter would silently drop it)"
    );
}
}
125 changes: 125 additions & 0 deletions datafusion/datasource-parquet/src/virtual_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Typed wrapper for parquet virtual columns.
//!
//! arrow-rs identifies virtual columns via arrow extension types carried on
//! the `FieldRef`. [`ParquetVirtualColumn`] lifts that contract into the type
//! system so callers validate at the boundary (via `TryFrom<&FieldRef>`)
//! rather than string-comparing extension-type names deep inside the reader.

use arrow::datatypes::FieldRef;
use arrow_schema::extension::ExtensionType;
use datafusion_common::{DataFusionError, Result, not_impl_err};
use parquet::arrow::RowNumber;
use std::sync::Arc;

/// A parquet virtual column whose arrow extension type has been validated
/// as one DataFusion supports.
///
/// Obtain instances through [`TryFrom<&FieldRef>`]; that is the single
/// validation boundary. When arrow-rs gains another virtual extension
/// type, add a variant here and extend the `TryFrom` impl to accept it.
#[derive(Clone, Debug)]
pub enum ParquetVirtualColumn {
    /// The absolute row number within the parquet file, backed by the
    /// arrow-rs [`RowNumber`] extension type.
    RowNumber(FieldRef),
}

impl ParquetVirtualColumn {
    /// Borrow the underlying [`FieldRef`] that carries the extension type.
    pub fn field(&self) -> &FieldRef {
        // Single-variant enum, so this pattern is irrefutable — no `match`
        // needed. Revisit if a second variant is added.
        let Self::RowNumber(field) = self;
        field
    }
}

impl From<ParquetVirtualColumn> for FieldRef {
    /// Unwrap the validated column back into its field, consuming the
    /// wrapper.
    fn from(col: ParquetVirtualColumn) -> Self {
        // Irrefutable for a single-variant enum; avoids a one-arm `match`.
        let ParquetVirtualColumn::RowNumber(field) = col;
        field
    }
}

impl TryFrom<&FieldRef> for ParquetVirtualColumn {
    type Error = DataFusionError;

    /// Validate `field` as a supported parquet virtual column.
    ///
    /// # Errors
    ///
    /// Returns a `NotImplemented` error when the field carries no Arrow
    /// extension type at all, or carries one that DataFusion does not yet
    /// support as a virtual column.
    fn try_from(field: &FieldRef) -> Result<Self> {
        // Virtual columns are identified solely by their extension-type
        // name; a field without one cannot be resolved by the reader.
        let Some(name) = field.extension_type_name() else {
            return not_impl_err!(
                "Virtual column '{}' is missing an Arrow extension type; \
                supported extension types: [{}]",
                field.name(),
                RowNumber::NAME
            );
        };
        // A guard-only `match` (`n if n == RowNumber::NAME => …` plus a
        // catch-all) is just an `if`/`else` in disguise; write it as one.
        if name == RowNumber::NAME {
            Ok(Self::RowNumber(Arc::clone(field)))
        } else {
            not_impl_err!(
                "Virtual column '{}' uses unsupported Arrow extension type '{}'; \
                supported types: [{}]. Add a ParquetVirtualColumn variant and \
                a test for this type before wiring it through.",
                field.name(),
                name,
                RowNumber::NAME
            )
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field};

    /// Build a non-nullable Int64 field with the given name.
    fn int64_field(name: &str) -> Field {
        Field::new(name, DataType::Int64, false)
    }

    #[test]
    fn row_number_field_converts() {
        // A field tagged with the RowNumber extension type is accepted and
        // maps to the RowNumber variant, preserving the field itself.
        let field: FieldRef =
            Arc::new(int64_field("row_number").with_extension_type(RowNumber));
        let col = ParquetVirtualColumn::try_from(&field).expect("valid row_number");
        assert!(matches!(col, ParquetVirtualColumn::RowNumber(_)));
        assert_eq!(col.field().name(), "row_number");
    }

    #[test]
    fn missing_extension_type_rejected() {
        // A plain field with no extension type cannot be a virtual column.
        let field: FieldRef = Arc::new(int64_field("plain"));
        let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
        assert!(
            err.to_string().contains("missing an Arrow extension type"),
            "got: {err}"
        );
    }

    #[test]
    fn unsupported_extension_type_rejected() {
        // RowGroupIndex is a real arrow-rs virtual type not yet in our enum.
        let field: FieldRef = Arc::new(
            int64_field("row_group_index")
                .with_extension_type(parquet::arrow::RowGroupIndex),
        );
        let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
        assert!(
            err.to_string().contains("parquet.virtual.row_group_index"),
            "error should name the offending extension type, got: {err}"
        );
    }
}
Loading
Loading