Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/iceberg/manifest/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ struct ICEBERG_EXPORT DataFile {
SchemaField::MakeRequired(kFileFormatFieldId, "file_format", string(),
"File format name: avro, orc, or parquet");

static constexpr int32_t kSpecIdFieldId = 141;
inline static const SchemaField kSpecId =
SchemaField::MakeOptional(kSpecIdFieldId, "spec_id", int32(), "Partition spec ID");

static constexpr int32_t kPartitionFieldId = 102;
inline static const std::string kPartitionField = "partition";
inline static const std::string kPartitionDoc =
Expand Down
99 changes: 92 additions & 7 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@

#include "iceberg/manifest/manifest_group.h"

#include <algorithm>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "iceberg/expression/binder.h"
#include "iceberg/expression/evaluator.h"
#include "iceberg/expression/expression.h"
#include "iceberg/expression/manifest_evaluator.h"
Expand All @@ -29,14 +35,47 @@
#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/manifest_wrapper.h"
#include "iceberg/schema.h"
#include "iceberg/table_scan.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/content_file_util.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

std::shared_ptr<Schema> DataFileFilterSchema() {
auto empty_partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
return std::make_shared<Schema>(std::vector<SchemaField>{
DataFile::kContent,
DataFile::kFilePath,
DataFile::kFileFormat,
DataFile::kSpecId,
SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField,
std::move(empty_partition_type), DataFile::kPartitionDoc),
DataFile::kRecordCount,
DataFile::kFileSize,
DataFile::kColumnSizes,
DataFile::kValueCounts,
DataFile::kNullValueCounts,
DataFile::kNanValueCounts,
DataFile::kLowerBounds,
DataFile::kUpperBounds,
DataFile::kKeyMetadata,
DataFile::kSplitOffsets,
DataFile::kEqualityIds,
DataFile::kSortOrderId,
DataFile::kFirstRowId,
DataFile::kReferencedDataFile,
DataFile::kContentOffset,
DataFile::kContentSize});
}

} // namespace

Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
Expand Down Expand Up @@ -265,10 +304,39 @@ Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, specs_by_id_));

auto columns = columns_;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
!columns.empty() &&
std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
auto data_file_schema = DataFileFilterSchema();
ICEBERG_ASSIGN_OR_RAISE(
auto bound_file_filter,
Binder::Bind(*data_file_schema, file_filter_, case_sensitive_));
ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids,
ReferenceVisitor::GetReferencedFieldIds(bound_file_filter));

std::unordered_set<std::string> selected_columns(columns.cbegin(), columns.cend());
for (const auto field_id : referenced_field_ids) {
if (field_id == DataFile::kSpecIdFieldId) {
continue;
}
ICEBERG_ASSIGN_OR_RAISE(auto column_name,
data_file_schema->FindColumnNameById(field_id));
if (column_name.has_value()) {
std::string column_name_str(column_name.value());
if (selected_columns.contains(column_name_str)) {
continue;
}
columns.push_back(std::move(column_name_str));
selected_columns.insert(columns.back());
}
}
}

reader->FilterRows(data_filter_)
.FilterPartitions(partition_filter_)
.CaseSensitive(case_sensitive_)
.Select(columns_);
.Select(std::move(columns));

return reader;
}
Expand Down Expand Up @@ -299,11 +367,23 @@ ManifestGroup::ReadEntries() {
return eval_cache[spec_id].get();
};

const bool has_file_filter =
file_filter_ && file_filter_->op() != Expression::Operation::kTrue;
std::unique_ptr<Evaluator> data_file_evaluator;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
// TODO(gangwu): create an Evaluator on the DataFile schema with empty
// partition type
}
auto get_data_file_evaluator = [&]() -> Result<Evaluator*> {
if (!has_file_filter) {
return nullptr;
}
if (data_file_evaluator != nullptr) {
return data_file_evaluator.get();
}

ICEBERG_ASSIGN_OR_RAISE(
data_file_evaluator,
Evaluator::Make(*DataFileFilterSchema(), file_filter_, case_sensitive_));

return data_file_evaluator.get();
};

