Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions include/lance/lance.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,113 @@ int32_t lance_dataset_update(
uint64_t* out_num_updated
);

/* ─── lance_dataset_merge_insert ──────────────────────────────────────────── */

/**
* Behavior when a target row matches a source row on the join keys.
* Defaults are zero-valued so a zero-initialized LanceMergeInsertParams is a
* valid find-or-create configuration.
*
* The chosen value is passed to lance_dataset_merge_insert through the
* int32_t field LanceMergeInsertParams.when_matched. Values outside this
* enum are out-of-range discriminants and are reported as
* LANCE_ERR_INVALID_ARGUMENT.
*/
typedef enum {
/* Keep the target row unchanged (find-or-create). Default. */
LANCE_MERGE_WHEN_MATCHED_DO_NOTHING = 0,
/* Replace the target row with the source row (upsert). */
LANCE_MERGE_WHEN_MATCHED_UPDATE_ALL = 1,
/* Replace only when an SQL filter evaluates to true; requires
LanceMergeInsertParams.when_matched_expr to be set (non-NULL, non-empty). */
LANCE_MERGE_WHEN_MATCHED_UPDATE_IF = 2,
/* Fail the operation on any match. */
LANCE_MERGE_WHEN_MATCHED_FAIL = 3,
/* Drop the matching target row without inserting anything. */
LANCE_MERGE_WHEN_MATCHED_DELETE = 4,
} LanceMergeWhenMatched;

/**
* Behavior when a source row has no matching target row.
*
* Carried in the int32_t field LanceMergeInsertParams.when_not_matched.
* The default is zero-valued, so a zero-initialized params struct inserts
* every unmatched source row.
*/
typedef enum {
/* Insert the source row. Default. */
LANCE_MERGE_WHEN_NOT_MATCHED_INSERT_ALL = 0,
/* Discard the source row. */
LANCE_MERGE_WHEN_NOT_MATCHED_DO_NOTHING = 1,
} LanceMergeWhenNotMatched;

/**
* Behavior when a target row has no matching source row.
*
* Carried in the int32_t field
* LanceMergeInsertParams.when_not_matched_by_source. The default is
* zero-valued, so a zero-initialized params struct keeps such rows.
*/
typedef enum {
/* Keep the target row. Default. */
LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_KEEP = 0,
/* Delete every unmatched target row. */
LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_DELETE = 1,
/* Delete unmatched target rows that satisfy an SQL filter; requires
LanceMergeInsertParams.when_not_matched_by_source_expr to be set
(non-NULL, non-empty). */
LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_DELETE_IF = 2,
} LanceMergeWhenNotMatchedBySource;

/**
* Tunable parameters for lance_dataset_merge_insert. Pass NULL to use the
* find-or-create defaults (DO_NOTHING / INSERT_ALL / KEEP).
*
* Every mode field defaults to zero, so a zero-initialized struct is
* equivalent to passing NULL.
*
* Expression strings are read only when the corresponding mode requires
* them; spurious non-NULL pointers on other modes are rejected so the
* contract is unambiguous.
*/
typedef struct LanceMergeInsertParams {
/* LanceMergeWhenMatched discriminant, stored as int32_t. Zero selects
DO_NOTHING. */
int32_t when_matched;
/* SQL filter for LANCE_MERGE_WHEN_MATCHED_UPDATE_IF; must be NULL for
every other when_matched mode. Empty string is rejected. */
const char* when_matched_expr;
/* LanceMergeWhenNotMatched discriminant, stored as int32_t. Zero selects
INSERT_ALL. */
int32_t when_not_matched;
/* LanceMergeWhenNotMatchedBySource discriminant, stored as int32_t. Zero
selects KEEP. */
int32_t when_not_matched_by_source;
/* SQL filter for LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_DELETE_IF; must be
NULL for every other when_not_matched_by_source mode. Empty string is
rejected. */
const char* when_not_matched_by_source_expr;
} LanceMergeInsertParams;

/**
* Per-call merge statistics returned via the optional `out_result`
* parameter of lance_dataset_merge_insert. The struct is written only on
* success; on error it is left unchanged and must not be read.
*/
typedef struct LanceMergeInsertResult {
/* Number of rows inserted by the call. */
uint64_t num_inserted_rows;
/* Number of rows updated by the call. */
uint64_t num_updated_rows;
/* Number of rows deleted by the call. */
uint64_t num_deleted_rows;
} LanceMergeInsertResult;

