diff --git a/src/iceberg/manifest/manifest_entry.h b/src/iceberg/manifest/manifest_entry.h index c1f81d0ae..17c5388bd 100644 --- a/src/iceberg/manifest/manifest_entry.h +++ b/src/iceberg/manifest/manifest_entry.h @@ -193,6 +193,10 @@ struct ICEBERG_EXPORT DataFile { SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(), "File format name: avro, orc, or parquet"); + static constexpr int32_t kSpecIdFieldId = 141; + inline static const SchemaField kSpecId = + SchemaField::MakeOptional(kSpecIdFieldId, "spec_id", int32(), "Partition spec ID"); + static constexpr int32_t kPartitionFieldId = 102; inline static const std::string kPartitionField = "partition"; inline static const std::string kPartitionDoc = diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 8af717b25..61bb57da2 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -19,8 +19,14 @@ #include "iceberg/manifest/manifest_group.h" +#include +#include +#include +#include #include +#include +#include "iceberg/expression/binder.h" #include "iceberg/expression/evaluator.h" #include "iceberg/expression/expression.h" #include "iceberg/expression/manifest_evaluator.h" @@ -29,14 +35,47 @@ #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_reader.h" #include "iceberg/partition_spec.h" +#include "iceberg/row/manifest_wrapper.h" #include "iceberg/schema.h" #include "iceberg/table_scan.h" +#include "iceberg/type.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" #include "iceberg/util/macros.h" namespace iceberg { +namespace { + +std::shared_ptr DataFileFilterSchema() { + auto empty_partition_type = std::make_shared(std::vector{}); + return std::make_shared(std::vector{ + DataFile::kContent, + DataFile::kFilePath, + DataFile::kFileFormat, + DataFile::kSpecId, + SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField, + std::move(empty_partition_type), DataFile::kPartitionDoc), + DataFile::kRecordCount, + DataFile::kFileSize, + DataFile::kColumnSizes, + DataFile::kValueCounts, + DataFile::kNullValueCounts, + DataFile::kNanValueCounts, + DataFile::kLowerBounds, + DataFile::kUpperBounds, + DataFile::kKeyMetadata, + DataFile::kSplitOffsets, + DataFile::kEqualityIds, + DataFile::kSortOrderId, + DataFile::kFirstRowId, + DataFile::kReferencedDataFile, + DataFile::kContentOffset, + DataFile::kContentSize}); +} + +} // namespace + Result> ManifestGroup::Make( std::shared_ptr io, std::shared_ptr schema, std::unordered_map> specs_by_id, @@ -265,10 +304,39 @@ Result> ManifestGroup::MakeReader( ICEBERG_ASSIGN_OR_RAISE(auto reader, ManifestReader::Make(manifest, io_, schema_, specs_by_id_)); + auto columns = columns_; + if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue && + !columns.empty() && + std::ranges::find(columns, Schema::kAllColumns) == columns.end()) { + auto data_file_schema = DataFileFilterSchema(); + ICEBERG_ASSIGN_OR_RAISE( + auto bound_file_filter, + Binder::Bind(*data_file_schema, file_filter_, case_sensitive_)); + ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids, + ReferenceVisitor::GetReferencedFieldIds(bound_file_filter)); + + std::unordered_set selected_columns(columns.cbegin(), columns.cend()); + for (const auto field_id : referenced_field_ids) { + if (field_id == DataFile::kSpecIdFieldId) { + continue; + } + ICEBERG_ASSIGN_OR_RAISE(auto column_name, + data_file_schema->FindColumnNameById(field_id)); + if (column_name.has_value()) { + std::string column_name_str(column_name.value()); + if (selected_columns.contains(column_name_str)) { + continue; + } + columns.push_back(std::move(column_name_str)); + selected_columns.insert(columns.back()); + } + } + } + reader->FilterRows(data_filter_) .FilterPartitions(partition_filter_) .CaseSensitive(case_sensitive_) - .Select(columns_); + .Select(std::move(columns)); return reader; } @@ -299,10 +367,13 @@ ManifestGroup::ReadEntries() { return eval_cache[spec_id].get(); }; + const bool has_file_filter = + file_filter_ && file_filter_->op() != Expression::Operation::kTrue; std::unique_ptr data_file_evaluator; - if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) { - // TODO(gangwu): create an Evaluator on the DataFile schema with empty - // partition type + if (has_file_filter) { + ICEBERG_ASSIGN_OR_RAISE( + data_file_evaluator, + Evaluator::Make(*DataFileFilterSchema(), file_filter_, case_sensitive_)); } std::unordered_map> result; @@ -343,8 +414,12 @@ ManifestGroup::ReadEntries() { } if (data_file_evaluator != nullptr) { - // TODO(gangwu): implement data_file_evaluator to evaluate StructLike on - // top of entry.data_file + DataFileStructLike data_file(*entry.data_file); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, + data_file_evaluator->Evaluate(data_file)); + if (!should_match) { + continue; + } } if (!manifest_entry_predicate_(entry)) { diff --git a/src/iceberg/row/manifest_wrapper.cc b/src/iceberg/row/manifest_wrapper.cc index 851f9e72f..18be82233 100644 --- a/src/iceberg/row/manifest_wrapper.cc +++ b/src/iceberg/row/manifest_wrapper.cc @@ -19,18 +19,57 @@ #include "iceberg/row/manifest_wrapper.h" +#include +#include +#include +#include +#include +#include + #include "iceberg/manifest/manifest_reader_internal.h" #include "iceberg/util/macros.h" namespace iceberg { namespace { + +enum class DataFileFieldPosition : size_t { + kContent = 0, + kFilePath = 1, + kFileFormat = 2, + kSpecId = 3, + kPartition = 4, + kRecordCount = 5, + kFileSize = 6, + kColumnSizes = 7, + kValueCounts = 8, + kNullValueCounts = 9, + kNanValueCounts = 10, + kLowerBounds = 11, + kUpperBounds = 12, + kKeyMetadata = 13, + kSplitOffsets = 14, + kEqualityIds = 15, + kSortOrderId = 16, + kFirstRowId = 17, + kReferencedDataFile = 18, + kContentOffset = 19, + kContentSize = 20, + kNextUnusedId = 21, +}; + template requires std::is_same_v> || std::is_same_v std::string_view ToView(const T& value) { return {reinterpret_cast(value.data()), value.size()}; // NOLINT } +Scalar ToScalar(const int32_t value) { return value; } + +Scalar ToScalar(const int64_t value) { return value; } + +Scalar ToScalar(const std::vector& value) { return ToView(value); } + template Result FromOptional(const std::optional& value) { if (value.has_value()) { @@ -39,6 +78,79 @@ Result FromOptional(const std::optional& value) { return std::monostate{}; } +Result FromOptionalString(const std::optional& value) { + if (value.has_value()) { + return ToView(value.value()); + } + return std::monostate{}; +} + +template +class VectorArrayLike : public ArrayLike { + public: + explicit VectorArrayLike(std::span values) : values_(values) {} + + Result GetElement(size_t pos) const override { + if (pos >= size()) { + return InvalidArgument("Invalid array index: {}", pos); + } + return ToScalar(values_[pos]); + } + + size_t size() const override { return values_.size(); } + + private: + std::span values_; +}; + +template +class IntMapLike : public MapLike { + public: + explicit IntMapLike(const std::map& values) : values_(values) {} + + Result GetKey(size_t pos) const override { + if (pos >= size()) { + return InvalidArgument("Invalid map index: {}", pos); + } + return std::next(values_.get().cbegin(), pos)->first; + } + + Result GetValue(size_t pos) const override { + if (pos >= size()) { + return InvalidArgument("Invalid map index: {}", pos); + } + return ToScalar(std::next(values_.get().cbegin(), pos)->second); + } + + size_t size() const override { return values_.get().size(); } + + private: + std::reference_wrapper> values_; +}; + +template +Result FromOptionalMap(const std::map& values) { + if (values.empty()) { + return std::monostate{}; + } + return std::make_shared>(values); +} + +template +Result FromOptionalVector(const std::vector& values) { + if (values.empty()) { + return std::monostate{}; + } + return std::make_shared>(values); +} + +Result FromOptionalBytes(const std::vector& value) { + if (value.empty()) { + return std::monostate{}; + } + return ToView(value); +} + } // namespace Result PartitionFieldSummaryStructLike::GetField(size_t pos) const { @@ -134,4 +246,65 @@ std::unique_ptr FromManifestFile(const ManifestFile& file) { return std::make_unique(file); } +Result DataFileStructLike::GetField(size_t pos) const { + if (pos >= num_fields()) { + return InvalidArgument("Invalid data file field index: {}", pos); + } + + const auto& data_file = data_file_.get(); + switch (static_cast(pos)) { + case DataFileFieldPosition::kContent: + return static_cast(data_file.content); + case DataFileFieldPosition::kFilePath: + return ToView(data_file.file_path); + case DataFileFieldPosition::kFileFormat: + return ToString(data_file.file_format); + case DataFileFieldPosition::kSpecId: + return FromOptional(data_file.partition_spec_id); + case DataFileFieldPosition::kPartition: { + partition_ = std::make_shared(data_file.partition); + return partition_; + } + case DataFileFieldPosition::kRecordCount: + return data_file.record_count; + case DataFileFieldPosition::kFileSize: + return data_file.file_size_in_bytes; + case DataFileFieldPosition::kColumnSizes: + return FromOptionalMap(data_file.column_sizes); + case DataFileFieldPosition::kValueCounts: + return FromOptionalMap(data_file.value_counts); + case DataFileFieldPosition::kNullValueCounts: + return FromOptionalMap(data_file.null_value_counts); + case DataFileFieldPosition::kNanValueCounts: + return FromOptionalMap(data_file.nan_value_counts); + case DataFileFieldPosition::kLowerBounds: + return FromOptionalMap(data_file.lower_bounds); + case DataFileFieldPosition::kUpperBounds: + return FromOptionalMap(data_file.upper_bounds); + case DataFileFieldPosition::kKeyMetadata: + return FromOptionalBytes(data_file.key_metadata); + case DataFileFieldPosition::kSplitOffsets: + return FromOptionalVector(data_file.split_offsets); + case DataFileFieldPosition::kEqualityIds: + return FromOptionalVector(data_file.equality_ids); + case DataFileFieldPosition::kSortOrderId: + return FromOptional(data_file.sort_order_id); + case DataFileFieldPosition::kFirstRowId: + return FromOptional(data_file.first_row_id); + case DataFileFieldPosition::kReferencedDataFile: + return FromOptionalString(data_file.referenced_data_file); + case DataFileFieldPosition::kContentOffset: + return FromOptional(data_file.content_offset); + case DataFileFieldPosition::kContentSize: + return FromOptional(data_file.content_size_in_bytes); + case DataFileFieldPosition::kNextUnusedId: + return InvalidArgument("Invalid data file field index: {}", pos); + } + return InvalidArgument("Invalid data file field index: {}", pos); +} + +size_t DataFileStructLike::num_fields() const { + return static_cast(DataFileFieldPosition::kNextUnusedId); +} + } // namespace iceberg diff --git a/src/iceberg/row/manifest_wrapper.h b/src/iceberg/row/manifest_wrapper.h index bc04c1e8b..20c2165b2 100644 --- a/src/iceberg/row/manifest_wrapper.h +++ b/src/iceberg/row/manifest_wrapper.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/row/struct_like.h" @@ -97,4 +98,24 @@ class ICEBERG_EXPORT ManifestFileStructLike : public StructLike { mutable std::shared_ptr summaries_; }; +/// \brief StructLike wrapper for DataFile metadata. +class ICEBERG_EXPORT DataFileStructLike : public StructLike { + public: + explicit DataFileStructLike(const DataFile& file) : data_file_(file) {} + ~DataFileStructLike() override = default; + + DataFileStructLike(const DataFileStructLike&) = delete; + DataFileStructLike& operator=(const DataFileStructLike&) = delete; + + Result GetField(size_t pos) const override; + + size_t num_fields() const override; + + void Reset(const DataFile& file) { data_file_ = std::cref(file); } + + private: + std::reference_wrapper data_file_; + mutable std::shared_ptr partition_; +}; + } // namespace iceberg diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 017f98036..70e2cea99 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -76,13 +76,14 @@ class ManifestGroupTest : public testing::TestWithParam { std::shared_ptr MakeDataFile(const std::string& path, const PartitionValues& partition, - int32_t spec_id, int64_t record_count = 1) { + int32_t spec_id, int64_t record_count = 1, + int64_t file_size_in_bytes = 10) { return std::make_shared(DataFile{ .file_path = path, .file_format = FileFormatType::kParquet, .partition = partition, .record_count = record_count, - .file_size_in_bytes = 10, + .file_size_in_bytes = file_size_in_bytes, .sort_order_id = 0, .partition_spec_id = spec_id, }); @@ -404,6 +405,187 @@ TEST_P(ManifestGroupTest, CustomManifestEntriesFilter) { "/path/to/data3.parquet")); } +TEST_P(ManifestGroupTest, FilterFilesByRecordCount) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/small.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/5)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/boundary.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/10)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/large.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/15))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->FilterFiles(Expressions::GreaterThanOrEqual("record_count", Literal::Long(10))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), + testing::UnorderedElementsAre("/path/to/boundary.parquet", + "/path/to/large.parquet")); +} + +TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadata) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + std::vector data_entries{MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/data.parquet", part_value, partitioned_spec_->spec_id()))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->FilterFiles(Expressions::Equal("partition.data_bucket_16_2", Literal::Int(1))); + + auto result = group->Entries(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression)); + EXPECT_THAT(result, HasErrorMessage("Cannot find field 'partition.data_bucket_16_2'")); +} + +TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadataWhenEmpty) { + std::vector manifests; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->FilterFiles(Expressions::Equal("partition.data_bucket_16_2", Literal::Int(1))); + + auto result = group->Entries(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression)); + EXPECT_THAT(result, HasErrorMessage("Cannot find field 'partition.data_bucket_16_2'")); +} + +TEST_P(ManifestGroupTest, FilterFilesRejectsPartitionMetadataBeforeManifestPruning) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + std::vector data_entries{MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/data.parquet", part_value, partitioned_spec_->spec_id()))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->FilterPartitions(Expressions::Equal("data_bucket_16_2", Literal::Int(1))) + .FilterFiles(Expressions::Equal("partition.data_bucket_16_2", Literal::Int(1))); + + auto result = group->Entries(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidExpression)); + EXPECT_THAT(result, HasErrorMessage("Cannot find field 'partition.data_bucket_16_2'")); +} + +TEST_P(ManifestGroupTest, FilterFilesReadsFilteredColumnsWhenSelected) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/too-small.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/1, + /*file_size_in_bytes=*/5)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/matching.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/1, + /*file_size_in_bytes=*/20))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->Select({"file_path"}) + .FilterFiles(Expressions::GreaterThan("file_size_in_bytes", Literal::Long(10))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), testing::ElementsAre("/path/to/matching.parquet")); +} + +TEST_P(ManifestGroupTest, FilterFilesHonorsCaseInsensitiveMatchingWhenSelected) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/small.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/5)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/large.parquet", part_value, + partitioned_spec_->spec_id(), /*record_count=*/15))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->CaseSensitive(false) + .Select({"FILE_PATH"}) + .FilterFiles(Expressions::GreaterThanOrEqual("RECORD_COUNT", Literal::Long(10))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), testing::ElementsAre("/path/to/large.parquet")); +} + +TEST_P(ManifestGroupTest, FilterFilesBySpecIdWhenSelected) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto unpartitioned_value = PartitionValues(std::vector{}); + const auto partitioned_value = PartitionValues({Literal::Int(1)}); + + std::vector unpartitioned_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/unpartitioned.parquet", unpartitioned_value, + unpartitioned_spec_->spec_id()))}; + auto unpartitioned_manifest = WriteDataManifest( + version, kSnapshotId, std::move(unpartitioned_entries), unpartitioned_spec_); + + std::vector partitioned_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/partitioned.parquet", partitioned_value, + partitioned_spec_->spec_id()))}; + auto partitioned_manifest = WriteDataManifest( + version, kSnapshotId, std::move(partitioned_entries), partitioned_spec_); + + std::vector manifests = {unpartitioned_manifest, partitioned_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->Select({"file_path"}) + .FilterFiles( + Expressions::Equal("spec_id", Literal::Int(partitioned_spec_->spec_id()))); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_THAT(GetEntryPaths(entries), + testing::ElementsAre("/path/to/partitioned.parquet")); +} + TEST_P(ManifestGroupTest, EmptyManifestGroup) { std::vector manifests; ICEBERG_UNWRAP_OR_FAIL(