std::unordered_map<int32_t, std::vector<ManifestEntry>> result;

Expand Down Expand Up @@ -336,15 +416,20 @@ ManifestGroup::ReadEntries() {
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
ICEBERG_ASSIGN_OR_RAISE(auto entries,
ignore_deleted_ ? reader->LiveEntries() : reader->Entries());
ICEBERG_ASSIGN_OR_RAISE(auto data_file_evaluator, get_data_file_evaluator());

for (auto& entry : entries) {
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
continue;
}

if (data_file_evaluator != nullptr) {
// TODO(gangwu): implement data_file_evaluator to evaluate StructLike on
// top of entry.data_file
DataFileStructLike data_file(*entry.data_file);
ICEBERG_ASSIGN_OR_RAISE(bool should_match,
data_file_evaluator->Evaluate(data_file));
if (!should_match) {
continue;
}
}

if (!manifest_entry_predicate_(entry)) {
Expand Down
173 changes: 173 additions & 0 deletions src/iceberg/row/manifest_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,57 @@

#include "iceberg/row/manifest_wrapper.h"

#include <iterator>
#include <map>
#include <memory>
#include <span>
#include <type_traits>
#include <vector>

#include "iceberg/manifest/manifest_reader_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

enum class DataFileFieldPosition : size_t {
kContent = 0,
kFilePath = 1,
kFileFormat = 2,
kSpecId = 3,
kPartition = 4,
kRecordCount = 5,
kFileSize = 6,
kColumnSizes = 7,
kValueCounts = 8,
kNullValueCounts = 9,
kNanValueCounts = 10,
kLowerBounds = 11,
kUpperBounds = 12,
kKeyMetadata = 13,
kSplitOffsets = 14,
kEqualityIds = 15,
kSortOrderId = 16,
kFirstRowId = 17,
kReferencedDataFile = 18,
kContentOffset = 19,
kContentSize = 20,
kNextUnusedId = 21,
};

template <typename T>
requires std::is_same_v<T, std::vector<uint8_t>> || std::is_same_v<T, std::string>
std::string_view ToView(const T& value) {
return {reinterpret_cast<const char*>(value.data()), value.size()}; // NOLINT
}

Scalar ToScalar(const int32_t value) { return value; }

Scalar ToScalar(const int64_t value) { return value; }

Scalar ToScalar(const std::vector<uint8_t>& value) { return ToView(value); }

template <typename T>
Result<Scalar> FromOptional(const std::optional<T>& value) {
if (value.has_value()) {
Expand All @@ -39,6 +78,79 @@ Result<Scalar> FromOptional(const std::optional<T>& value) {
return std::monostate{};
}

Result<Scalar> FromOptionalString(const std::optional<std::string>& value) {
if (value.has_value()) {
return ToView(value.value());
}
return std::monostate{};
}

template <typename T>
class VectorArrayLike : public ArrayLike {
public:
explicit VectorArrayLike(std::span<const T> values) : values_(values) {}

Result<Scalar> GetElement(size_t pos) const override {
if (pos >= size()) {
return InvalidArgument("Invalid array index: {}", pos);
}
return ToScalar(values_[pos]);
}

size_t size() const override { return values_.size(); }

private:
std::span<const T> values_;
};

template <typename V>
class IntMapLike : public MapLike {
public:
explicit IntMapLike(const std::map<int32_t, V>& values) : values_(values) {}

Result<Scalar> GetKey(size_t pos) const override {
if (pos >= size()) {
return InvalidArgument("Invalid map index: {}", pos);
}
return std::next(values_.get().cbegin(), pos)->first;
}

Result<Scalar> GetValue(size_t pos) const override {
if (pos >= size()) {
return InvalidArgument("Invalid map index: {}", pos);
}
return ToScalar(std::next(values_.get().cbegin(), pos)->second);
}

size_t size() const override { return values_.get().size(); }

private:
std::reference_wrapper<const std::map<int32_t, V>> values_;
};

template <typename V>
Result<Scalar> FromOptionalMap(const std::map<int32_t, V>& values) {
if (values.empty()) {
return std::monostate{};
}
return std::make_shared<IntMapLike<V>>(values);
}

template <typename T>
Result<Scalar> FromOptionalVector(const std::vector<T>& values) {
if (values.empty()) {
return std::monostate{};
}
return std::make_shared<VectorArrayLike<T>>(values);
}

Result<Scalar> FromOptionalBytes(const std::vector<uint8_t>& value) {
if (value.empty()) {
return std::monostate{};
}
return ToView(value);
}

} // namespace

Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
Expand Down Expand Up @@ -134,4 +246,65 @@ std::unique_ptr<StructLike> FromManifestFile(const ManifestFile& file) {
return std::make_unique<ManifestFileStructLike>(file);
}

Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
if (pos >= num_fields()) {
return InvalidArgument("Invalid data file field index: {}", pos);
}

const auto& data_file = data_file_.get();
switch (static_cast<DataFileFieldPosition>(pos)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we expose partition_spec_id field as an optional field? FYI Java puts it at pos 3 (zero-based).

case DataFileFieldPosition::kContent:
return static_cast<int32_t>(data_file.content);
case DataFileFieldPosition::kFilePath:
return ToView(data_file.file_path);
case DataFileFieldPosition::kFileFormat:
return ToString(data_file.file_format);
case DataFileFieldPosition::kSpecId:
return FromOptional(data_file.partition_spec_id);
case DataFileFieldPosition::kPartition: {
partition_ = std::make_shared<PartitionValues>(data_file.partition);
return partition_;
}
case DataFileFieldPosition::kRecordCount:
return data_file.record_count;
case DataFileFieldPosition::kFileSize:
return data_file.file_size_in_bytes;
case DataFileFieldPosition::kColumnSizes:
return FromOptionalMap(data_file.column_sizes);
case DataFileFieldPosition::kValueCounts:
return FromOptionalMap(data_file.value_counts);
case DataFileFieldPosition::kNullValueCounts:
return FromOptionalMap(data_file.null_value_counts);
case DataFileFieldPosition::kNanValueCounts:
return FromOptionalMap(data_file.nan_value_counts);
case DataFileFieldPosition::kLowerBounds:
return FromOptionalMap(data_file.lower_bounds);
case DataFileFieldPosition::kUpperBounds:
return FromOptionalMap(data_file.upper_bounds);
case DataFileFieldPosition::kKeyMetadata:
return FromOptionalBytes(data_file.key_metadata);
case DataFileFieldPosition::kSplitOffsets:
return FromOptionalVector(data_file.split_offsets);
case DataFileFieldPosition::kEqualityIds:
return FromOptionalVector(data_file.equality_ids);
case DataFileFieldPosition::kSortOrderId:
return FromOptional(data_file.sort_order_id);
case DataFileFieldPosition::kFirstRowId:
return FromOptional(data_file.first_row_id);
case DataFileFieldPosition::kReferencedDataFile:
return FromOptionalString(data_file.referenced_data_file);
case DataFileFieldPosition::kContentOffset:
return FromOptional(data_file.content_offset);
case DataFileFieldPosition::kContentSize:
return FromOptional(data_file.content_size_in_bytes);
case DataFileFieldPosition::kNextUnusedId:
return InvalidArgument("Invalid data file field index: {}", pos);
}
return InvalidArgument("Invalid data file field index: {}", pos);
}

size_t DataFileStructLike::num_fields() const {
return static_cast<size_t>(DataFileFieldPosition::kNextUnusedId);
}

} // namespace iceberg
Loading
Loading