/**
* Merge `source` into `dataset` keyed on `on_columns`, committing a new
* manifest. Mirrors SQL MERGE; the default parameters yield a find-or-create
* (insert rows that do not match an existing key).
*
* Mutates `dataset` in place — the same handle remains valid afterward and
* sees the new version. Scanners already in flight against this dataset
* keep their pre-merge snapshot view.
*
* @param dataset Open dataset (not consumed). Must not be NULL.
* @param on_columns Join keys; array of `num_on_columns` C strings.
* Must be non-NULL when `num_on_columns > 0`; each
* entry must be a non-NULL, non-empty C string.
* Column names are matched case-insensitively
* (upstream).
* @param num_on_columns Length of `on_columns`. Must be >= 1; an empty key
* list is an invalid argument.
* @param source Arrow C Data Interface stream of source rows.
* Consumed by this call. Its schema must be
* compatible with the dataset schema (full match or
* a subschema).
* NOTE(review): confirm whether the stream is also
* released when the call fails.
* @param params Tunable parameters. Pass NULL for find-or-create
* defaults (equivalent to a zero-initialized struct).
* @param out_result Optional. If non-NULL, on success receives the
* per-call insert/update/delete counts. On error the
* slot is left unchanged — do not read it.
* @return 0 on success, -1 on error. Error codes:
* LANCE_ERR_INVALID_ARGUMENT for NULL/empty args, out-of-range mode
* discriminants, missing or extraneous expression strings, malformed
* SQL, unknown columns, schema incompatibility, and no-op
* configurations; LANCE_ERR_COMMIT_CONFLICT for a concurrent writer.
* NOTE(review): the specific code is presumably read back through the
* library's last-error accessor, which is not visible in this excerpt —
* confirm.
*/
int32_t lance_dataset_merge_insert(
LanceDataset* dataset,
const char* const* on_columns,
size_t num_on_columns,
struct ArrowArrayStream* source,
const LanceMergeInsertParams* params,
LanceMergeInsertResult* out_result
);

/**
* Export the dataset schema via Arrow C Data Interface.
* @param out Pointer to caller-allocated ArrowSchema struct
Expand Down
42 changes: 42 additions & 0 deletions include/lance/lance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,48 @@ class Dataset {
return num_updated;
}

/// Merges the rows of `source` into this dataset, joining on `on_columns`,
/// and commits a new manifest. With the default arguments this is a
/// find-or-create: only source rows whose key is not already present are
/// inserted.
///
/// @param on_columns  Join key column names; must be non-empty.
/// @param source      Arrow C stream of source rows; consumed by the call.
/// @param params      Match behavior; `nullptr` selects the find-or-create
///                    defaults.
/// @return Insert / update / delete counts for this call.
/// @throws lance::Error on failure (empty key, schema mismatch, malformed
///         SQL, missing expression for *_IF mode, commit conflict, ...).
LanceMergeInsertResult merge_insert(
    const std::vector<std::string>& on_columns,
    ArrowArrayStream* source,
    const LanceMergeInsertParams* params = nullptr) {
    // The C API wants an array of C strings. These pointers borrow from
    // `on_columns`, which outlives the call below.
    std::vector<const char*> key_names;
    key_names.reserve(on_columns.size());
    for (auto it = on_columns.begin(); it != on_columns.end(); ++it) {
        key_names.push_back(it->c_str());
    }

    // Value-initialized so the counts are zero unless the C call fills them.
    LanceMergeInsertResult stats{};
    const auto status = lance_dataset_merge_insert(
        handle_.get(),
        key_names.data(),
        on_columns.size(),
        source,
        params,
        &stats);
    if (status != 0) {
        // Surfaces the failure recorded by the C layer to the caller.
        check_error();
    }
    return stats;
}

/// Shorthand for a classic upsert (when_matched=UpdateAll,
/// when_not_matched=InsertAll): matched target rows are replaced by their
/// source rows, unmatched source rows are inserted, and target rows without
/// a source match are kept. Forwards to merge_insert and returns its counts.
LanceMergeInsertResult upsert(
    const std::vector<std::string>& on_columns,
    ArrowArrayStream* source) {
    // Both expression pointers stay null from the value-initialization.
    LanceMergeInsertParams upsert_params{};
    // All three modes are spelled out rather than relying on zero defaults.
    upsert_params.when_not_matched_by_source =
        LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_KEEP;
    upsert_params.when_not_matched = LANCE_MERGE_WHEN_NOT_MATCHED_INSERT_ALL;
    upsert_params.when_matched = LANCE_MERGE_WHEN_MATCHED_UPDATE_ALL;

    const LanceMergeInsertParams* const params_ptr = &upsert_params;
    return merge_insert(on_columns, source, params_ptr);
}

/// Export the schema as an Arrow C Data Interface struct.
void schema(ArrowSchema* out) const {
if (lance_dataset_schema(handle_.get(), out) != 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod error;
mod fragment_writer;
mod helpers;
mod index;
mod merge_insert;
mod restore;
pub mod runtime;
mod scanner;
Expand All @@ -39,6 +40,7 @@ pub use error::{
};
pub use fragment_writer::*;
pub use index::*;
pub use merge_insert::*;
pub use restore::*;
pub use scanner::*;
pub use update::*;
Expand Down
Loading
Loading