diff --git a/include/lance/lance.h b/include/lance/lance.h
index d5c6e5a..a22756f 100644
--- a/include/lance/lance.h
+++ b/include/lance/lance.h
@@ -283,6 +283,113 @@ int32_t lance_dataset_update(
     uint64_t* out_num_updated
 );
 
+/* ─── lance_dataset_merge_insert ──────────────────────────────────────────── */
+
+/**
+ * Behavior when a target row matches a source row on the join keys.
+ * Defaults are zero-valued so a zero-initialized LanceMergeInsertParams is a
+ * valid find-or-create configuration.
+ */
+typedef enum {
+    /* Keep the target row unchanged (find-or-create). Default. */
+    LANCE_MERGE_WHEN_MATCHED_DO_NOTHING = 0,
+    /* Replace the target row with the source row (upsert). */
+    LANCE_MERGE_WHEN_MATCHED_UPDATE_ALL = 1,
+    /* Replace only when an SQL filter evaluates true; requires
+       when_matched_expr. */
+    LANCE_MERGE_WHEN_MATCHED_UPDATE_IF = 2,
+    /* Fail the operation on any match. */
+    LANCE_MERGE_WHEN_MATCHED_FAIL = 3,
+    /* Drop the matching target row without inserting anything. */
+    LANCE_MERGE_WHEN_MATCHED_DELETE = 4,
+} LanceMergeWhenMatched;
+
+/** Behavior when a source row has no matching target row. */
+typedef enum {
+    /* Insert the source row. Default. */
+    LANCE_MERGE_WHEN_NOT_MATCHED_INSERT_ALL = 0,
+    /* Discard the source row. */
+    LANCE_MERGE_WHEN_NOT_MATCHED_DO_NOTHING = 1,
+} LanceMergeWhenNotMatched;
+
+/** Behavior when a target row has no matching source row. */
+typedef enum {
+    /* Keep the target row. Default. */
+    LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_KEEP = 0,
+    /* Delete every unmatched target row. */
+    LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_DELETE = 1,
+    /* Delete unmatched target rows that satisfy an SQL filter; requires
+       when_not_matched_by_source_expr. */
+    LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_DELETE_IF = 2,
+} LanceMergeWhenNotMatchedBySource;
+
+/**
+ * Tunable parameters for lance_dataset_merge_insert. Pass NULL to use the
+ * find-or-create defaults (DO_NOTHING / INSERT_ALL / KEEP).
+ *
+ * Expression strings are required only by the *_IF modes; a non-NULL
+ * pointer on any other mode is rejected. Every non-NULL pointer is still
+ * dereferenced for validation, so it must be a valid NUL-terminated string.
+ */
+typedef struct LanceMergeInsertParams {
+    /* LanceMergeWhenMatched discriminant. */
+    int32_t when_matched;
+    /* SQL filter for UPDATE_IF; NULL otherwise. Empty string is rejected. */
+    const char* when_matched_expr;
+    /* LanceMergeWhenNotMatched discriminant. */
+    int32_t when_not_matched;
+    /* LanceMergeWhenNotMatchedBySource discriminant. */
+    int32_t when_not_matched_by_source;
+    /* SQL filter for DELETE_IF; NULL otherwise. Empty string is rejected. */
+    const char* when_not_matched_by_source_expr;
+} LanceMergeInsertParams;
+
+/** Per-call merge statistics returned via the optional out parameter. */
+typedef struct LanceMergeInsertResult {
+    uint64_t num_inserted_rows;
+    uint64_t num_updated_rows;
+    uint64_t num_deleted_rows;
+} LanceMergeInsertResult;
+
+/**
+ * Merge `source` into `dataset` keyed on `on_columns`, committing a new
+ * manifest. Mirrors SQL MERGE; the default parameters yield a find-or-create
+ * (insert rows that do not match an existing key).
+ *
+ * Mutates `dataset` in place — the same handle remains valid afterward and
+ * sees the new version. Scanners already in flight against this dataset
+ * keep their pre-merge snapshot view.
+ *
+ * @param dataset         Open dataset (not consumed). Must not be NULL.
+ * @param on_columns      Join keys. Length = `num_on_columns`. Must be
+ *                        non-NULL when `num_on_columns > 0`; each entry
+ *                        must be a non-NULL, non-empty C string. Column
+ *                        names are matched case-insensitively (upstream).
+ * @param num_on_columns  Length of `on_columns`. Must be >= 1.
+ * @param source          Arrow C Data Interface stream of source rows.
+ *                        Consumed by this call. Its schema must be
+ *                        compatible with the dataset schema (full match or
+ *                        a subschema).
+ * @param params          Tunable parameters. Pass NULL for find-or-create
+ *                        defaults.
+ * @param out_result      Optional. If non-NULL, on success receives the
+ *                        per-call insert/update/delete counts. On error the
+ *                        slot is left unchanged — do not read it.
+ * @return 0 on success, -1 on error. Error codes:
+ *   LANCE_ERR_INVALID_ARGUMENT for NULL/empty args, out-of-range mode
+ *   discriminants, missing or extraneous expression strings, malformed
+ *   SQL, unknown columns, schema incompatibility, and no-op
+ *   configurations; LANCE_ERR_COMMIT_CONFLICT for a concurrent writer.
+ */
+int32_t lance_dataset_merge_insert(
+    LanceDataset* dataset,
+    const char* const* on_columns,
+    size_t num_on_columns,
+    struct ArrowArrayStream* source,
+    const LanceMergeInsertParams* params,
+    LanceMergeInsertResult* out_result
+);
+
 /**
  * Export the dataset schema via Arrow C Data Interface.
  * @param out Pointer to caller-allocated ArrowSchema struct
diff --git a/include/lance/lance.hpp b/include/lance/lance.hpp
index 8b03a91..bf029e9 100644
--- a/include/lance/lance.hpp
+++ b/include/lance/lance.hpp
@@ -381,6 +381,48 @@ class Dataset {
         return num_updated;
     }
 
+    /// Merge `source` into this dataset keyed on `on_columns`, committing a
+    /// new manifest. Defaults to find-or-create semantics (insert rows that
+    /// do not match an existing key). Returns the per-call insert / update /
+    /// delete counts.
+    ///
+    /// `on_columns` must be non-empty. `params` controls match behavior; pass
+    /// `nullptr` for find-or-create defaults. `source` is consumed.
+    /// Throws lance::Error on failure (empty key, schema mismatch, malformed
+    /// SQL, missing expression for *_IF mode, commit conflict, ...).
+    LanceMergeInsertResult merge_insert(
+        const std::vector<std::string>& on_columns,
+        ArrowArrayStream* source,
+        const LanceMergeInsertParams* params = nullptr) {
+        std::vector<const char*> col_ptrs;
+        col_ptrs.reserve(on_columns.size());
+        for (const auto& c : on_columns) {
+            col_ptrs.push_back(c.c_str());
+        }
+        LanceMergeInsertResult result{};
+        if (lance_dataset_merge_insert(
+                handle_.get(),
+                col_ptrs.data(),
+                on_columns.size(),
+                source,
+                params,
+                &result) != 0) {
+            check_error();
+        }
+        return result;
+    }
+
+    /// Convenience: classic upsert (when_matched=UpdateAll, when_not_matched=InsertAll).
+    LanceMergeInsertResult upsert(
+        const std::vector<std::string>& on_columns,
+        ArrowArrayStream* source) {
+        LanceMergeInsertParams params{};
+        params.when_matched = LANCE_MERGE_WHEN_MATCHED_UPDATE_ALL;
+        params.when_not_matched = LANCE_MERGE_WHEN_NOT_MATCHED_INSERT_ALL;
+        params.when_not_matched_by_source = LANCE_MERGE_WHEN_NOT_MATCHED_BY_SOURCE_KEEP;
+        return merge_insert(on_columns, source, &params);
+    }
+
     /// Export the schema as an Arrow C Data Interface struct.
     void schema(ArrowSchema* out) const {
         if (lance_dataset_schema(handle_.get(), out) != 0) {
diff --git a/src/lib.rs b/src/lib.rs
index 112a10e..328214d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,6 +23,7 @@ mod error;
 mod fragment_writer;
 mod helpers;
 mod index;
+mod merge_insert;
 mod restore;
 pub mod runtime;
 mod scanner;
@@ -39,6 +40,7 @@ pub use error::{
 };
 pub use fragment_writer::*;
 pub use index::*;
+pub use merge_insert::*;
 pub use restore::*;
 pub use scanner::*;
 pub use update::*;
diff --git a/src/merge_insert.rs b/src/merge_insert.rs
new file mode 100644
index 0000000..c78b20b
--- /dev/null
+++ b/src/merge_insert.rs
@@ -0,0 +1,472 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Merge-insert C API: SQL-MERGE-style upsert from an Arrow record-batch
+//! stream into an existing dataset, committing a new manifest.
+//!
+//! Mutates the dataset in place under an exclusive write lock; existing
+//! scanners that already cloned the inner Arc keep their snapshot view.
+
+use std::ffi::c_char;
+use std::sync::Arc;
+
+use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
+use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WhenNotMatchedBySource};
+use lance_core::Result;
+
+use crate::dataset::LanceDataset;
+use crate::error::ffi_try;
+use crate::helpers;
+use crate::runtime::block_on;
+
+/// Behavior when a target row matches a source row on the join keys.
+///
+/// Discriminants are pinned for ABI stability. Out-of-range values stored on
+/// the FFI side are rejected with `LANCE_ERR_INVALID_ARGUMENT` rather than
+/// being transmuted into a `repr(C)` enum (which would be UB).
+#[repr(C)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum LanceMergeWhenMatched {
+    /// Keep the target row unchanged (find-or-create). This is the default.
+    DoNothing = 0,
+    /// Replace the target row with the source row (upsert).
+    UpdateAll = 1,
+    /// Replace the target row only when an SQL filter evaluates true.
+    /// Requires `when_matched_expr` on `LanceMergeInsertParams`.
+    UpdateIf = 2,
+    /// Fail the operation on any match.
+    Fail = 3,
+    /// Drop the matching target row without inserting anything in its place.
+    Delete = 4,
+}
+
+impl LanceMergeWhenMatched {
+    fn from_raw(raw: i32) -> Result<Self> {
+        match raw {
+            0 => Ok(Self::DoNothing),
+            1 => Ok(Self::UpdateAll),
+            2 => Ok(Self::UpdateIf),
+            3 => Ok(Self::Fail),
+            4 => Ok(Self::Delete),
+            other => Err(lance_core::Error::InvalidInput {
+                source: format!(
+                    "invalid when_matched {other}; expected 0..=4 (see LanceMergeWhenMatched)"
+                )
+                .into(),
+                location: snafu::location!(),
+            }),
+        }
+    }
+}
+
+/// Behavior when a source row has no matching target row.
+#[repr(C)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum LanceMergeWhenNotMatched {
+    /// Insert the source row (the default).
+    InsertAll = 0,
+    /// Discard the source row.
+    DoNothing = 1,
+}
+
+impl LanceMergeWhenNotMatched {
+    fn from_raw(raw: i32) -> Result<Self> {
+        match raw {
+            0 => Ok(Self::InsertAll),
+            1 => Ok(Self::DoNothing),
+            other => Err(lance_core::Error::InvalidInput {
+                source: format!(
+                    "invalid when_not_matched {other}; expected 0 or 1 (see LanceMergeWhenNotMatched)"
+                )
+                .into(),
+                location: snafu::location!(),
+            }),
+        }
+    }
+}
+
+/// Behavior when a target row has no matching source row.
+#[repr(C)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum LanceMergeWhenNotMatchedBySource {
+    /// Keep the target row (the default).
+    Keep = 0,
+    /// Delete every unmatched target row.
+    Delete = 1,
+    /// Delete unmatched target rows that satisfy an SQL filter. Requires
+    /// `when_not_matched_by_source_expr` on `LanceMergeInsertParams`.
+    DeleteIf = 2,
+}
+
+impl LanceMergeWhenNotMatchedBySource {
+    fn from_raw(raw: i32) -> Result<Self> {
+        match raw {
+            0 => Ok(Self::Keep),
+            1 => Ok(Self::Delete),
+            2 => Ok(Self::DeleteIf),
+            other => Err(lance_core::Error::InvalidInput {
+                source: format!(
+                    "invalid when_not_matched_by_source {other}; expected 0..=2 (see LanceMergeWhenNotMatchedBySource)"
+                )
+                .into(),
+                location: snafu::location!(),
+            }),
+        }
+    }
+}
+
+/// Tunable parameters for `lance_dataset_merge_insert`. Pass NULL to use the
+/// upstream find-or-create defaults (`DoNothing` / `InsertAll` / `Keep`).
+///
+/// The struct is `#[repr(C)]` and ABI-stable within a minor version.
+/// Expression strings are required only by the `*If` modes; a non-NULL
+/// pointer on any other mode is rejected. Every non-NULL pointer is still
+/// dereferenced for validation, so it must be a valid NUL-terminated string.
+#[repr(C)]
+pub struct LanceMergeInsertParams {
+    /// `LanceMergeWhenMatched` discriminant. Default: `DoNothing` (0).
+    pub when_matched: i32,
+    /// SQL filter for `UpdateIf`. Required iff `when_matched == UpdateIf`,
+    /// forbidden otherwise. Must not be empty when set.
+    pub when_matched_expr: *const c_char,
+    /// `LanceMergeWhenNotMatched` discriminant. Default: `InsertAll` (0).
+    pub when_not_matched: i32,
+    /// `LanceMergeWhenNotMatchedBySource` discriminant. Default: `Keep` (0).
+    pub when_not_matched_by_source: i32,
+    /// SQL filter for `DeleteIf`. Required iff
+    /// `when_not_matched_by_source == DeleteIf`, forbidden otherwise. Must
+    /// not be empty when set.
+    pub when_not_matched_by_source_expr: *const c_char,
+}
+
+/// Per-call merge statistics returned via the optional out parameter.
+#[repr(C)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+pub struct LanceMergeInsertResult {
+    /// Rows that did not match any target row and were inserted.
+    pub num_inserted_rows: u64,
+    /// Target rows that matched a source row and were updated in place.
+    pub num_updated_rows: u64,
+    /// Target rows deleted as a result of the merge (e.g. `WhenMatched::Delete`
+    /// or `WhenNotMatchedBySource::Delete[If]`).
+    pub num_deleted_rows: u64,
+}
+
+/// Resolved merge-insert parameters with caller strings copied into owned
+/// `String`s so they outlive the FFI argument lifetime.
+struct ResolvedParams {
+    when_matched: WhenMatched,
+    when_not_matched: WhenNotMatched,
+    when_not_matched_by_source: ResolvedWhenNotMatchedBySource,
+}
+
+enum ResolvedWhenNotMatchedBySource {
+    Keep,
+    Delete,
+    DeleteIf(String),
+}
+
+impl ResolvedParams {
+    fn defaults() -> Self {
+        Self {
+            when_matched: WhenMatched::DoNothing,
+            when_not_matched: WhenNotMatched::InsertAll,
+            when_not_matched_by_source: ResolvedWhenNotMatchedBySource::Keep,
+        }
+    }
+}
+
+/// Merge `source` into `dataset` keyed on `on_columns`, committing a new
+/// manifest.
+///
+/// - `dataset`: Open dataset (mutated; same handle remains valid afterward).
+///   Must not be NULL.
+/// - `on_columns` / `num_on_columns`: Join keys. Must be non-NULL with
+///   `num_on_columns >= 1`; each entry must be a non-NULL, non-empty C
+///   string. Column names are matched case-insensitively (upstream
+///   behavior).
+/// - `source`: Arrow C Data Interface stream of source rows. Consumed by
+///   this call — the caller must not use it again on any return path. Its
+///   schema must be compatible with the dataset schema (full match or a
+///   subschema; upstream rejects mismatches with `INVALID_ARGUMENT`).
+/// - `params`: Optional. NULL uses the find-or-create defaults
+///   (`DoNothing` / `InsertAll` / `Keep`).
+/// - `out_result`: Optional. If non-NULL, on success receives the
+///   `LanceMergeInsertResult` for this call. On error the slot is untouched.
+///
+/// Returns 0 on success, -1 on error. Error codes:
+/// `LANCE_ERR_INVALID_ARGUMENT` for NULL/empty args, out-of-range mode
+/// discriminants, missing/extraneous expression strings, malformed SQL,
+/// unknown columns, schema incompatibility, or no-op configurations;
+/// `LANCE_ERR_COMMIT_CONFLICT` for a concurrent writer.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn lance_dataset_merge_insert(
+    dataset: *mut LanceDataset,
+    on_columns: *const *const c_char,
+    num_on_columns: usize,
+    source: *mut FFI_ArrowArrayStream,
+    params: *const LanceMergeInsertParams,
+    out_result: *mut LanceMergeInsertResult,
+) -> i32 {
+    ffi_try!(
+        unsafe {
+            merge_insert_inner(
+                dataset,
+                on_columns,
+                num_on_columns,
+                source,
+                params,
+                out_result,
+            )
+        },
+        neg
+    )
+}
+
+unsafe fn merge_insert_inner(
+    dataset: *mut LanceDataset,
+    on_columns: *const *const c_char,
+    num_on_columns: usize,
+    source: *mut FFI_ArrowArrayStream,
+    params: *const LanceMergeInsertParams,
+    out_result: *mut LanceMergeInsertResult,
+) -> Result<i32> {
+    // The stream NULL check is the only validation that runs *before* the
+    // stream is consumed; once `from_raw` succeeds, every other return path
+    // drops `reader`, which fires the FFI release callback. Reordering the
+    // dataset/on-columns checks ahead of `from_raw` would leak the stream on
+    // those paths and break the documented "consumed on every return"
+    // contract (mirrors `lance_dataset_write`).
+    if source.is_null() {
+        return Err(lance_core::Error::InvalidInput {
+            source: "source stream must not be NULL".into(),
+            location: snafu::location!(),
+        });
+    }
+
+    // SAFETY: `source` is non-NULL (checked above) and the caller guarantees
+    // it points to an initialized, properly-aligned `FFI_ArrowArrayStream`
+    // owned by them. `from_raw` performs a `ptr::replace` that transfers
+    // ownership into the returned reader, zeroing the caller's release
+    // callback so it cannot be released twice.
+    let reader = unsafe { ArrowArrayStreamReader::from_raw(source) }.map_err(|e| {
+        lance_core::Error::InvalidInput {
+            source: e.to_string().into(),
+            location: snafu::location!(),
+        }
+    })?;
+
+    if dataset.is_null() {
+        return Err(lance_core::Error::InvalidInput {
+            source: "dataset must not be NULL".into(),
+            location: snafu::location!(),
+        });
+    }
+    if num_on_columns == 0 {
+        return Err(lance_core::Error::InvalidInput {
+            source: "num_on_columns must be >= 1".into(),
+            location: snafu::location!(),
+        });
+    }
+    if on_columns.is_null() {
+        return Err(lance_core::Error::InvalidInput {
+            source: "on_columns must not be NULL when num_on_columns > 0".into(),
+            location: snafu::location!(),
+        });
+    }
+
+    // Materialize key columns up front so a precise per-index error fires
+    // before the write lock is taken.
+    let mut keys: Vec<String> = Vec::with_capacity(num_on_columns);
+    for i in 0..num_on_columns {
+        // SAFETY: `on_columns` is non-NULL (checked above) and the caller
+        // guarantees the array has at least `num_on_columns` entries.
+        let entry_ptr = unsafe { *on_columns.add(i) };
+        // SAFETY: each entry pointer is either NULL (rejected below) or a
+        // NUL-terminated C string the caller keeps alive for this call.
+        let key = unsafe { helpers::parse_c_string(entry_ptr)? }
+            .filter(|s| !s.is_empty())
+            .ok_or_else(|| lance_core::Error::InvalidInput {
+                source: format!("on_columns[{i}] must not be NULL or empty").into(),
+                location: snafu::location!(),
+            })?;
+        keys.push(key.to_string());
+    }
+
+    // SAFETY: `params` is either NULL (use defaults) or points to a valid
+    // `LanceMergeInsertParams` for the duration of this call.
+    let resolved = unsafe { resolve_params(params)? };
+
+    // SAFETY: `dataset` is non-NULL (checked above) and the caller guarantees
+    // it points to a live `LanceDataset` not aliased mutably elsewhere.
+    let ds = unsafe { &*dataset };
+    let stats = ds.with_mut(|d| {
+        block_on(async {
+            // MergeInsertBuilder takes `Arc<Dataset>` (snapshot-based), so
+            // mirror what update.rs does: clone for the builder, then publish
+            // the new dataset back into `*d` after the commit lands.
+            let snapshot = Arc::new(d.clone());
+
+            let when_not_matched_by_source = match resolved.when_not_matched_by_source {
+                ResolvedWhenNotMatchedBySource::Keep => WhenNotMatchedBySource::Keep,
+                ResolvedWhenNotMatchedBySource::Delete => WhenNotMatchedBySource::Delete,
+                ResolvedWhenNotMatchedBySource::DeleteIf(expr) => {
+                    // `delete_if` parses the SQL against the dataset's schema
+                    // and surfaces parse / unknown-column errors as
+                    // InvalidInput → INVALID_ARGUMENT at the FFI boundary.
+                    WhenNotMatchedBySource::delete_if(&snapshot, &expr)?
+                }
+            };
+
+            let mut builder = MergeInsertBuilder::try_new(snapshot, keys)?;
+            builder
+                .when_matched(resolved.when_matched)
+                .when_not_matched(resolved.when_not_matched)
+                .when_not_matched_by_source(when_not_matched_by_source);
+            let job = builder.try_build()?;
+            let (new_dataset, stats) = job.execute_reader(reader).await?;
+            *d = Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone());
+            Ok::<_, lance_core::Error>(stats)
+        })
+    })?;
+
+    if !out_result.is_null() {
+        // SAFETY: caller guarantees `out_result` (when non-NULL) points to
+        // caller-owned, writable storage of size `sizeof(LanceMergeInsertResult)`.
+        // We only write on success; on the error paths above the slot stays
+        // untouched per the documented contract.
+        unsafe {
+            *out_result = LanceMergeInsertResult {
+                num_inserted_rows: stats.num_inserted_rows,
+                num_updated_rows: stats.num_updated_rows,
+                num_deleted_rows: stats.num_deleted_rows,
+            };
+        }
+    }
+    Ok(0)
+}
+
+/// Translate caller-supplied `LanceMergeInsertParams` (or NULL) into the
+/// upstream behavior enums, reading every C string by shared reference.
+unsafe fn resolve_params(params: *const LanceMergeInsertParams) -> Result<ResolvedParams> {
+    if params.is_null() {
+        return Ok(ResolvedParams::defaults());
+    }
+
+    // SAFETY: `params` is non-NULL (checked above) and the caller guarantees
+    // it points to a properly-initialized `LanceMergeInsertParams` valid for
+    // the duration of this call. We read by shared reference.
+    let params = unsafe { &*params };
+
+    let when_matched_kind = LanceMergeWhenMatched::from_raw(params.when_matched)?;
+    let when_not_matched = LanceMergeWhenNotMatched::from_raw(params.when_not_matched)?;
+    let when_not_matched_by_source_kind =
+        LanceMergeWhenNotMatchedBySource::from_raw(params.when_not_matched_by_source)?;
+
+    // SAFETY: pointer is either NULL (no string) or a NUL-terminated C string
+    // valid for this call; `parse_c_string` reads by shared reference.
+    let when_matched_expr =
+        unsafe { read_optional_expr(params.when_matched_expr, "when_matched_expr")? };
+    let when_not_matched_by_source_expr = unsafe {
+        read_optional_expr(
+            params.when_not_matched_by_source_expr,
+            "when_not_matched_by_source_expr",
+        )?
+    };
+
+    let when_matched = match when_matched_kind {
+        LanceMergeWhenMatched::DoNothing => {
+            reject_unused_expr("when_matched", "DoNothing", &when_matched_expr)?;
+            WhenMatched::DoNothing
+        }
+        LanceMergeWhenMatched::UpdateAll => {
+            reject_unused_expr("when_matched", "UpdateAll", &when_matched_expr)?;
+            WhenMatched::UpdateAll
+        }
+        LanceMergeWhenMatched::UpdateIf => {
+            let expr = when_matched_expr.ok_or_else(|| lance_core::Error::InvalidInput {
+                source: "when_matched=UpdateIf requires when_matched_expr".into(),
+                location: snafu::location!(),
+            })?;
+            // Upstream `WhenMatched::update_if` defers parsing until execute
+            // time; we only forward the string here.
+            WhenMatched::UpdateIf(expr)
+        }
+        LanceMergeWhenMatched::Fail => {
+            reject_unused_expr("when_matched", "Fail", &when_matched_expr)?;
+            WhenMatched::Fail
+        }
+        LanceMergeWhenMatched::Delete => {
+            reject_unused_expr("when_matched", "Delete", &when_matched_expr)?;
+            WhenMatched::Delete
+        }
+    };
+
+    let when_not_matched = match when_not_matched {
+        LanceMergeWhenNotMatched::InsertAll => WhenNotMatched::InsertAll,
+        LanceMergeWhenNotMatched::DoNothing => WhenNotMatched::DoNothing,
+    };
+
+    let when_not_matched_by_source = match when_not_matched_by_source_kind {
+        LanceMergeWhenNotMatchedBySource::Keep => {
+            reject_unused_expr(
+                "when_not_matched_by_source",
+                "Keep",
+                &when_not_matched_by_source_expr,
+            )?;
+            ResolvedWhenNotMatchedBySource::Keep
+        }
+        LanceMergeWhenNotMatchedBySource::Delete => {
+            reject_unused_expr(
+                "when_not_matched_by_source",
+                "Delete",
+                &when_not_matched_by_source_expr,
+            )?;
+            ResolvedWhenNotMatchedBySource::Delete
+        }
+        LanceMergeWhenNotMatchedBySource::DeleteIf => {
+            let expr = when_not_matched_by_source_expr.ok_or_else(|| {
+                lance_core::Error::InvalidInput {
+                    source:
+                        "when_not_matched_by_source=DeleteIf requires when_not_matched_by_source_expr"
+                            .into(),
+                    location: snafu::location!(),
+                }
+            })?;
+            ResolvedWhenNotMatchedBySource::DeleteIf(expr)
+        }
+    };
+
+    Ok(ResolvedParams {
+        when_matched,
+        when_not_matched,
+        when_not_matched_by_source,
+    })
+}
+
+unsafe fn read_optional_expr(ptr: *const c_char, field: &str) -> Result<Option<String>> {
+    // SAFETY: the caller's contract on `LanceMergeInsertParams` requires
+    // every non-NULL expression pointer to be a NUL-terminated C string
+    // valid for the call.
+    let parsed = unsafe { helpers::parse_c_string(ptr)? };
+    let Some(s) = parsed else {
+        return Ok(None);
+    };
+    if s.is_empty() {
+        return Err(lance_core::Error::InvalidInput {
+            source: format!("{field} must not be empty").into(),
+            location: snafu::location!(),
+        });
+    }
+    Ok(Some(s.to_string()))
+}
+
+fn reject_unused_expr(field: &str, mode: &str, expr: &Option<String>) -> Result<()> {
+    if expr.is_some() {
+        return Err(lance_core::Error::InvalidInput {
+            source: format!("{field}_expr must be NULL when {field}={mode}").into(),
+            location: snafu::location!(),
+        });
+    }
+    Ok(())
+}
diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs
index f3f9145..7eb269c 100644
--- a/tests/c_api_test.rs
+++ b/tests/c_api_test.rs
@@ -4706,3 +4706,1068 @@ fn test_update_out_param_untouched_on_error() {
 
     unsafe { lance_dataset_close(ds) };
 }
+
+// ===========================================================================
+// lance_dataset_merge_insert
+// ===========================================================================
+
+/// Build a {id, value, label} batch matching `create_large_dataset`'s schema.
+fn make_merge_source(rows: &[(i32, f32, &str)]) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("value", DataType::Float32, true),
+        Field::new("label", DataType::Utf8, true),
+    ]));
+    let ids: Vec<i32> = rows.iter().map(|r| r.0).collect();
+    let values: Vec<f32> = rows.iter().map(|r| r.1).collect();
+    let labels: Vec<&str> = rows.iter().map(|r| r.2).collect();
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(Int32Array::from(ids)),
+            Arc::new(Float32Array::from(values)),
+            Arc::new(StringArray::from(labels)),
+        ],
+    )
+    .unwrap()
+}
+
+/// Build a `LanceMergeInsertParams` zero-initialized except for the supplied
+/// fields. Helps keep tests readable when only a couple of knobs differ from
+/// the find-or-create defaults.
+fn merge_params(
+    when_matched: LanceMergeWhenMatched,
+    when_not_matched: LanceMergeWhenNotMatched,
+    when_not_matched_by_source: LanceMergeWhenNotMatchedBySource,
+) -> LanceMergeInsertParams {
+    LanceMergeInsertParams {
+        when_matched: when_matched as i32,
+        when_matched_expr: ptr::null(),
+        when_not_matched: when_not_matched as i32,
+        when_not_matched_by_source: when_not_matched_by_source as i32,
+        when_not_matched_by_source_expr: ptr::null(),
+    }
+}
+
+#[test]
+fn test_merge_insert_default_is_find_or_create() {
+    // Default params (`params=NULL`) should match upstream's find-or-create:
+    // existing keys are kept untouched; missing keys are inserted.
+    let (_tmp, uri) = create_large_dataset(10);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert!(!ds.is_null());
+
+    let source = make_merge_source(&[(5, 999.0, "rewritten"), (200, 12.5, "new_row")]);
+    let mut stream = batch_to_ffi_stream(source);
+    let on = [c_str("id")];
+    let on_ptrs = cstr_ptrs(&on);
+
+    let mut result = LanceMergeInsertResult::default();
+    let rc = unsafe {
+        lance_dataset_merge_insert(
+            ds,
+            on_ptrs.as_ptr(),
+            1,
+            &mut stream,
+            ptr::null(),
+            &mut result,
+        )
+    };
+    assert_eq!(rc, 0);
+    assert_eq!(result.num_inserted_rows, 1);
+    assert_eq!(result.num_updated_rows, 0);
+    assert_eq!(result.num_deleted_rows, 0);
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 11);
+
+    // id=5 must remain unchanged (DoNothing on match).
+    let batches = scan_all_rows(ds);
+    let mut row5_value = None;
+    for batch in &batches {
+        let ids = batch
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let values = batch
+            .column_by_name("value")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .unwrap();
+        for i in 0..batch.num_rows() {
+            if ids.value(i) == 5 {
+                row5_value = Some(values.value(i));
+            }
+        }
+    }
+    assert_eq!(
+        row5_value,
+        Some(2.5),
+        "id=5 should be unchanged on DoNothing"
+    );
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_merge_insert_upsert_updates_and_inserts() {
+    let (_tmp, uri) = create_large_dataset(10);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+
+    let source = make_merge_source(&[(5, 999.0, "rewritten"), (200, 12.5, "new_row")]);
+    let mut stream = batch_to_ffi_stream(source);
+    let on = [c_str("id")];
+    let on_ptrs = cstr_ptrs(&on);
+
+    let params = merge_params(
+        LanceMergeWhenMatched::UpdateAll,
+        LanceMergeWhenNotMatched::InsertAll,
+        LanceMergeWhenNotMatchedBySource::Keep,
+    );
+    let mut result = LanceMergeInsertResult::default();
+    let rc = unsafe {
+        lance_dataset_merge_insert(ds, on_ptrs.as_ptr(), 1, &mut stream, &params, &mut result)
+    };
+    assert_eq!(rc, 0);
+    assert_eq!(result.num_inserted_rows, 1);
+    assert_eq!(result.num_updated_rows, 1);
+    assert_eq!(result.num_deleted_rows, 0);
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 11);
+
+    // id=5 should now read 999.0 / "rewritten"; id=200 should appear with
+    // the source values; everything else stays as the original generator
+    // produced (`row_<id>`, value = id * 0.5).
+    let batches = scan_all_rows(ds);
+    let mut seen_5 = false;
+    let mut seen_200 = false;
+    for batch in &batches {
+        let ids = batch
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let values = batch
+            .column_by_name("value")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .unwrap();
+        let labels = batch
+            .column_by_name("label")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        for i in 0..batch.num_rows() {
+            match ids.value(i) {
+                5 => {
+                    assert_eq!(values.value(i), 999.0);
+                    assert_eq!(labels.value(i), "rewritten");
+                    seen_5 = true;
+                }
+                200 => {
+                    assert_eq!(values.value(i), 12.5);
+                    assert_eq!(labels.value(i), "new_row");
+                    seen_200 = true;
+                }
+                id => {
+                    assert_eq!(values.value(i), id as f32 * 0.5);
+                    assert_eq!(labels.value(i), format!("row_{id}"));
+                }
+            }
+        }
+    }
+    assert!(seen_5 && seen_200);
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_merge_insert_when_matched_fail_errors_on_match() {
+    let (_tmp, uri) = create_large_dataset(10);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+
+    let source = make_merge_source(&[(5, 0.0, "x")]);
+    let mut stream = batch_to_ffi_stream(source);
+    let on = [c_str("id")];
+    let on_ptrs = cstr_ptrs(&on);
+
+    let params = merge_params(
+        LanceMergeWhenMatched::Fail,
+        LanceMergeWhenNotMatched::InsertAll,
+        LanceMergeWhenNotMatchedBySource::Keep,
+    );
+    let rc = unsafe {
+        lance_dataset_merge_insert(
+            ds,
+            on_ptrs.as_ptr(),
+            1,
+            &mut stream,
+            &params,
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, -1);
+    // Dataset is left unchanged on the error path.
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 10);
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_merge_insert_when_matched_delete_drops_match() {
+    let (_tmp, uri) = create_large_dataset(10);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+
+    // Source has matching id=5 and non-matching id=200. With Delete+DoNothing
+    // the matching row is removed, the non-matching row is dropped.
+    let source = make_merge_source(&[(5, 0.0, "x"), (200, 0.0, "y")]);
+    let mut stream = batch_to_ffi_stream(source);
+    let on = [c_str("id")];
+    let on_ptrs = cstr_ptrs(&on);
+
+    let params = merge_params(
+        LanceMergeWhenMatched::Delete,
+        LanceMergeWhenNotMatched::DoNothing,
+        LanceMergeWhenNotMatchedBySource::Keep,
+    );
+    let mut result = LanceMergeInsertResult::default();
+    let rc = unsafe {
+        lance_dataset_merge_insert(ds, on_ptrs.as_ptr(), 1, &mut stream, &params, &mut result)
+    };
+    assert_eq!(rc, 0);
+    assert_eq!(result.num_inserted_rows, 0);
+    assert_eq!(result.num_deleted_rows, 1);
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 9);
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_merge_insert_update_if_filters_matches() {
+    // UpdateIf only updates matched rows where the filter holds. The source
+    // matches both id=2 and id=8; the filter `target.value > 3` selects only
+    // id=8 (target value 4.0) — id=2's target value 1.0 stays put.
+    let (_tmp, uri) = create_large_dataset(10);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+
+    let source = make_merge_source(&[(2, 100.0, "x"), (8, 100.0, "y")]);
+    let mut stream = batch_to_ffi_stream(source);
+    let on = [c_str("id")];
+    let on_ptrs = cstr_ptrs(&on);
+
+    let expr = c_str("target.value > 3");
+    let params = LanceMergeInsertParams {
+        when_matched: LanceMergeWhenMatched::UpdateIf as i32,
+        when_matched_expr: expr.as_ptr(),
+        when_not_matched: LanceMergeWhenNotMatched::DoNothing as i32,
+        when_not_matched_by_source: LanceMergeWhenNotMatchedBySource::Keep as i32,
+        when_not_matched_by_source_expr: ptr::null(),
+    };
+    let rc = unsafe {
+        lance_dataset_merge_insert(
+            ds,
+            on_ptrs.as_ptr(),
+            1,
+            &mut stream,
+            &params,
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, 0);
+
+    let batches = scan_all_rows(ds);
+    let mut row2_value = None;
+    let mut row8_value = None;
+    for batch in &batches {
+        let ids = batch
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let values = batch
+            .column_by_name("value")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .unwrap();
+        for i in 0..batch.num_rows() {
+            match ids.value(i) {
+                2 => row2_value = Some(values.value(i)),
+                8 => row8_value = Some(values.value(i)),
+                _ => {}
+            }
+        }
+    }
+    assert_eq!(
+        row2_value,
+        Some(1.0),
+        "id=2 should be unchanged (filter false)"
+    );
+    assert_eq!(
+        row8_value,
+        Some(100.0),
+        "id=8 should be updated (filter true)"
+    );
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_merge_insert_when_not_matched_do_nothing_skips_inserts() {
+    let (_tmp, uri) = create_large_dataset(5);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+
+    let source = make_merge_source(&[(100, 0.0, "x"), (200, 0.0, "y")]);
+    let mut stream = batch_to_ffi_stream(source);
+    let on = [c_str("id")];
+    let on_ptrs = cstr_ptrs(&on);
+
+    let params = merge_params(
+        LanceMergeWhenMatched::UpdateAll,
+        LanceMergeWhenNotMatched::DoNothing,
+        LanceMergeWhenNotMatchedBySource::Keep,
+    );
+    let rc = unsafe {
+        lance_dataset_merge_insert(
+            ds,
+            on_ptrs.as_ptr(),
+            1,
+            &mut stream,
+            &params,
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, 0);
+    // Source rows did not match anything; with DoNothing they are discarded.
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 5);
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_merge_insert_when_not_matched_by_source_delete() {
+    // Replace-everything-not-in-source semantics: target rows whose key does
+    // not appear in the source are dropped.
+    let (_tmp, uri) = create_large_dataset(5);
+    let c_uri = c_str(&uri);
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+
+    let source = make_merge_source(&[(2, 0.0, "x"), (3, 0.0, "y")]);
+    let mut stream = batch_to_ffi_stream(source);
+    let on = [c_str("id")];
+    let on_ptrs = cstr_ptrs(&on);
+
+    let params = merge_params(
+        LanceMergeWhenMatched::DoNothing,
+        LanceMergeWhenNotMatched::DoNothing,
+        LanceMergeWhenNotMatchedBySource::Delete,
+    );
+    let rc = unsafe {
+        lance_dataset_merge_insert(
+            ds,
+            on_ptrs.as_ptr(),
+            1,
+            &mut stream,
+            &params,
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, 0);
+    // 5 -> 2 rows remain (ids 2 and 3).
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 2);
+
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_merge_insert_when_not_matched_by_source_delete_if() {
+    // DeleteIf("id < 3"): drop unmatched target rows that satisfy the filter
+    // (ids 0, 1) and keep the rest (ids 3, 4). id=2 is matched by source so
+    // it is preserved regardless of the filter.
+ let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let source = make_merge_source(&[(2, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + + let expr = c_str("id < 3"); + let params = LanceMergeInsertParams { + when_matched: LanceMergeWhenMatched::DoNothing as i32, + when_matched_expr: ptr::null(), + when_not_matched: LanceMergeWhenNotMatched::DoNothing as i32, + when_not_matched_by_source: LanceMergeWhenNotMatchedBySource::DeleteIf as i32, + when_not_matched_by_source_expr: expr.as_ptr(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + // id=0 and id=1 deleted; id=2,3,4 kept. + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_multi_column_keys() { + // Match on (id, label). The source row matches id=3 but with a different + // label, so no target row is matched and the source row is inserted as a + // brand-new row under upsert semantics. 
+ let (_tmp, uri) = create_large_dataset(5); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let source = make_merge_source(&[(3, 99.0, "different")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id"), c_str("label")]; + let on_ptrs = cstr_ptrs(&on); + + let params = merge_params( + LanceMergeWhenMatched::UpdateAll, + LanceMergeWhenNotMatched::InsertAll, + LanceMergeWhenNotMatchedBySource::Keep, + ); + let mut result = LanceMergeInsertResult::default(); + let rc = unsafe { + lance_dataset_merge_insert(ds, on_ptrs.as_ptr(), 2, &mut stream, ¶ms, &mut result) + }; + assert_eq!(rc, 0); + assert_eq!(result.num_inserted_rows, 1); + assert_eq!(result.num_updated_rows, 0); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 6); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_bumps_version() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let v_before = unsafe { lance_dataset_version(ds) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + let v_after = unsafe { lance_dataset_version(ds) }; + assert!(v_after > v_before); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_out_result_optional() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + // Pass NULL out_result — must succeed without writing anything. 
+ let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + + unsafe { lance_dataset_close(ds) }; +} + +// Locks in the documented contract: when the call fails, `out_result` must be +// left unchanged. A future refactor that pre-zeroes the slot before validating +// inputs would silently break this guarantee. +#[test] +fn test_merge_insert_out_result_untouched_on_error() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let sentinel = LanceMergeInsertResult { + num_inserted_rows: 0xDEAD, + num_updated_rows: 0xBEEF, + num_deleted_rows: 0xCAFE, + }; + let mut out = sentinel; + + // num_on_columns = 0 → INVALID_ARGUMENT before any work happens. The + // stream is still consumed (NULL stream is the only check ahead of the + // `from_raw` consume), but the result slot must be untouched. + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let rc = unsafe { + lance_dataset_merge_insert(ds, ptr::null(), 0, &mut stream, ptr::null(), &mut out) + }; + assert_eq!(rc, -1); + assert_eq!(out, sentinel, "out slot must be untouched on error"); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_null_dataset_rejected() { + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let rc = unsafe { + lance_dataset_merge_insert( + ptr::null_mut(), + on_ptrs.as_ptr(), + 1, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); +} + +#[test] +fn test_merge_insert_null_source_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), 
ptr::null(), 0) }; + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + ptr::null_mut(), + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_zero_num_on_columns_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let rc = unsafe { + lance_dataset_merge_insert( + ds, + ptr::null(), + 0, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_null_on_columns_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let rc = unsafe { + lance_dataset_merge_insert( + ds, + ptr::null(), + 1, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_empty_key_entry_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("")]; + let on_ptrs = cstr_ptrs(&on); 
+ let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_null_entry_in_on_columns_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on_ptrs: [*const c_char; 1] = [ptr::null()]; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_unknown_key_column_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("no_such_column")]; + let on_ptrs = cstr_ptrs(&on); + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + // MergeInsertBuilder::try_new returns InvalidInput for an unknown key. 
+ assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_invalid_when_matched_discriminant_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let params = LanceMergeInsertParams { + when_matched: 99, + when_matched_expr: ptr::null(), + when_not_matched: 0, + when_not_matched_by_source: 0, + when_not_matched_by_source_expr: ptr::null(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_invalid_when_not_matched_discriminant_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let params = LanceMergeInsertParams { + when_matched: 0, + when_matched_expr: ptr::null(), + when_not_matched: 99, + when_not_matched_by_source: 0, + when_not_matched_by_source_expr: ptr::null(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn 
test_merge_insert_invalid_when_not_matched_by_source_discriminant_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let params = LanceMergeInsertParams { + when_matched: 0, + when_matched_expr: ptr::null(), + when_not_matched: 0, + when_not_matched_by_source: 99, + when_not_matched_by_source_expr: ptr::null(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_empty_expr_rejected() { + // Empty expression string is rejected at the FFI boundary so callers hit + // a precise error rather than an opaque parser failure later on. 
+ let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let empty = c_str(""); + let params = LanceMergeInsertParams { + when_matched: LanceMergeWhenMatched::UpdateIf as i32, + when_matched_expr: empty.as_ptr(), + when_not_matched: 0, + when_not_matched_by_source: 0, + when_not_matched_by_source_expr: ptr::null(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_update_if_missing_expr_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let params = LanceMergeInsertParams { + when_matched: LanceMergeWhenMatched::UpdateIf as i32, + when_matched_expr: ptr::null(), + when_not_matched: 0, + when_not_matched_by_source: 0, + when_not_matched_by_source_expr: ptr::null(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_unused_expr_for_update_all_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let 
mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let expr = c_str("id > 0"); + let params = LanceMergeInsertParams { + when_matched: LanceMergeWhenMatched::UpdateAll as i32, + when_matched_expr: expr.as_ptr(), + when_not_matched: 0, + when_not_matched_by_source: 0, + when_not_matched_by_source_expr: ptr::null(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_unused_expr_for_keep_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let expr = c_str("id > 0"); + let params = LanceMergeInsertParams { + when_matched: 0, + when_matched_expr: ptr::null(), + when_not_matched: 0, + when_not_matched_by_source: LanceMergeWhenNotMatchedBySource::Keep as i32, + when_not_matched_by_source_expr: expr.as_ptr(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_delete_if_missing_expr_rejected() { + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let params = LanceMergeInsertParams { + when_matched: 0, + 
when_matched_expr: ptr::null(), + when_not_matched: 0, + when_not_matched_by_source: LanceMergeWhenNotMatchedBySource::DeleteIf as i32, + when_not_matched_by_source_expr: ptr::null(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_no_op_config_rejected() { + // DoNothing + DoNothing + Keep is a configuration that mutates nothing; + // upstream's `try_build` rejects it as InvalidInput. + let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(100, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let params = merge_params( + LanceMergeWhenMatched::DoNothing, + LanceMergeWhenNotMatched::DoNothing, + LanceMergeWhenNotMatchedBySource::Keep, + ); + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_schema_mismatch_rejected() { + // Source `value` column is Float64 instead of Float32, so upstream's + // schema-compatibility check rejects the merge before any commit lands. 
+ let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let bad_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Float64, true), + ])); + let bad_batch = RecordBatch::try_new( + bad_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![100])), + Arc::new(arrow_array::Float64Array::from(vec![1.0])), + ], + ) + .unwrap(); + let reader = arrow::record_batch::RecordBatchIterator::new(vec![Ok(bad_batch)], bad_schema); + let mut stream = FFI_ArrowArrayStream::new(Box::new(reader)); + + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + let params = merge_params( + LanceMergeWhenMatched::UpdateAll, + LanceMergeWhenNotMatched::InsertAll, + LanceMergeWhenNotMatchedBySource::Keep, + ); + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + ¶ms, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + // The dataset should not be corrupted by the rejected merge. + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_merge_insert_unknown_predicate_column_in_delete_if_rejected() { + // DeleteIf parses against the dataset schema at FFI time; an unknown + // column surfaces as InvalidArgument. 
+ let (_tmp, uri) = create_large_dataset(3); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let source = make_merge_source(&[(2, 0.0, "x")]); + let mut stream = batch_to_ffi_stream(source); + let on = [c_str("id")]; + let on_ptrs = cstr_ptrs(&on); + + let expr = c_str("no_such_column = 1"); + let params = LanceMergeInsertParams { + when_matched: 0, + when_matched_expr: ptr::null(), + when_not_matched: 0, + when_not_matched_by_source: LanceMergeWhenNotMatchedBySource::DeleteIf as i32, + when_not_matched_by_source_expr: expr.as_ptr(), + }; + let rc = unsafe { + lance_dataset_merge_insert( + ds, + on_ptrs.as_ptr(), + 1, + &mut stream, + &params, + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} diff --git a/tests/cpp/test_c_api.c b/tests/cpp/test_c_api.c index 91f1f92..ae15c80 100644 --- a/tests/cpp/test_c_api.c +++ b/tests/cpp/test_c_api.c @@ -300,6 +300,61 @@ static void test_update(const char *write_uri) { printf("updated=%llu... OK\n", (unsigned long long)updated); } +/* Re-opens the dataset just written by `test_dataset_write_roundtrip` and + * exercises `lance_dataset_merge_insert`. Must run before `test_delete`, + * which empties the dataset. The source comes from scanning the dataset + * itself, so under find-or-create defaults every row is a self-match + * (DoNothing) and nothing changes — this validates the FFI plumbing without + * needing to hand-build an Arrow batch in pure C. */ +static void test_merge_insert(const char *write_uri) { + printf(" test_merge_insert... "); + + LanceDataset *ds = lance_dataset_open(write_uri, NULL, 0); + ASSERT(ds != NULL, "open failed"); + uint64_t before = lance_dataset_count_rows(ds); + CHECK_OK(); + ASSERT(before > 0, "fixture expected to have rows"); + + /* Build a self-source via the scanner.
*/ + LanceScanner *scanner = lance_scanner_new(ds, NULL, NULL); + ASSERT(scanner != NULL, "scanner creation failed"); + + struct ArrowArrayStream stream; + memset(&stream, 0, sizeof(stream)); + int32_t rc = lance_scanner_to_arrow_stream(scanner, &stream); + ASSERT(rc == 0, "to_arrow_stream failed"); + + const char *on_cols[] = {"id"}; + LanceMergeInsertResult result; + memset(&result, 0, sizeof(result)); + rc = lance_dataset_merge_insert(ds, on_cols, 1, &stream, NULL, &result); + ASSERT(rc == 0, "merge_insert failed"); + /* Self-match under DoNothing: nothing inserted, nothing updated. */ + ASSERT(result.num_inserted_rows == 0, "expected 0 inserts"); + ASSERT(result.num_updated_rows == 0, "expected 0 updates"); + ASSERT(lance_dataset_count_rows(ds) == before, "row count must be unchanged"); + + /* num_on_columns == 0 must be rejected. Feed a live stream so the + * NULL-source check cannot satisfy the assertion on its own; release it + * afterwards in case the failing call did not consume it. */ + LanceScanner *scanner2 = lance_scanner_new(ds, NULL, NULL); + ASSERT(scanner2 != NULL, "second scanner creation failed"); + struct ArrowArrayStream stream2; + memset(&stream2, 0, sizeof(stream2)); + rc = lance_scanner_to_arrow_stream(scanner2, &stream2); + ASSERT(rc == 0, "second to_arrow_stream failed"); + rc = lance_dataset_merge_insert(ds, NULL, 0, &stream2, NULL, NULL); + ASSERT(rc == -1, "num_on_columns=0 must fail"); + ASSERT(lance_last_error_code() == LANCE_ERR_INVALID_ARGUMENT, + "expected INVALID_ARGUMENT"); + if (stream2.release != NULL) stream2.release(&stream2); + + lance_scanner_close(scanner2); + lance_scanner_close(scanner); + lance_dataset_close(ds); + printf("OK\n"); +} + /* Re-opens the dataset just written by `test_dataset_write_roundtrip` and * exercises `lance_dataset_delete`. Must run after the write roundtrip. */ static void test_delete(const char *write_uri) { @@ -347,6 +402,7 @@ int main(int argc, char **argv) { test_error_handling(); test_dataset_write_roundtrip(uri, write_uri); test_update(write_uri); + test_merge_insert(write_uri); test_delete(write_uri); printf("All C tests passed!\n"); diff --git a/tests/cpp/test_cpp_api.cpp b/tests/cpp/test_cpp_api.cpp index 5c8dcd6..0ca515d 100644 --- a/tests/cpp/test_cpp_api.cpp +++ b/tests/cpp/test_cpp_api.cpp @@ -331,6 +331,49 @@ static void test_update(const std::string& dst_uri) { PASS(); } +// Re-opens the dataset just written by `test_dataset_write_roundtrip` and +// exercises `Dataset::merge_insert`.
Must run before `test_delete_rows`, +// which empties the dataset. +static void test_merge_insert(const std::string& dst_uri) { + TEST(test_merge_insert); + + auto ds = lance::Dataset::open(dst_uri); + uint64_t before = ds.count_rows(); + assert(before > 0 && "test fixture expected to have rows"); + + // Self-merge: scan the dataset itself and use that as the source. With + // find-or-create defaults every row is a self-match and DoNothing fires, + // so insert/update counts stay at zero and the row count is preserved. + auto scanner = ds.scan(); + ArrowArrayStream stream; + memset(&stream, 0, sizeof(stream)); + scanner.to_arrow_stream(&stream); + + auto result = ds.merge_insert({"id"}, &stream); + assert(result.num_inserted_rows == 0); + assert(result.num_updated_rows == 0); + assert(ds.count_rows() == before); + + // Empty key vector must throw (num_on_columns == 0). Supply a live stream + // so the null-source check cannot satisfy the assertion on its own; release + // it afterwards in case the failing call did not consume it. + auto scanner2 = ds.scan(); + ArrowArrayStream stream2; + memset(&stream2, 0, sizeof(stream2)); + scanner2.to_arrow_stream(&stream2); + bool caught_empty = false; + try { + ds.merge_insert({}, &stream2); + } catch (const lance::Error& e) { + caught_empty = true; + assert(e.code == LANCE_ERR_INVALID_ARGUMENT); + } + assert(caught_empty); + if (stream2.release != nullptr) stream2.release(&stream2); + + PASS(); +} + // Re-opens the dataset just written by `test_dataset_write_roundtrip` and // exercises `Dataset::delete_rows`. Must run after the write roundtrip. static void test_delete_rows(const std::string& dst_uri) { @@ -382,6 +425,7 @@ int main(int argc, char** argv) { test_fts_smoke(uri); test_dataset_write_roundtrip(uri, write_uri); test_update(write_uri); + test_merge_insert(write_uri); test_delete_rows(write_uri); printf("All C++ tests passed!\n");