diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 2ecd652f7..808a1455d 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -81,6 +81,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/pending_update.cc + update/snapshot_update.cc update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index ba38c1f93..b19ffce7d 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -17,7 +17,6 @@ * under the License. */ -#include #include #include #include @@ -40,6 +39,7 @@ #include "iceberg/schema_util_internal.h" #include "iceberg/util/formatter.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" #include "iceberg/util/visit_type.h" namespace iceberg::avro { @@ -471,13 +471,7 @@ Result GetId(const ::avro::NodePtr& node, const std::string& attr_name, return InvalidSchema("Missing avro attribute: {}", attr_name); } - int32_t id; - const auto& id_value = id_str.value(); - auto [_, ec] = std::from_chars(id_value.data(), id_value.data() + id_value.size(), id); - if (ec != std::errc()) { - return InvalidSchema("Invalid {}: {}", attr_name, id_value); - } - return id; + return StringUtils::ParseInt(id_str.value()); } Result GetElementId(const ::avro::NodePtr& node) { diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 669d0b50b..9daef843f 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -1553,9 +1553,17 @@ Result> TableUpdateFromJson(const nlohmann::json& j GetJsonValueOptional(json, kMaxSnapshotAgeMs)); ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age, GetJsonValueOptional(json, kMaxRefAgeMs)); - return std::make_unique(std::move(ref_name), snapshot_id, type, - min_snapshots, max_snapshot_age, - max_ref_age); + if (type == SnapshotRefType::kTag) { + ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id, max_ref_age)); + return std::make_unique(std::move(ref_name), *tag); + } else { + ICEBERG_CHECK(type == SnapshotRefType::kBranch, + "Expected branch type for snapshot ref"); + ICEBERG_ASSIGN_OR_RAISE(auto branch, + SnapshotRef::MakeBranch(snapshot_id, min_snapshots, + max_snapshot_age, max_ref_age)); + return std::make_unique(std::move(ref_name), *branch); + } } if (action == kActionSetProperties) { using StringMap = std::unordered_map; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 78ebd604c..efc983014 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -102,6 +102,7 @@ iceberg_sources = files( 'transform_function.cc', 'type.cc', 'update/pending_update.cc', + 'update/snapshot_update.cc', 'update/update_partition_spec.cc', 'update/update_properties.cc', 'update/update_schema.cc', diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index 5b6aabc10..497750805 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -23,6 +23,7 @@ #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" namespace iceberg { @@ -49,6 +50,78 @@ SnapshotRefType SnapshotRef::type() const noexcept { retention); } +Status SnapshotRef::MinSnapshotsToKeep(std::optional value) { + ICEBERG_PRECHECK(this->type() != SnapshotRefType::kTag, + "Tags do not support setting minSnapshotsToKeep"); + ICEBERG_PRECHECK(!value.has_value() || value.value() > 0, + "Min snapshots to keep must be greater than 0"); + std::get(this->retention).min_snapshots_to_keep = value; + return {}; +} + +Status SnapshotRef::MaxSnapshotAgeMs(std::optional value) { + ICEBERG_PRECHECK(this->type() != SnapshotRefType::kTag, + "Tags do not support setting maxSnapshotAgeMs"); + ICEBERG_PRECHECK(!value.has_value() || value.value() > 0, + "Max snapshot age must be greater than 0 ms"); + std::get(this->retention).max_snapshot_age_ms = value; + return {}; +} + +Status SnapshotRef::MaxRefAgeMs(std::optional value) { + ICEBERG_PRECHECK(!value.has_value() || value.value() > 0, + "Max reference age must be greater than 0"); + if (this->type() == SnapshotRefType::kBranch) { + std::get(this->retention).max_ref_age_ms = value; + } else { + std::get(this->retention).max_ref_age_ms = value; + } + return {}; +} + +Result> SnapshotRef::MakeBranch( + int64_t snapshot_id, std::optional min_snapshots_to_keep, + std::optional max_snapshot_age_ms, std::optional max_ref_age_ms) { + // Validate optional parameters + if (min_snapshots_to_keep.has_value() && min_snapshots_to_keep.value() <= 0) { + return InvalidArgument("Min snapshots to keep must be greater than 0"); + } + if (max_snapshot_age_ms.has_value() && max_snapshot_age_ms.value() <= 0) { + return InvalidArgument("Max snapshot age must be greater than 0 ms"); + } + if (max_ref_age_ms.has_value() && max_ref_age_ms.value() <= 0) { + return InvalidArgument("Max reference age must be greater than 0"); + } + + auto ref = std::make_unique(); + ref->snapshot_id = snapshot_id; + ref->retention = Branch{.min_snapshots_to_keep = min_snapshots_to_keep, + .max_snapshot_age_ms = max_snapshot_age_ms, + .max_ref_age_ms = max_ref_age_ms}; + return ref; +} + +Result> SnapshotRef::MakeTag( + int64_t snapshot_id, std::optional max_ref_age_ms) { + // Validate optional parameter + if (max_ref_age_ms.has_value() && max_ref_age_ms.value() <= 0) { + return InvalidArgument("Max reference age must be greater than 0"); + } + + auto ref = std::make_unique(); + ref->snapshot_id = snapshot_id; + ref->retention = Tag{.max_ref_age_ms = max_ref_age_ms}; + return ref; +} + +std::unique_ptr SnapshotRef::Clone( + std::optional new_snapshot_id) const { + auto ref = std::make_unique(); + ref->snapshot_id = new_snapshot_id.value_or(snapshot_id); + ref->retention = retention; + return ref; +} + bool SnapshotRef::Equals(const SnapshotRef& other) const { if (this == &other) { return true; @@ -75,6 +148,24 @@ std::optional Snapshot::operation() const { return std::nullopt; } +Result> Snapshot::FirstRowId() const { + auto it = summary.find(SnapshotSummaryFields::kFirstRowId); + if (it == summary.end()) { + return std::nullopt; + } + + return StringUtils::ParseInt(it->second); +} + +Result> Snapshot::AddedRows() const { + auto it = summary.find(SnapshotSummaryFields::kAddedRows); + if (it == summary.end()) { + return std::nullopt; + } + + return StringUtils::ParseInt(it->second); +} + bool Snapshot::Equals(const Snapshot& other) const { if (this == &other) { return true; diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 86a7fe20d..71be3a3cd 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -25,7 +25,6 @@ #include #include #include -#include #include #include "iceberg/iceberg_export.h" @@ -114,6 +113,51 @@ struct ICEBERG_EXPORT SnapshotRef { SnapshotRefType type() const noexcept; + /// \brief Set the minimum number of snapshots to keep (branch only) + /// \param value The minimum number of snapshots to keep, or nullopt for default + /// \return Status indicating success or failure + Status MinSnapshotsToKeep(std::optional value); + + /// \brief Set the maximum snapshot age in milliseconds (branch only) + /// \param value The maximum snapshot age in milliseconds, or nullopt for default + /// \return Status indicating success or failure + Status MaxSnapshotAgeMs(std::optional value); + + /// \brief Set the maximum reference age in milliseconds + /// \param value The maximum reference age in milliseconds, or nullopt for default + /// \return Status indicating success or failure + Status MaxRefAgeMs(std::optional value); + + /// \brief Create a branch reference + /// + /// \param snapshot_id The snapshot ID for the branch + /// \param min_snapshots_to_keep Optional minimum number of snapshots to keep + /// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds + /// \param max_ref_age_ms Optional maximum reference age in milliseconds + /// \return A Result containing a unique_ptr to the SnapshotRef, or an error if + /// validation failed + static Result> MakeBranch( + int64_t snapshot_id, std::optional min_snapshots_to_keep = std::nullopt, + std::optional max_snapshot_age_ms = std::nullopt, + std::optional max_ref_age_ms = std::nullopt); + + /// \brief Create a tag reference + /// + /// \param snapshot_id The snapshot ID for the tag + /// \param max_ref_age_ms Optional maximum reference age in milliseconds + /// \return A Result containing a unique_ptr to the SnapshotRef, or an error if + /// validation failed + static Result> MakeTag( + int64_t snapshot_id, std::optional max_ref_age_ms = std::nullopt); + + /// \brief Clone this SnapshotRef with an optional new snapshot ID + /// + /// \param new_snapshot_id Optional new snapshot ID. If not provided, uses the current + /// snapshot_id + /// \return A unique_ptr to the cloned SnapshotRef + std::unique_ptr Clone( + std::optional new_snapshot_id = std::nullopt) const; + /// \brief Compare two snapshot refs for equality friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) { return lhs.Equals(rhs); @@ -129,6 +173,12 @@ struct SnapshotSummaryFields { /// \brief The operation field key inline static const std::string kOperation = "operation"; + /// \brief The first row id field key + inline static const std::string kFirstRowId = "first-row-id"; + + /// \brief The added rows field key + inline static const std::string kAddedRows = "added-rows"; + /// Metrics, see https://iceberg.apache.org/spec/#metrics /// \brief Number of data files added in the snapshot @@ -253,6 +303,29 @@ struct ICEBERG_EXPORT Snapshot { /// unknown. std::optional operation() const; + /// \brief The row-id of the first newly added row in this snapshot. + /// + /// All rows added in this snapshot will have a row-id assigned to them greater than + /// this value. All rows with a row-id less than this value were created in a snapshot + /// that was added to the table (but not necessarily committed to this branch) in the + /// past. + /// + /// \return the first row-id to be used in this snapshot or nullopt when row lineage + /// is not supported + Result> FirstRowId() const; + + /// \brief The upper bound of number of rows with assigned row IDs in this snapshot. + /// + /// It can be used safely to increment the table's `next-row-id` during a commit. It + /// can be more than the number of rows added in this snapshot and include some + /// existing rows. + /// + /// This field is optional but is required when the table version supports row lineage. + /// + /// \return the upper bound of number of rows with assigned row IDs in this snapshot + /// or nullopt if the value was not stored. + Result> AddedRows() const; + /// \brief Compare two snapshots for equality. friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) { return lhs.Equals(rhs); diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 311395856..77fe763ff 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { virtual ~Table(); - /// \brief Return the identifier of this table + /// \brief Returns the identifier of this table const TableIdentifier& name() const { return identifier_; } /// \brief Returns the UUID of the table @@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Return the schema for this table, return NotFoundError if not found Result> schema() const; - /// \brief Return a map of schema for this table + /// \brief Returns a map of schema for this table Result< std::reference_wrapper>>> schemas() const; - /// \brief Return the partition spec for this table, return NotFoundError if not found + /// \brief Returns the partition spec for this table, return NotFoundError if not found Result> spec() const; - /// \brief Return a map of partition specs for this table + /// \brief Returns a map of partition specs for this table Result>>> specs() const; - /// \brief Return the sort order for this table, return NotFoundError if not found + /// \brief Returns the sort order for this table, return NotFoundError if not found Result> sort_order() const; - /// \brief Return a map of sort order IDs to sort orders for this table + /// \brief Returns a map of sort order IDs to sort orders for this table Result>>> sort_orders() const; - /// \brief Return a map of string properties for this table + /// \brief Returns the properties of this table const TableProperties& properties() const; - /// \brief Return the table's metadata file location + /// \brief Returns the table's metadata file location std::string_view metadata_file_location() const; - /// \brief Return the table's base location + /// \brief Returns the table's base location std::string_view location() const; /// \brief Returns the time when this table was last updated TimePointMs last_updated_ms() const; - /// \brief Return the table's current snapshot, return NotFoundError if not found + /// \brief Returns the table's current snapshot, return NotFoundError if not found Result> current_snapshot() const; /// \brief Get the snapshot of this table with the given id diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 851048b30..799fb343a 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -52,6 +52,7 @@ #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" #include "iceberg/util/property_util.h" +#include "iceberg/util/timepoint.h" #include "iceberg/util/type_util.h" #include "iceberg/util/uuid.h" @@ -277,6 +278,20 @@ Result> TableMetadata::SnapshotById(int64_t snapshot_i return *iter; } +std::shared_ptr TableMetadata::OptionalSnapshotById(int64_t snapshot_id) const { + auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) { + return snapshot != nullptr && snapshot->snapshot_id == snapshot_id; + }); + if (iter == snapshots.end()) { + return nullptr; + } + return *iter; +} + +int64_t TableMetadata::NextSequenceNumber() const { + return format_version > 1 ? last_sequence_number + 1 : kInitialSequenceNumber; +} + namespace { template @@ -555,6 +570,10 @@ class TableMetadataBuilder::Impl { sort_orders_by_id_.emplace(order->order_id(), order); } + for (const auto& snapshot : metadata_.snapshots) { + snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot); + } + metadata_.last_updated_ms = kInvalidLastUpdatedMs; } @@ -591,6 +610,9 @@ class TableMetadataBuilder::Impl { Status RemoveSchemas(const std::unordered_set& schema_ids); Result AddSchema(const Schema& schema, int32_t new_last_column_id); void SetLocation(std::string_view location); + Status AddSnapshot(std::shared_ptr snapshot); + Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch); + Status SetRef(const std::string& name, std::shared_ptr ref); Result> Build(); @@ -613,6 +635,32 @@ class TableMetadataBuilder::Impl { /// \return The ID to use for this schema (reused if exists, new otherwise int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const; + /// \brief Finds intermediate snapshots that have not been committed as the current + /// snapshot. + /// + /// Transactions can create snapshots that are never the current snapshot because + /// several changes are combined by the transaction into one table metadata update. When + /// each intermediate snapshot is added to table metadata, it is added to the snapshot + /// log, assuming that it will be the current snapshot. When there are multiple snapshot + /// updates, the log must be corrected by suppressing the intermediate snapshot entries. + /// + /// A snapshot is an intermediate snapshot if it was added but is not the current + /// snapshot. + /// + /// \param current_snapshot_id The current snapshot ID + /// \return A set of snapshot IDs for all added snapshots that were later replaced as + /// the current snapshot in changes + std::unordered_set IntermediateSnapshotIdSet( + int64_t current_snapshot_id) const; + + /// \brief Updates the snapshot log by removing intermediate snapshots and handling + /// removed snapshots. + /// + /// \param current_snapshot_id The current snapshot ID + /// \return Updated snapshot log or error + Result> UpdateSnapshotLog( + int64_t current_snapshot_id) const; + private: // Base metadata (nullptr for new tables) const TableMetadata* base_; @@ -634,6 +682,7 @@ class TableMetadataBuilder::Impl { std::unordered_map> schemas_by_id_; std::unordered_map> specs_by_id_; std::unordered_map> sort_orders_by_id_; + std::unordered_map> snapshots_by_id_; }; Status TableMetadataBuilder::Impl::AssignUUID(std::string_view uuid) { @@ -982,6 +1031,205 @@ void TableMetadataBuilder::Impl::SetLocation(std::string_view location) { changes_.push_back(std::make_unique(std::string(location))); } +Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapshot) { + if (snapshot == nullptr) { + // change is a noop + return {}; + } + ICEBERG_CHECK(!metadata_.schemas.empty(), + "Attempting to add a snapshot before a schema is added"); + ICEBERG_CHECK(!metadata_.partition_specs.empty(), + "Attempting to add a snapshot before a partition spec is added"); + ICEBERG_CHECK(!metadata_.sort_orders.empty(), + "Attempting to add a snapshot before a sort order is added"); + + ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id), + "Snapshot already exists for id: {}", snapshot->snapshot_id); + + ICEBERG_CHECK( + metadata_.format_version == 1 || + snapshot->sequence_number > metadata_.last_sequence_number || + !snapshot->parent_snapshot_id.has_value(), + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot->sequence_number, metadata_.last_sequence_number); + + metadata_.last_updated_ms = snapshot->timestamp_ms; + metadata_.last_sequence_number = snapshot->sequence_number; + + metadata_.snapshots.push_back(snapshot); + snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot); + + changes_.push_back(std::make_unique(snapshot)); + + // Handle row lineage for format version >= 3 + if (metadata_.format_version >= TableMetadata::kMinFormatVersionRowLineage) { + ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, snapshot->FirstRowId()); + ICEBERG_CHECK(first_row_id.has_value(), + "Cannot add a snapshot: first-row-id is null"); + ICEBERG_CHECK( + first_row_id.value() >= metadata_.next_row_id, + "Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}", + first_row_id.value(), metadata_.next_row_id); + + ICEBERG_ASSIGN_OR_RAISE(auto add_rows, snapshot->AddedRows()); + metadata_.next_row_id += add_rows.value_or(0); + } + + return {}; +} + +Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id, + const std::string& branch) { + // Check if ref already exists with the same snapshot ID + auto ref_it = metadata_.refs.find(branch); + if (ref_it != metadata_.refs.end() && ref_it->second->snapshot_id == snapshot_id) { + return {}; + } + + auto snapshot_it = snapshots_by_id_.find(snapshot_id); + ICEBERG_PRECHECK(snapshot_it != snapshots_by_id_.end(), + "Cannot set {} to unknown snapshot: {}", branch, snapshot_id); + const auto& snapshot = snapshot_it->second; + + // If ref exists, validate it's a branch and check if snapshot ID matches + if (ref_it != metadata_.refs.end()) { + const auto& ref = ref_it->second; + ICEBERG_CHECK(ref->type() == SnapshotRefType::kBranch, + "Cannot update branch: {} is a tag", branch); + if (ref->snapshot_id == snapshot_id) { + return {}; + } + } + + ICEBERG_CHECK( + metadata_.format_version == 1 || + snapshot->sequence_number <= metadata_.last_sequence_number, + "Last sequence number {} is less than existing snapshot sequence number {}", + metadata_.last_sequence_number, snapshot->sequence_number); + + // Create new ref: either from existing ref or create new branch ref + std::shared_ptr new_ref; + if (ref_it != metadata_.refs.end()) { + new_ref = std::shared_ptr(ref_it->second->Clone(snapshot_id)); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto ref_result, SnapshotRef::MakeBranch(snapshot_id)); + new_ref = std::shared_ptr(std::move(ref_result)); + } + + return SetRef(branch, std::move(new_ref)); +} + +Status TableMetadataBuilder::Impl::SetRef(const std::string& name, + std::shared_ptr ref) { + auto existing_ref_it = metadata_.refs.find(name); + if (existing_ref_it != metadata_.refs.end() && *existing_ref_it->second == *ref) { + return {}; + } + + int64_t snapshot_id = ref->snapshot_id; + auto snapshot_it = snapshots_by_id_.find(snapshot_id); + ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(), + "Cannot set {} to unknown snapshot: {}", name, snapshot_id); + const auto& snapshot = snapshot_it->second; + + // If snapshot was added in this set of changes, update last_updated_ms + if (std::ranges::any_of(changes_, [snapshot_id](const auto& change) { + if (change->kind() != TableUpdate::Kind::kAddSnapshot) { + return false; + } + const auto* add_snapshot = + internal::checked_cast(change.get()); + return add_snapshot->snapshot()->snapshot_id == snapshot_id; + })) { + metadata_.last_updated_ms = snapshot->timestamp_ms; + } + + // If it's MAIN_BRANCH, update currentSnapshotId and add to snapshotLog + if (name == SnapshotRef::kMainBranch) { + metadata_.current_snapshot_id = ref->snapshot_id; + if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) { + metadata_.last_updated_ms = CurrentTimePointMs(); + } + metadata_.snapshot_log.emplace_back(metadata_.last_updated_ms, ref->snapshot_id); + } + + // Set the ref + metadata_.refs[name] = ref; + + changes_.push_back(std::make_unique(name, *ref)); + + return {}; +} + +std::unordered_set TableMetadataBuilder::Impl::IntermediateSnapshotIdSet( + int64_t current_snapshot_id) const { + std::unordered_set added_snapshot_ids; + std::unordered_set intermediate_snapshot_ids; + + std::ranges::for_each(changes_, [&](const auto& change) { + if (change->kind() == TableUpdate::Kind::kAddSnapshot) { + // Adds must always come before set current snapshot + const auto* add_snapshot = + internal::checked_cast(change.get()); + added_snapshot_ids.insert(add_snapshot->snapshot()->snapshot_id); + } else if (change->kind() == TableUpdate::Kind::kSetSnapshotRef) { + const auto* set_ref = + internal::checked_cast(change.get()); + int64_t snapshot_id = set_ref->snapshot_id(); + if (added_snapshot_ids.contains(snapshot_id) && + set_ref->ref_name() == SnapshotRef::kMainBranch && + snapshot_id != current_snapshot_id) { + intermediate_snapshot_ids.insert(snapshot_id); + } + } + }); + + return intermediate_snapshot_ids; +} + +Result> TableMetadataBuilder::Impl::UpdateSnapshotLog( + int64_t current_snapshot_id) const { + std::unordered_set intermediate_snapshot_ids = + IntermediateSnapshotIdSet(current_snapshot_id); + bool has_removed_snapshots = std::ranges::any_of(changes_, [](const auto& change) { + return change->kind() == TableUpdate::Kind::kRemoveSnapshots; + }); + + if (intermediate_snapshot_ids.empty() && !has_removed_snapshots) { + return metadata_.snapshot_log; + } + + // Update the snapshot log + std::vector new_snapshot_log; + for (const auto& log_entry : metadata_.snapshot_log) { + int64_t snapshot_id = log_entry.snapshot_id; + if (snapshots_by_id_.contains(snapshot_id)) { + if (!intermediate_snapshot_ids.contains(snapshot_id)) { + // Copy the log entries that are still valid + new_snapshot_log.push_back(log_entry); + } + } else if (has_removed_snapshots) { + // Any invalid entry causes the history before it to be removed. Otherwise, there + // could be history gaps that cause time-travel queries to produce incorrect + // results. For example, if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is + // removed, the history cannot be [(t1, s1), (t3, s3)] because it appears that s3 + // was current during the time between t2 and t3 when in fact s2 was the current + // snapshot. + new_snapshot_log.clear(); + } + } + + if (snapshots_by_id_.contains(current_snapshot_id)) { + ICEBERG_CHECK(!new_snapshot_log.empty(), + "Cannot set invalid snapshot log: no entries"); + ICEBERG_CHECK( + new_snapshot_log.back().snapshot_id == current_snapshot_id, + "Cannot set invalid snapshot log: latest entry is not the current snapshot"); + } + + return new_snapshot_log; +} + Result> TableMetadataBuilder::Impl::Build() { // 1. Validate metadata consistency through TableMetadata#Validate @@ -1025,7 +1273,9 @@ Result> TableMetadataBuilder::Impl::Build() { metadata_.metadata_log.end() - max_metadata_log_size); } - // TODO(anyone): 4. update snapshot_log + // 4. Update snapshot_log + ICEBERG_ASSIGN_OR_RAISE(metadata_.snapshot_log, + UpdateSnapshotLog(metadata_.current_snapshot_id)); // 5. Create and return the TableMetadata return std::make_unique(std::move(metadata_)); @@ -1207,17 +1457,20 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder( TableMetadataBuilder& TableMetadataBuilder::AddSnapshot( std::shared_ptr snapshot) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(snapshot)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_id, const std::string& branch) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(snapshot_id, branch)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name, std::shared_ptr ref) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, std::move(ref))); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) { diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 3e3eb9c70..a3a757f14 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -145,6 +145,11 @@ struct ICEBERG_EXPORT TableMetadata { Result> Snapshot() const; /// \brief Get the snapshot of this table with the given id Result> SnapshotById(int64_t snapshot_id) const; + /// \brief Get the snapshot of this table with the given id + /// \note Like SnapshotById, but returns nullptr if not found + std::shared_ptr OptionalSnapshotById(int64_t snapshot_id) const; + /// \brief Get the next sequence number + int64_t NextSequenceNumber() const; ICEBERG_EXPORT friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs); diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h index feb4a2001..5d5c17db0 100644 --- a/src/iceberg/table_properties.h +++ b/src/iceberg/table_properties.h @@ -20,7 +20,6 @@ #pragma once #include -#include #include #include #include @@ -244,6 +243,9 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase { inline static Entry kDeleteTargetFileSizeBytes{ "write.delete.target-file-size-bytes", int64_t{64} * 1024 * 1024}; // 64 MB + inline static Entry kSnapshotIdInheritanceEnabled{ + "compatibility.snapshot-id-inheritance.enabled", false}; + // Garbage collection properties inline static Entry kGcEnabled{"gc.enabled", true}; diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 38ce0fbc9..29388d47c 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -274,7 +274,7 @@ std::unique_ptr SetDefaultSortOrder::Clone() const { // AddSnapshot void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.AddSnapshot(snapshot_); } void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const { @@ -344,7 +344,7 @@ std::unique_ptr RemoveSnapshotRef::Clone() const { // SetSnapshotRef void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.SetBranchSnapshot(snapshot_id_, ref_name_); } void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const { diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 875243195..3c9c9dbbc 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -401,6 +401,19 @@ class ICEBERG_EXPORT SetSnapshotRef : public TableUpdate { max_snapshot_age_ms_(max_snapshot_age_ms), max_ref_age_ms_(max_ref_age_ms) {} + SetSnapshotRef(std::string ref_name, const SnapshotRef& ref) + : ref_name_(std::move(ref_name)), snapshot_id_(ref.snapshot_id), type_(ref.type()) { + if (type_ == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref.retention); + min_snapshots_to_keep_ = branch.min_snapshots_to_keep; + max_snapshot_age_ms_ = branch.max_snapshot_age_ms; + max_ref_age_ms_ = branch.max_ref_age_ms; + } else { + const auto& tag = std::get(ref.retention); + max_ref_age_ms_ = tag.max_ref_age_ms; + } + } + const std::string& ref_name() const { return ref_name_; } int64_t snapshot_id() const { return snapshot_id_; } SnapshotRefType type() const { return type_; } diff --git a/src/iceberg/test/table_requirements_test.cc b/src/iceberg/test/table_requirements_test.cc index 80f836367..dd0aa8f63 100644 --- a/src/iceberg/test/table_requirements_test.cc +++ b/src/iceberg/test/table_requirements_test.cc @@ -883,12 +883,12 @@ TEST(TableRequirementsTest, SetSnapshotRef) { // Multiple updates to same ref should deduplicate std::vector> updates; - updates.push_back(std::make_unique(kRefName, kSnapshotId, - SnapshotRefType::kBranch)); - updates.push_back(std::make_unique(kRefName, kSnapshotId + 1, - SnapshotRefType::kBranch)); - updates.push_back(std::make_unique(kRefName, kSnapshotId + 2, - SnapshotRefType::kBranch)); + ICEBERG_UNWRAP_OR_FAIL(auto ref1, SnapshotRef::MakeBranch(kSnapshotId)); + updates.push_back(std::make_unique(kRefName, *ref1)); + ICEBERG_UNWRAP_OR_FAIL(auto ref2, SnapshotRef::MakeBranch(kSnapshotId + 1)); + updates.push_back(std::make_unique(kRefName, *ref2)); + ICEBERG_UNWRAP_OR_FAIL(auto ref3, SnapshotRef::MakeBranch(kSnapshotId + 2)); + updates.push_back(std::make_unique(kRefName, *ref3)); auto result = TableRequirements::ForUpdateTable(*metadata, updates); ASSERT_THAT(result, IsOk()); diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index 8ac8e5eb2..59fa1d8d6 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -107,7 +107,7 @@ TEST_F(UpdatePropertiesTest, UpgradeFormatVersionInvalidString) { auto result = update->Apply(); EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("Invalid format version")); + EXPECT_THAT(result, HasErrorMessage("Failed to parse integer from string")); } TEST_F(UpdatePropertiesTest, UpgradeFormatVersionOutOfRange) { diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 6641a1afd..0cc77aba9 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -25,15 +25,18 @@ #include "iceberg/schema.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" #include "iceberg/table_requirement.h" #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" #include "iceberg/update/update_sort_order.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -69,6 +72,16 @@ const TableMetadata* Transaction::base() const { return metadata_builder_->base( const TableMetadata& Transaction::current() const { return metadata_builder_->current(); } +std::string Transaction::MetadataFileLocation(std::string_view filename) const { + const auto metadata_location = + current().properties.Get(TableProperties::kWriteMetadataLocation); + if (metadata_location.empty()) { + return std::format("{}/{}", LocationUtil::StripTrailingSlash(metadata_location), + filename); + } + return std::format("{}/metadata/{}", current().location, filename); +} + Status Transaction::AddUpdate(const std::shared_ptr& update) { if (!last_update_committed_) { return InvalidArgument("Cannot add update when previous update is not committed"); @@ -113,6 +126,23 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->SetCurrentSchema(std::move(result.schema), result.new_last_column_id); } break; + case PendingUpdate::Kind::kUpdateSnapshot: { + auto& update_snapshot = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply()); + if (metadata_builder_->current() + .SnapshotById(result.snapshot->snapshot_id) + .has_value()) { + metadata_builder_->SetBranchSnapshot(result.snapshot->snapshot_id, + result.target_branch); + } else if (result.stage_only) { + metadata_builder_->AddSnapshot(result.snapshot); + } else { + // Normal commit - add snapshot first, then set as branch snapshot + metadata_builder_->AddSnapshot(result.snapshot); + metadata_builder_->SetBranchSnapshot(result.snapshot->snapshot_id, + result.target_branch); + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -158,6 +188,12 @@ Result> Transaction::Commit() { ICEBERG_ASSIGN_OR_RAISE(auto updated_table, table_->catalog()->UpdateTable( table_->name(), requirements, updates)); + for (const auto& update : pending_updates_) { + if (auto update_ptr = update.lock()) { + ICEBERG_RETURN_UNEXPECTED(update_ptr->Finalize()); + } + } + // Mark as committed and update table reference committed_ = true; table_ = std::move(updated_table); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index ea918a173..54f0ae143 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -49,6 +49,12 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_thisApply(*this); } +Status PendingUpdate::Finalize() { return {}; } + +const TableMetadata& PendingUpdate::base() const { return transaction_->current(); } + } // namespace iceberg diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 90723987c..498d3e775 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -46,6 +46,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdateProperties, kUpdateSchema, kUpdateSortOrder, + kUpdateSnapshot, }; /// \brief Return the kind of this pending update. @@ -59,6 +60,14 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { /// - CommitStateUnknown: unknown status, no cleanup should be done. virtual Status Commit(); + /// \brief Finalize the pending update. + /// + /// This method is called after the update is committed successfully. + /// Implementations should override this method to clean up any resources. + /// + /// \return Status indicating success or failure + virtual Status Finalize(); + // Non-copyable, movable PendingUpdate(const PendingUpdate&) = delete; PendingUpdate& operator=(const PendingUpdate&) = delete; @@ -70,6 +79,8 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { protected: explicit PendingUpdate(std::shared_ptr transaction); + const TableMetadata& base() const; + std::shared_ptr transaction_; }; diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc new file mode 100644 index 000000000..addecac56 --- /dev/null +++ b/src/iceberg/update/snapshot_update.cc @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_update.h" + +#include + +#include "iceberg/constants.h" +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/manifest/rolling_manifest_writer.h" +#include "iceberg/partition_summary_internal.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/string_util.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +namespace { + +Status UpdateTotal(std::unordered_map& summary, + const std::unordered_map& previous_summary, + const std::string& total_property, const std::string& added_property, + const std::string& deleted_property) { + auto total_it = previous_summary.find(total_property); + if (total_it != previous_summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto new_total, + StringUtils::ParseInt(total_it->second)); + + auto added_it = summary.find(added_property); + if (new_total >= 0 && added_it != summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto added_value, + StringUtils::ParseInt(added_it->second)); + new_total += added_value; + } + + auto deleted_it = summary.find(deleted_property); + if (new_total >= 0 && deleted_it != summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto deleted_value, + StringUtils::ParseInt(deleted_it->second)); + new_total -= deleted_value; + } + + if (new_total >= 0) { + summary[total_property] = std::to_string(new_total); + } + } + return {}; +} + +/// \brief Add metadata to a manifest file by reading it and extracting statistics. +/// +/// This function reads the manifest file and fills in missing fields like +/// added_snapshot_id, file counts, row counts, and partition summaries. +Result AddMetadata(const ManifestFile& manifest, + std::shared_ptr file_io, + const TableMetadata& metadata) { + ICEBERG_PRECHECK(manifest.added_snapshot_id == kInvalidSnapshotId, + "Manifest already has a snapshot ID"); + + // Get the partition spec for this manifest + auto spec_iter = + std::ranges::find_if(metadata.partition_specs, [&manifest](const auto& spec) { + return spec != nullptr && spec->spec_id() == manifest.partition_spec_id; + }); + if (spec_iter == metadata.partition_specs.end()) { + return NotFound("Partition spec with ID {} is not found", manifest.partition_spec_id); + } + auto spec = *spec_iter; + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + + // Create a manifest reader and read all entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, file_io, schema, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + + // Statistics + int32_t added_files = 0; + int64_t added_rows = 0; + int32_t existing_files = 0; + int64_t existing_rows = 0; + int32_t deleted_files = 0; + int64_t deleted_rows = 0; + + std::optional snapshot_id; + int64_t max_snapshot_id = std::numeric_limits::min(); + + // Get partition type and create partition summary + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema)); + PartitionSummary partition_summary(*partition_type); + + // Process entries + for (const auto& entry : entries) { + // Track max snapshot ID + if (entry.snapshot_id.has_value()) { + if (entry.snapshot_id.value() > max_snapshot_id) { + max_snapshot_id = entry.snapshot_id.value(); + } + } + + // Update statistics based on entry status + switch (entry.status) { + case ManifestStatus::kAdded: + added_files += 1; + if (entry.data_file) { + added_rows += entry.data_file->record_count; + } + if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) { + snapshot_id = entry.snapshot_id; + } + break; + case ManifestStatus::kExisting: + existing_files += 1; + if (entry.data_file) { + existing_rows += entry.data_file->record_count; + } + break; + case ManifestStatus::kDeleted: + deleted_files += 1; + if (entry.data_file) { + deleted_rows += entry.data_file->record_count; + } + if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) { + snapshot_id = entry.snapshot_id; + } + break; + } + + // Update partition summary + if (entry.data_file) { + ICEBERG_RETURN_UNEXPECTED(partition_summary.Update(entry.data_file->partition)); + } + } + + // If no snapshot ID was found from ADDED/DELETED entries, use the max snapshot ID + if (!snapshot_id.has_value()) { + if (max_snapshot_id != std::numeric_limits::min()) { + snapshot_id = max_snapshot_id; + } else { + return InvalidManifest("Cannot determine snapshot ID for manifest: {}", + manifest.manifest_path); + } + } + + ICEBERG_ASSIGN_OR_RAISE(auto partition_summaries, partition_summary.Summaries()); + + // Create enriched manifest file + ManifestFile enriched = manifest; + enriched.added_snapshot_id = snapshot_id.value(); + enriched.added_files_count = + added_files > 0 ? std::make_optional(added_files) : std::nullopt; + enriched.existing_files_count = + existing_files > 0 ? std::make_optional(existing_files) : std::nullopt; + enriched.deleted_files_count = + deleted_files > 0 ? std::make_optional(deleted_files) : std::nullopt; + enriched.added_rows_count = + added_rows > 0 ? std::make_optional(added_rows) : std::nullopt; + enriched.existing_rows_count = + existing_rows > 0 ? std::make_optional(existing_rows) : std::nullopt; + enriched.deleted_rows_count = + deleted_rows > 0 ? std::make_optional(deleted_rows) : std::nullopt; + enriched.partitions = std::move(partition_summaries); + + return enriched; +} + +} // anonymous namespace + +SnapshotUpdate::SnapshotUpdate(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)), + can_inherit_snapshot_id_( + base().format_version == 1 + ? base().properties.Get(TableProperties::kSnapshotIdInheritanceEnabled) + : true), + commit_uuid_(Uuid::GenerateV7().ToString()), + target_manifest_size_bytes_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes)) { + // Initialize delete function if not set + if (!delete_func_) { + delete_func_ = [this](const std::string& path) { + return transaction_->table()->io()->DeleteFile(path); + }; + } +} + +Result> SnapshotUpdate::WriteDataManifests( + const std::vector& data_files, const std::shared_ptr& spec) { + if (data_files.empty()) { + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); + + int8_t format_version = base().format_version; + std::optional snapshot_id = + snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt; + + // Create factory function for rolling manifest writer + RollingManifestWriter::ManifestWriterFactory factory = + [this, spec, current_schema, format_version, + snapshot_id]() -> Result> { + std::string manifest_path = ManifestPath(); + + if (format_version == 1) { + return ManifestWriter::MakeV1Writer( + snapshot_id, manifest_path, transaction_->table()->io(), spec, current_schema); + } else if (format_version == 2) { + return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kData); + } else { // format_version == 3 + std::optional first_row_id = + transaction_->table()->metadata()->next_row_id; + return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kData); + } + }; + + // Create rolling manifest writer + RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_); + + // Write all files + for (const auto& file : data_files) { + ICEBERG_RETURN_UNEXPECTED( + rolling_writer.WriteAddedEntry(std::make_shared(file))); + } + + // Close the rolling writer + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + + // Get all manifest files + ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, rolling_writer.ToManifestFiles()); + + return manifest_files; +} + +Result> SnapshotUpdate::WriteDeleteManifests( + const std::vector& delete_files, + const std::shared_ptr& spec) { + if (delete_files.empty()) { + return std::vector{}; + } + + int8_t format_version = base().format_version; + if (format_version < 2) { + // Delete manifests are only supported in format version 2+ + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); + + std::optional snapshot_id = + snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt; + + // Create factory function for rolling manifest writer + RollingManifestWriter::ManifestWriterFactory factory = + [this, spec, current_schema, format_version, + snapshot_id]() -> Result> { + std::string manifest_path = ManifestPath(); + + if (format_version == 2) { + return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kDeletes); + } else { // format_version == 3 + std::optional first_row_id = + transaction_->table()->metadata()->next_row_id; + return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kDeletes); + } + }; + + // Create rolling manifest writer + RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_); + + // Write all delete files + for (const auto& file : delete_files) { + ICEBERG_RETURN_UNEXPECTED( + rolling_writer.WriteAddedEntry(std::make_shared(file))); + } + + // Close the rolling writer + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + + // Get all manifest files + ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, rolling_writer.ToManifestFiles()); + + return manifest_files; +} + +int64_t SnapshotUpdate::SnapshotId() { + if (snapshot_id_.has_value()) { + return *snapshot_id_; + } + snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base()); + return *snapshot_id_; +} + +Result SnapshotUpdate::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + // Get the latest snapshot for the target branch + std::optional parent_snapshot_id; + ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot, + SnapshotUtil::OptionalLatestSnapshot(base(), target_branch_)); + parent_snapshot_id = + parent_snapshot ? std::make_optional(parent_snapshot->snapshot_id) : std::nullopt; + int64_t sequence_number = base().NextSequenceNumber(); + + if (parent_snapshot) { + ICEBERG_RETURN_UNEXPECTED(Validate(base(), parent_snapshot)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot)); + + std::string manifest_list_path = ManifestListPath(); + manifest_lists_.push_back(manifest_list_path); + + // Create manifest list writer based on format version + int8_t format_version = base().format_version; + int64_t snapshot_id = SnapshotId(); + std::unique_ptr writer; + + if (format_version == 1) { + ICEBERG_ASSIGN_OR_RAISE(writer, ManifestListWriter::MakeV1Writer( + snapshot_id, parent_snapshot_id, + manifest_list_path, transaction_->table()->io())); + } else if (format_version == 2) { + ICEBERG_ASSIGN_OR_RAISE(writer, ManifestListWriter::MakeV2Writer( + snapshot_id, parent_snapshot_id, sequence_number, + manifest_list_path, transaction_->table()->io())); + } else { // format_version == 3 + int64_t first_row_id = base().next_row_id; + ICEBERG_ASSIGN_OR_RAISE( + writer, ManifestListWriter::MakeV3Writer( + snapshot_id, parent_snapshot_id, sequence_number, first_row_id, + manifest_list_path, transaction_->table()->io())); + } + + // Enrich manifests that are missing metadata (added_snapshot_id == kInvalidSnapshotId) + std::vector enriched_manifests; + enriched_manifests.reserve(manifests.size()); + for (const auto& manifest : manifests) { + if (manifest.added_snapshot_id == kInvalidSnapshotId) { + // Check cache first to avoid regenerating enriched manifest + auto cache_it = enriched_manifest_cache_.find(manifest.manifest_path); + if (cache_it != enriched_manifest_cache_.end()) { + enriched_manifests.push_back(cache_it->second); + } else { + ICEBERG_ASSIGN_OR_RAISE( + auto enriched, AddMetadata(manifest, transaction_->table()->io(), base())); + // Store in cache for future use + enriched_manifest_cache_[manifest.manifest_path] = enriched; + enriched_manifests.push_back(std::move(enriched)); + } + } else { + enriched_manifests.push_back(manifest); + } + } + + ICEBERG_RETURN_UNEXPECTED(writer->AddAll(enriched_manifests)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + + // Get nextRowId and assignedRows for format version 3 + std::optional next_row_id; + std::optional assigned_rows; + if (format_version >= 3) { + next_row_id = base().next_row_id; + ICEBERG_CHECK(writer->next_row_id().has_value(), + "Next row ID is not set in manifest writer"); + assigned_rows = *writer->next_row_id() - *next_row_id; + } + + std::string op = operation(); + ICEBERG_CHECK(!op.empty(), "Operation is empty"); + + if (op == DataOperation::kReplace) { + const auto& summary = Summary(); + auto added_records_it = summary.find(SnapshotSummaryFields::kAddedRecords); + auto replaced_records_it = summary.find(SnapshotSummaryFields::kDeletedRecords); + if (added_records_it != summary.end() && replaced_records_it != summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto added_records, + StringUtils::ParseInt(added_records_it->second)); + ICEBERG_ASSIGN_OR_RAISE(auto replaced_records, StringUtils::ParseInt( + replaced_records_it->second)); + if (added_records > replaced_records) { + return InvalidArgument( + "Invalid REPLACE operation: {} added records > {} replaced records", + added_records, replaced_records); + } + } + } + + ICEBERG_ASSIGN_OR_RAISE(auto summary, ComputeSummary(base())); + if (next_row_id.has_value()) { + summary[SnapshotSummaryFields::kFirstRowId] = std::to_string(*next_row_id); + } + if (assigned_rows.has_value()) { + summary[SnapshotSummaryFields::kAddedRows] = std::to_string(*assigned_rows); + } + + // Create snapshot + staged_snapshot_ = + std::make_shared(Snapshot{.snapshot_id = snapshot_id, + .parent_snapshot_id = parent_snapshot_id, + .sequence_number = sequence_number, + .timestamp_ms = CurrentTimePointMs(), + .manifest_list = manifest_list_path, + .summary = std::move(summary), + .schema_id = base().current_schema_id}); + + // Return the new snapshot + return ApplyResult{.snapshot = staged_snapshot_, + .target_branch = target_branch_, + .stage_only = stage_only_}; +} + +Status SnapshotUpdate::Finalize() { + // Cleanup after successful commit + if (CleanupAfterCommit() && staged_snapshot_ != nullptr) { + auto cached_snapshot = SnapshotCache(staged_snapshot_.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, + cached_snapshot.Manifests(transaction_->table()->io())); + std::unordered_set manifest_paths; + for (const auto& manifest : manifests) { + manifest_paths.insert(manifest.manifest_path); + } + CleanUncommitted(manifest_paths); + // Clean up unused manifest lists + for (const auto& manifest_list : manifest_lists_) { + if (manifest_list != staged_snapshot_->manifest_list) { + std::ignore = DeleteFile(manifest_list); + } + } + } + + return {}; +} + +Status SnapshotUpdate::SetTargetBranch(const std::string& branch) { + ICEBERG_PRECHECK(!branch.empty(), "Invalid branch name: empty"); + + auto ref_it = base().refs.find(branch); + if (ref_it != base().refs.end()) { + ICEBERG_PRECHECK( + ref_it->second->type() == SnapshotRefType::kBranch, + "{} is a tag, not a branch. Tags cannot be targets for producing snapshots", + branch); + } + + target_branch_ = branch; + return {}; +} + +Result> SnapshotUpdate::ComputeSummary( + const TableMetadata& previous) { + std::unordered_map summary = Summary(); + + if (summary.empty()) { + return summary; + } + + // Get previous summary from the target branch + std::unordered_map previous_summary; + if (auto ref_it = previous.refs.find(target_branch_); ref_it != previous.refs.end()) { + auto snapshot = previous.OptionalSnapshotById(ref_it->second->snapshot_id); + if (snapshot != nullptr && snapshot->summary.size() > 0) { + previous_summary = snapshot->summary; + } + } + + // If no previous summary, initialize with zeros + if (previous_summary.empty()) { + previous_summary[SnapshotSummaryFields::kTotalRecords] = "0"; + previous_summary[SnapshotSummaryFields::kTotalFileSize] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDataFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDeleteFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalPosDeletes] = "0"; + previous_summary[SnapshotSummaryFields::kTotalEqDeletes] = "0"; + } + + // Update totals + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalRecords, + SnapshotSummaryFields::kAddedRecords, SnapshotSummaryFields::kDeletedRecords)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalFileSize, + SnapshotSummaryFields::kAddedFileSize, SnapshotSummaryFields::kRemovedFileSize)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalDataFiles, + SnapshotSummaryFields::kAddedDataFiles, SnapshotSummaryFields::kDeletedDataFiles)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal(summary, previous_summary, + SnapshotSummaryFields::kTotalDeleteFiles, + SnapshotSummaryFields::kAddedDeleteFiles, + SnapshotSummaryFields::kRemovedDeleteFiles)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal(summary, previous_summary, + SnapshotSummaryFields::kTotalPosDeletes, + SnapshotSummaryFields::kAddedPosDeletes, + SnapshotSummaryFields::kRemovedPosDeletes)); + ICEBERG_RETURN_UNEXPECTED(UpdateTotal( + summary, previous_summary, SnapshotSummaryFields::kTotalEqDeletes, + SnapshotSummaryFields::kAddedEqDeletes, SnapshotSummaryFields::kRemovedEqDeletes)); + + // TODO(anyone): we can add custom summary fields like engine info in the future + return summary; +} + +void SnapshotUpdate::CleanAll() { + for (const auto& manifest_list : manifest_lists_) { + std::ignore = DeleteFile(manifest_list); + } + manifest_lists_.clear(); + // Pass empty set - subclasses will implement CleanUncommitted + CleanUncommitted(std::unordered_set{}); +} + +Status SnapshotUpdate::DeleteFile(const std::string& path) { return delete_func_(path); } + +std::string SnapshotUpdate::ManifestListPath() { + // Generate manifest list path + // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro + int64_t snapshot_id = SnapshotId(); + std::string filename = + std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_); + return transaction_->MetadataFileLocation(filename); +} + +std::string SnapshotUpdate::ManifestPath() { + // Generate manifest path + // Format: {metadata_location}/{uuid}-m{manifest_count}.avro + std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count_++); + return transaction_->MetadataFileLocation(filename); +} + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h new file mode 100644 index 000000000..6a1b83f53 --- /dev/null +++ b/src/iceberg/update/snapshot_update.h @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +namespace iceberg { + +/// \brief Base class for operations that produce snapshots. +/// +/// This class provides common functionality for creating new snapshots, +/// including manifest list writing and cleanup. +class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { + public: + /// \brief Result of applying a snapshot update + struct ApplyResult { + std::shared_ptr snapshot; + std::string target_branch; + bool stage_only = false; + }; + + ~SnapshotUpdate() override = default; + + /// \brief Set a callback to delete files instead of the table's default. + /// + /// \param delete_func A function used to delete file locations + /// \return Reference to this for method chaining + auto& DeleteWith(this auto& self, + std::function delete_func) { + self.delete_func_ = std::move(delete_func); + return self; + } + + /// \brief Stage a snapshot in table metadata, but not update the current snapshot id. + /// + /// \return Reference to this for method chaining + auto& StageOnly(this auto& self) { + self.stage_only_ = true; + return self; + } + + /// \brief Apply the update's changes to create a new snapshot. + /// + /// This method validates the changes, applies them to the metadata, + /// and creates a new snapshot without committing it. The snapshot + /// is stored internally and can be accessed after Apply() succeeds. + /// + /// \return A result containing the new snapshot, or an error + Result Apply(); + + /// \brief Finalize the snapshot update, cleaning up any uncommitted files. + Status Finalize() override; + + protected: + explicit SnapshotUpdate(std::shared_ptr transaction); + + /// \brief Write data manifests for the given data files + /// + /// \param data_files The data files to write + /// \param spec The partition spec to use + /// \return A vector of manifest files + Result> WriteDataManifests( + const std::vector& data_files, + const std::shared_ptr& spec); + + /// \brief Write delete manifests for the given delete files + /// + /// \param delete_files The delete files to write + /// \param spec The partition spec to use + /// \return A vector of manifest files + Result> WriteDeleteManifests( + const std::vector& delete_files, + const std::shared_ptr& spec); + + Status SetTargetBranch(const std::string& branch); + const std::string& target_branch() const { return target_branch_; } + + bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } + const std::string& commit_uuid() const { return commit_uuid_; } + int32_t manifest_count() const { return manifest_count_; } + int32_t attempt() const { return attempt_; } + int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; } + + /// \brief Clean up any uncommitted manifests that were created. + /// + /// Manifests may not be committed if Apply is called multiple times + /// because a commit conflict has occurred. Implementations may keep + /// around manifests because the same changes will be made by both + /// Apply calls. This method instructs the implementation to clean up + /// those manifests and passes the paths of the manifests that were + /// actually committed. + /// + /// \param committed A set of manifest paths that were actually committed + virtual void CleanUncommitted(const std::unordered_set& committed) = 0; + + /// \brief A string that describes the action that produced the new snapshot. + /// + /// \return A string operation name + virtual std::string operation() = 0; + + /// \brief Validate the current metadata. + /// + /// Child operations can override this to add custom validation. + /// + /// \param current_metadata Current table metadata to validate + /// \param snapshot Ending snapshot on the lineage which is being validated + virtual Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + return {}; + }; + + /// \brief Apply the update's changes to the given metadata and snapshot. + /// + /// \param metadata_to_update The base table metadata to apply changes to + /// \param snapshot Snapshot to apply the changes to + /// \return A vector of manifest files for the new snapshot + virtual Result> Apply( + const TableMetadata& metadata_to_update, + const std::shared_ptr& snapshot) = 0; + + /// \brief Get the summary map for this operation. + /// + /// \return A map of summary properties + virtual std::unordered_map Summary() = 0; + + /// \brief Check if cleanup should happen after commit + /// + /// \return True if cleanup should happen after commit + virtual bool CleanupAfterCommit() const { return true; } + + int64_t SnapshotId(); + + private: + /// \brief Returns the snapshot summary from the implementation and updates totals. + Result> ComputeSummary( + const TableMetadata& previous); + + /// \brief Clean up all uncommitted files + void CleanAll(); + + /// \brief Delete a file using the configured delete function + Status DeleteFile(const std::string& path); + + /// \brief Get the path for a manifest list file + std::string ManifestListPath(); + + /// \brief Get the path for a manifest file + std::string ManifestPath(); + + std::shared_ptr spec_; + std::shared_ptr schema_; + + // For format version > 1, inheritance is enabled by default + bool can_inherit_snapshot_id_{true}; + std::string commit_uuid_; + int32_t manifest_count_{0}; + int32_t attempt_{0}; + std::vector manifest_lists_; + int64_t target_manifest_size_bytes_; + std::optional snapshot_id_{std::nullopt}; + bool stage_only_{false}; + std::function delete_func_; + std::string target_branch_{SnapshotRef::kMainBranch}; + + std::shared_ptr staged_snapshot_; + + // Cache for enriched ManifestFile instances to avoid regenerating them on retries + std::unordered_map enriched_manifest_cache_; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/update_partition_spec.cc b/src/iceberg/update/update_partition_spec.cc index ffea1e099..54c3dc60a 100644 --- a/src/iceberg/update/update_partition_spec.cc +++ b/src/iceberg/update/update_partition_spec.cc @@ -47,11 +47,10 @@ Result> UpdatePartitionSpec::Make( UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transaction) : PendingUpdate(std::move(transaction)) { - const TableMetadata& base_metadata = transaction_->current(); - format_version_ = base_metadata.format_version; + format_version_ = base().format_version; // Get the current/default partition spec - auto spec_result = base_metadata.PartitionSpec(); + auto spec_result = base().PartitionSpec(); if (!spec_result.has_value()) { AddError(spec_result.error()); return; @@ -59,15 +58,15 @@ UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transactio spec_ = std::move(spec_result.value()); // Get the current schema - auto schema_result = base_metadata.Schema(); + auto schema_result = base().Schema(); if (!schema_result.has_value()) { AddError(schema_result.error()); return; } schema_ = std::move(schema_result.value()); - last_assigned_partition_id_ = std::max(base_metadata.last_partition_id, - PartitionSpec::kLegacyPartitionDataIdStart - 1); + last_assigned_partition_id_ = + std::max(base().last_partition_id, PartitionSpec::kLegacyPartitionDataIdStart - 1); name_to_field_ = IndexSpecByName(*spec_); transform_to_field_ = IndexSpecByTransform(*spec_); @@ -433,18 +432,16 @@ UpdatePartitionSpec::IndexSpecByTransform(const PartitionSpec& spec) { } void UpdatePartitionSpec::BuildHistoricalFieldsIndex() { - const TableMetadata& base_metadata = transaction_->current(); - // Count total fields across all specs to reserve capacity size_t total_fields = 0; - for (const auto& partition_spec : base_metadata.partition_specs) { + for (const auto& partition_spec : base().partition_specs) { total_fields += partition_spec->fields().size(); } historical_fields_.reserve(total_fields); // Index all fields from all historical partition specs // Later specs override earlier ones for the same (source_id, transform) key - for (const auto& partition_spec : base_metadata.partition_specs) { + for (const auto& partition_spec : base().partition_specs) { for (const auto& field : partition_spec->fields()) { TransformKey key{field.source_id(), field.transform()->ToString()}; historical_fields_.emplace(key, field); diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index ce809c437..fe49df81c 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -19,10 +19,8 @@ #include "iceberg/update/update_properties.h" -#include #include #include -#include #include "iceberg/metrics_config.h" #include "iceberg/result.h" @@ -31,6 +29,7 @@ #include "iceberg/transaction.h" #include "iceberg/util/error_collector.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" namespace iceberg { @@ -70,7 +69,7 @@ UpdateProperties& UpdateProperties::Remove(const std::string& key) { Result UpdateProperties::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - const auto& current_props = transaction_->current().properties.configs(); + const auto& current_props = base().properties.configs(); std::unordered_map new_properties; std::vector removals; for (const auto& [key, value] : current_props) { @@ -85,15 +84,8 @@ Result UpdateProperties::Apply() { auto iter = new_properties.find(TableProperties::kFormatVersion.key()); if (iter != new_properties.end()) { - int parsed_version = 0; - const auto& val = iter->second; - auto [ptr, ec] = std::from_chars(val.data(), val.data() + val.size(), parsed_version); - - if (ec == std::errc::invalid_argument) { - return InvalidArgument("Invalid format version '{}': not a valid integer", val); - } else if (ec == std::errc::result_out_of_range) { - return InvalidArgument("Format version '{}' is out of range", val); - } + ICEBERG_ASSIGN_OR_RAISE(auto parsed_version, + StringUtils::ParseInt(iter->second)); if (parsed_version > TableMetadata::kSupportedTableFormatVersion) { return InvalidArgument( @@ -105,7 +97,7 @@ Result UpdateProperties::Apply() { updates_.erase(TableProperties::kFormatVersion.key()); } - if (auto schema = transaction_->current().Schema(); schema.has_value()) { + if (auto schema = base().Schema(); schema.has_value()) { ICEBERG_RETURN_UNEXPECTED( MetricsConfig::VerifyReferencedColumns(new_properties, *schema.value())); } diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc index 0e81c4ad7..327ee0ac6 100644 --- a/src/iceberg/update/update_schema.cc +++ b/src/iceberg/update/update_schema.cc @@ -250,10 +250,8 @@ Result> UpdateSchema::Make( UpdateSchema::UpdateSchema(std::shared_ptr transaction) : PendingUpdate(std::move(transaction)) { - const TableMetadata& base_metadata = transaction_->current(); - // Get the current schema - auto schema_result = base_metadata.Schema(); + auto schema_result = base().Schema(); if (!schema_result.has_value()) { AddError(schema_result.error()); return; @@ -261,7 +259,7 @@ UpdateSchema::UpdateSchema(std::shared_ptr transaction) schema_ = std::move(schema_result.value()); // Initialize last_column_id from base metadata - last_column_id_ = base_metadata.last_column_id; + last_column_id_ = base().last_column_id; // Initialize identifier field names from the current schema auto identifier_names_result = schema_->IdentifierFieldNames(); diff --git a/src/iceberg/update/update_sort_order.cc b/src/iceberg/update/update_sort_order.cc index e3e651d50..c5c7be322 100644 --- a/src/iceberg/update/update_sort_order.cc +++ b/src/iceberg/update/update_sort_order.cc @@ -19,7 +19,6 @@ #include "iceberg/update/update_sort_order.h" -#include #include #include @@ -52,7 +51,7 @@ UpdateSortOrder& UpdateSortOrder::AddSortField(const std::shared_ptr& term ICEBERG_BUILDER_CHECK(term != nullptr, "Term cannot be null"); ICEBERG_BUILDER_CHECK(term->is_unbound(), "Term must be unbound"); - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, transaction_->current().Schema()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, base().Schema()); if (term->kind() == Term::Kind::kReference) { // kReference is treated as identity transform auto named_ref = internal::checked_pointer_cast(term); @@ -99,7 +98,7 @@ Result> UpdateSortOrder::Apply() { // The actual sort order ID will be assigned by TableMetadataBuilder when // the AddSortOrder update is applied. ICEBERG_ASSIGN_OR_RAISE(order, SortOrder::Make(/*sort_id=*/-1, sort_fields_)); - ICEBERG_ASSIGN_OR_RAISE(auto schema, transaction_->current().Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema()); ICEBERG_RETURN_UNEXPECTED(order->Validate(*schema)); } return order; diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index 2ec36478a..4395dc274 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -26,6 +26,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" +#include "iceberg/util/uuid.h" namespace iceberg { @@ -335,4 +336,28 @@ Result> SnapshotUtil::LatestSnapshot( return metadata.SnapshotById(it->second->snapshot_id); } +Result> SnapshotUtil::OptionalLatestSnapshot( + const TableMetadata& metadata, const std::string& branch) { + return LatestSnapshot(metadata, branch) + .or_else([](const auto& error) -> Result> { + if (error.kind == ErrorKind::kNotFound) { + return nullptr; + } + return std::unexpected(error); + }); +} + +int64_t SnapshotUtil::GenerateSnapshotId() { + auto uuid = Uuid::GenerateV7(); + return (uuid.high_bits() ^ uuid.low_bits()) & std::numeric_limits::max(); +} + +int64_t SnapshotUtil::GenerateSnapshotId(const TableMetadata& metadata) { + auto snapshot_id = GenerateSnapshotId(); + while (metadata.SnapshotById(snapshot_id).has_value()) { + snapshot_id = GenerateSnapshotId(); + } + return snapshot_id; +} + } // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index 2b11168ed..be5f376da 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include #include @@ -255,6 +256,27 @@ class ICEBERG_EXPORT SnapshotUtil { static Result> LatestSnapshot(const TableMetadata& metadata, const std::string& branch); + /// \brief Fetch the snapshot at the head of the given branch in the given table. + /// + /// Like LatestSnapshot, but returns nullopt if the branch does not exist. + /// + /// \param metadata The table metadata + /// \param branch Branch name of the table metadata (empty string means main + /// branch) + /// \return The latest snapshot for the given branch, or nullopt if the branch does not + /// exist + static Result> OptionalLatestSnapshot( + const TableMetadata& metadata, const std::string& branch); + + /// \brief Generate a new snapshot ID. + static int64_t GenerateSnapshotId(); + + /// \brief Generate a new snapshot ID for the given metadata. + /// + /// \param metadata The table metadata + /// \return A new snapshot ID + static int64_t GenerateSnapshotId(const TableMetadata& metadata); + private: /// \brief Helper function to traverse ancestors of a snapshot. /// diff --git a/src/iceberg/util/string_util.h b/src/iceberg/util/string_util.h index 8aa209c94..64d8609de 100644 --- a/src/iceberg/util/string_util.h +++ b/src/iceberg/util/string_util.h @@ -20,10 +20,13 @@ #pragma once #include +#include #include #include +#include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" namespace iceberg { @@ -54,6 +57,20 @@ class ICEBERG_EXPORT StringUtils { } return count; } + + template + static Result ParseInt(std::string_view str) { + T value = 0; + auto [ptr, ec] = std::from_chars(str.data(), str.data() + str.size(), value); + if (ec == std::errc::invalid_argument) [[unlikely]] { + return InvalidArgument("Failed to parse integer from string '{}': invalid argument", + str); + } else if (ec == std::errc::result_out_of_range) [[unlikely]] { + return InvalidArgument( + "Failed to parse integer from string '{}': value out of range", str); + } + return value; + } }; /// \brief Transparent hash function that supports std::string_view as lookup key diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc index 0381e90a6..ed52dddf8 100644 --- a/src/iceberg/util/timepoint.cc +++ b/src/iceberg/util/timepoint.cc @@ -60,4 +60,11 @@ std::string FormatTimePointMs(TimePointMs time_point_ms) { return oss.str(); } +TimePointMs CurrentTimePointMs() { + auto now = std::chrono::system_clock::now(); + auto duration_since_epoch = now.time_since_epoch(); + return TimePointMs{ + std::chrono::duration_cast(duration_since_epoch)}; +} + } // namespace iceberg diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h index 6052c94ae..ed303e1fb 100644 --- a/src/iceberg/util/timepoint.h +++ b/src/iceberg/util/timepoint.h @@ -49,4 +49,7 @@ ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns); /// \brief Returns a human-readable string representation of a TimePointMs ICEBERG_EXPORT std::string FormatTimePointMs(TimePointMs time_point_ms); +/// \brief Returns a time point in milliseconds that represents the current system time +ICEBERG_EXPORT TimePointMs CurrentTimePointMs(); + } // namespace iceberg diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc index 9322deb93..cc76095a9 100644 --- a/src/iceberg/util/uuid.cc +++ b/src/iceberg/util/uuid.cc @@ -217,4 +217,16 @@ std::string Uuid::ToString() const { data_[15]); } +int64_t Uuid::high_bits() const { + int64_t result; + std::memcpy(&result, data_.data(), 8); + return result; +} + +int64_t Uuid::low_bits() const { + int64_t result; + std::memcpy(&result, data_.data() + 8, 8); + return result; +} + } // namespace iceberg diff --git a/src/iceberg/util/uuid.h b/src/iceberg/util/uuid.h index 64db7c5d6..69ac10fc7 100644 --- a/src/iceberg/util/uuid.h +++ b/src/iceberg/util/uuid.h @@ -78,6 +78,9 @@ class ICEBERG_EXPORT Uuid : public util::Formattable { return lhs.data_ == rhs.data_; } + int64_t high_bits() const; + int64_t low_bits() const; + private: std::array data_; };