diff --git a/src/duckdb/extension/json/json_functions/read_json.cpp b/src/duckdb/extension/json/json_functions/read_json.cpp index f5e9bd405..65209f1f4 100644 --- a/src/duckdb/extension/json/json_functions/read_json.cpp +++ b/src/duckdb/extension/json/json_functions/read_json.cpp @@ -108,13 +108,13 @@ class JSONSchemaTask : public BaseExecutorTask { JSONStructure::ExtractStructure(val, node, true); } } + remaining -= next; if (!node.ContainsVarchar()) { // Can't refine non-VARCHAR types continue; } node.InitializeCandidateTypes(options.max_depth, options.convert_strings_to_integers); node.RefineCandidateTypes(scan_state.values, next, string_vector, allocator, auto_detect_state.date_format_map); - remaining -= next; } auto_detect_state.total_file_size += file_size; auto_detect_state.bytes_scanned += total_read_size; diff --git a/src/duckdb/extension/parquet/column_reader.cpp b/src/duckdb/extension/parquet/column_reader.cpp index a6cdd4a6e..8fa5bfb06 100644 --- a/src/duckdb/extension/parquet/column_reader.cpp +++ b/src/duckdb/extension/parquet/column_reader.cpp @@ -109,7 +109,7 @@ const uint64_t ParquetDecodeUtils::BITPACK_MASKS_SIZE = sizeof(ParquetDecodeUtil const uint8_t ParquetDecodeUtils::BITPACK_DLEN = 8; -ColumnReader::ColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema_p) +ColumnReader::ColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema_p) : column_schema(schema_p), reader(reader), page_rows_available(0), dictionary_decoder(*this), delta_binary_packed_decoder(*this), rle_decoder(*this), delta_length_byte_array_decoder(*this), delta_byte_array_decoder(*this), byte_stream_split_decoder(*this), aad_crypto_metadata(reader.allocator) { @@ -122,7 +122,7 @@ Allocator &ColumnReader::GetAllocator() { return reader.allocator; } -ParquetReader &ColumnReader::Reader() { +const ParquetReader &ColumnReader::Reader() { return reader; } @@ -825,7 +825,7 @@ void ColumnReader::ApplyPendingSkips(data_ptr_t define_out, data_ptr_t repeat_ou // Create Column Reader //===--------------------------------------------------------------------===// template -static unique_ptr CreateDecimalReader(ParquetReader &reader, const ParquetColumnSchema &schema) { +static unique_ptr CreateDecimalReader(const ParquetReader &reader, const ParquetColumnSchema &schema) { switch (schema.type.InternalType()) { case PhysicalType::INT16: return make_uniq>>(reader, schema); @@ -840,7 +840,7 @@ static unique_ptr CreateDecimalReader(ParquetReader &reader, const } } -unique_ptr ColumnReader::CreateReader(ParquetReader &reader, const ParquetColumnSchema &schema) { +unique_ptr ColumnReader::CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema) { switch (schema.type.id()) { case LogicalTypeId::BOOLEAN: return make_uniq(reader, schema); diff --git a/src/duckdb/extension/parquet/include/column_reader.hpp b/src/duckdb/extension/parquet/include/column_reader.hpp index a5d9dab05..a29f521d6 100644 --- a/src/duckdb/extension/parquet/include/column_reader.hpp +++ b/src/duckdb/extension/parquet/include/column_reader.hpp @@ -62,11 +62,11 @@ class ColumnReader { friend class RLEDecoder; public: - ColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema_p); + ColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema_p); virtual ~ColumnReader(); public: - static unique_ptr CreateReader(ParquetReader &reader, const ParquetColumnSchema &schema); + static unique_ptr CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema); virtual void InitializeRead(idx_t row_group_index, const vector &columns, TProtocol &protocol_p); virtual idx_t Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result_out); virtual void Select(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result_out, @@ -78,7 +78,7 @@ class ColumnReader { SelectionVector &sel, idx_t &approved_tuple_count); virtual void Skip(idx_t num_values); - ParquetReader &Reader(); + const ParquetReader &Reader(); const LogicalType &Type() const { return column_schema.type; } @@ -303,7 +303,7 @@ class ColumnReader { protected: const ParquetColumnSchema &column_schema; - ParquetReader &reader; + const ParquetReader &reader; idx_t pending_skips = 0; bool page_is_filtered_out = false; diff --git a/src/duckdb/extension/parquet/include/parquet_decimal_utils.hpp b/src/duckdb/extension/parquet/include/parquet_decimal_utils.hpp index de042c302..d6fa8986e 100644 --- a/src/duckdb/extension/parquet/include/parquet_decimal_utils.hpp +++ b/src/duckdb/extension/parquet/include/parquet_decimal_utils.hpp @@ -46,7 +46,7 @@ class ParquetDecimalUtils { return res; } - static unique_ptr CreateReader(ParquetReader &reader, const ParquetColumnSchema &schema); + static unique_ptr CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema); }; template <> diff --git a/src/duckdb/extension/parquet/include/parquet_geometry.hpp b/src/duckdb/extension/parquet/include/parquet_geometry.hpp index 09db147eb..67ba2f77e 100644 --- a/src/duckdb/extension/parquet/include/parquet_geometry.hpp +++ b/src/duckdb/extension/parquet/include/parquet_geometry.hpp @@ -23,7 +23,7 @@ class ColumnReader; class ClientContext; struct GeometryColumnReader { - static unique_ptr Create(ParquetReader &reader, const ParquetColumnSchema &schema, + static unique_ptr Create(const ParquetReader &reader, const ParquetColumnSchema &schema, ClientContext &context); }; diff --git a/src/duckdb/extension/parquet/include/parquet_reader.hpp b/src/duckdb/extension/parquet/include/parquet_reader.hpp index aca82489b..1726d32a8 100644 --- a/src/duckdb/extension/parquet/include/parquet_reader.hpp +++ b/src/duckdb/extension/parquet/include/parquet_reader.hpp @@ -148,7 +148,7 @@ class ParquetReader : public BaseFileReader { shared_ptr metadata = nullptr); ~ParquetReader() override; - CachingFileSystem fs; + mutable CachingFileSystem fs; Allocator &allocator; shared_ptr metadata; ParquetOptions parquet_options; @@ -167,13 +167,15 @@ class ParquetReader : public BaseFileReader { bool TryInitializeScan(ClientContext &context, GlobalTableFunctionState &gstate, LocalTableFunctionState &lstate) override; + void PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p, + LocalTableFunctionState &lstate_p) override; AsyncResult Scan(ClientContext &context, GlobalTableFunctionState &global_state, LocalTableFunctionState &local_state, DataChunk &chunk) override; void FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) override; double GetProgressInFile(ClientContext &context) override; public: - void InitializeScan(ClientContext &context, ParquetReaderScanState &state, vector groups_to_read); + void InitializeScan(ClientContext &context, ParquetReaderScanState &state, vector groups_to_read) const; AsyncResult Scan(ClientContext &context, ParquetReaderScanState &state, DataChunk &output); idx_t NumRows() const; @@ -184,11 +186,11 @@ class ParquetReader : public BaseFileReader { const duckdb_parquet::FileMetaData *GetFileMetadata() const; string static GetUniqueFileIdentifier(const duckdb_parquet::EncryptionAlgorithm &encryption_algorithm); - uint32_t Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot); + uint32_t Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) const; uint32_t ReadEncrypted(duckdb_apache::thrift::TBase &object, TProtocol &iprot, CryptoMetaData &aad_crypto_metadata) const; uint32_t ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer, - const uint32_t buffer_size); + const uint32_t buffer_size) const; uint32_t ReadDataEncrypted(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer, const uint32_t buffer_size, CryptoMetaData &aad_crypto_metadata) const; @@ -226,10 +228,9 @@ class ParquetReader : public BaseFileReader { ParquetColumnSchema ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat, idx_t &next_schema_idx, idx_t &next_file_idx, ClientContext &context); - unique_ptr CreateReader(ClientContext &context); - + unique_ptr CreateReader(ClientContext &context) const; unique_ptr CreateReaderRecursive(ClientContext &context, const vector &indexes, - const ParquetColumnSchema &schema); + const ParquetColumnSchema &schema) const; const duckdb_parquet::RowGroup &GetGroup(ParquetReaderScanState &state); uint64_t GetGroupCompressedSize(ParquetReaderScanState &state); idx_t GetGroupOffset(ParquetReaderScanState &state); diff --git a/src/duckdb/extension/parquet/include/reader/boolean_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/boolean_column_reader.hpp index 17ad2065c..04dfbca93 100644 --- a/src/duckdb/extension/parquet/include/reader/boolean_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/boolean_column_reader.hpp @@ -20,7 +20,7 @@ class BooleanColumnReader : public TemplatedColumnReader(reader, schema), byte_pos(0) { } diff --git a/src/duckdb/extension/parquet/include/reader/callback_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/callback_column_reader.hpp index 232deaf78..eb6a98aab 100644 --- a/src/duckdb/extension/parquet/include/reader/callback_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/callback_column_reader.hpp @@ -27,7 +27,7 @@ class CallbackColumnReader static constexpr const PhysicalType TYPE = PhysicalType::INVALID; public: - CallbackColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) + CallbackColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : TemplatedColumnReader>( reader, schema) { diff --git a/src/duckdb/extension/parquet/include/reader/decimal_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/decimal_column_reader.hpp index e046aafc4..8a5275e3e 100644 --- a/src/duckdb/extension/parquet/include/reader/decimal_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/decimal_column_reader.hpp @@ -56,7 +56,7 @@ class DecimalColumnReader TemplatedColumnReader>; public: - DecimalColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) + DecimalColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : TemplatedColumnReader>(reader, schema) { } diff --git a/src/duckdb/extension/parquet/include/reader/interval_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/interval_column_reader.hpp index 0f93bf9d5..90c32d37f 100644 --- a/src/duckdb/extension/parquet/include/reader/interval_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/interval_column_reader.hpp @@ -58,7 +58,7 @@ struct IntervalValueConversion { class IntervalColumnReader : public TemplatedColumnReader { public: - IntervalColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) + IntervalColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : TemplatedColumnReader(reader, schema) { } }; diff --git a/src/duckdb/extension/parquet/include/reader/list_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/list_column_reader.hpp index 5a20f59e1..32bee1aaa 100644 --- a/src/duckdb/extension/parquet/include/reader/list_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/list_column_reader.hpp @@ -18,7 +18,7 @@ class ListColumnReader : public ColumnReader { static constexpr const PhysicalType TYPE = PhysicalType::LIST; public: - ListColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema, + ListColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema, unique_ptr child_column_reader_p); idx_t Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result_out) override; diff --git a/src/duckdb/extension/parquet/include/reader/null_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/null_column_reader.hpp index 627c6ac55..374170572 100644 --- a/src/duckdb/extension/parquet/include/reader/null_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/null_column_reader.hpp @@ -18,7 +18,7 @@ class NullColumnReader : public ColumnReader { static constexpr const PhysicalType TYPE = PhysicalType::INVALID; public: - NullColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema) {}; + NullColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema) {}; shared_ptr dict; diff --git a/src/duckdb/extension/parquet/include/reader/row_number_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/row_number_column_reader.hpp index fcbe4bdf4..ff9081c17 100644 --- a/src/duckdb/extension/parquet/include/reader/row_number_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/row_number_column_reader.hpp @@ -20,7 +20,7 @@ class RowNumberColumnReader : public ColumnReader { static constexpr const PhysicalType TYPE = PhysicalType::INT64; public: - RowNumberColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema); + RowNumberColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema); public: idx_t Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result) override; diff --git a/src/duckdb/extension/parquet/include/reader/string_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/string_column_reader.hpp index e837ce092..31e2139a2 100644 --- a/src/duckdb/extension/parquet/include/reader/string_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/string_column_reader.hpp @@ -31,7 +31,7 @@ class StringColumnReader : public ColumnReader { static constexpr const PhysicalType TYPE = PhysicalType::VARCHAR; public: - StringColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema); + StringColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema); idx_t fixed_width_string_length; const StringColumnType string_column_type; diff --git a/src/duckdb/extension/parquet/include/reader/struct_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/struct_column_reader.hpp index 3cf41e020..0703fd1a7 100644 --- a/src/duckdb/extension/parquet/include/reader/struct_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/struct_column_reader.hpp @@ -18,7 +18,7 @@ class StructColumnReader : public ColumnReader { static constexpr const PhysicalType TYPE = PhysicalType::STRUCT; public: - StructColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema, + StructColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema, vector> child_readers_p); vector> child_readers; diff --git a/src/duckdb/extension/parquet/include/reader/templated_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/templated_column_reader.hpp index b6bd55cc7..26f319e11 100644 --- a/src/duckdb/extension/parquet/include/reader/templated_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/templated_column_reader.hpp @@ -48,7 +48,8 @@ class TemplatedColumnReader : public ColumnReader { static constexpr const PhysicalType TYPE = PhysicalType::INVALID; public: - TemplatedColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema) { + TemplatedColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) + : ColumnReader(reader, schema) { } shared_ptr dict; diff --git a/src/duckdb/extension/parquet/include/reader/uuid_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/uuid_column_reader.hpp index 22d468d0f..0926e87e6 100644 --- a/src/duckdb/extension/parquet/include/reader/uuid_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/uuid_column_reader.hpp @@ -51,7 +51,7 @@ struct UUIDValueConversion { class UUIDColumnReader : public TemplatedColumnReader { public: - UUIDColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) + UUIDColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : TemplatedColumnReader(reader, schema) { } }; diff --git a/src/duckdb/extension/parquet/include/reader/variant_column_reader.hpp b/src/duckdb/extension/parquet/include/reader/variant_column_reader.hpp index ad3e8533d..366b112d7 100644 --- a/src/duckdb/extension/parquet/include/reader/variant_column_reader.hpp +++ b/src/duckdb/extension/parquet/include/reader/variant_column_reader.hpp @@ -18,7 +18,7 @@ class VariantColumnReader : public ColumnReader { static constexpr const PhysicalType TYPE = PhysicalType::STRUCT; public: - VariantColumnReader(ClientContext &context, ParquetReader &reader, const ParquetColumnSchema &schema, + VariantColumnReader(ClientContext &context, const ParquetReader &reader, const ParquetColumnSchema &schema, vector> child_readers_p); ClientContext &context; diff --git a/src/duckdb/extension/parquet/parquet_geometry.cpp b/src/duckdb/extension/parquet/parquet_geometry.cpp index a3726a7ca..26055f730 100644 --- a/src/duckdb/extension/parquet/parquet_geometry.cpp +++ b/src/duckdb/extension/parquet/parquet_geometry.cpp @@ -113,7 +113,7 @@ unique_ptr GeoParquetFileMetadata::TryRead(const duckdb_ // Parse the CRS const auto crs_val = yyjson_obj_get(column_val, "crs"); - if (crs_val) { + if (crs_val && !yyjson_is_null(crs_val)) { // Parse the CRS if (!yyjson_is_obj(crs_val)) { throw InvalidInputException("Geoparquet column '%s' has invalid CRS", column_name); @@ -126,8 +126,10 @@ unique_ptr GeoParquetFileMetadata::TryRead(const duckdb_ // Free the temporary CRS JSON string free(crs_json); + } else if (crs_val && yyjson_is_null(crs_val)) { + // If CRS is null, do nothing } else { - // Otherwise, default to OGC:CRS84 + // Otherwise, if no CRS, default to OGC:CRS84 auto crs = CoordinateReferenceSystem::TryConvert(context, "OGC:CRS84", CoordinateReferenceSystemType::PROJJSON); if (crs) { @@ -363,7 +365,7 @@ optional_ptr GeoParquetFileMetadata::GetColumnMe return &it->second; } -unique_ptr GeometryColumnReader::Create(ParquetReader &reader, const ParquetColumnSchema &schema, +unique_ptr GeometryColumnReader::Create(const ParquetReader &reader, const ParquetColumnSchema &schema, ClientContext &context) { D_ASSERT(schema.type.id() == LogicalTypeId::GEOMETRY); D_ASSERT(schema.children.size() == 1 && schema.children[0].type.id() == LogicalTypeId::BLOB); diff --git a/src/duckdb/extension/parquet/parquet_multi_file_info.cpp b/src/duckdb/extension/parquet/parquet_multi_file_info.cpp index 4493c8842..e47befe2a 100644 --- a/src/duckdb/extension/parquet/parquet_multi_file_info.cpp +++ b/src/duckdb/extension/parquet/parquet_multi_file_info.cpp @@ -65,6 +65,7 @@ struct ParquetReadGlobalState : public GlobalTableFunctionState { struct ParquetReadLocalState : public LocalTableFunctionState { ParquetReaderScanState scan_state; + vector group_indexes; }; static void ParseFileRowNumberOption(MultiFileReaderBindData &bind_data, ParquetOptions &options, @@ -341,7 +342,7 @@ static vector ParquetGetPartitionStats(ClientContext &conte } auto &parquet_data = bind_data.bind_data->Cast(); auto &cached_metadata = parquet_data.TryLoadCaches(bind_data, context); - if (!cached_metadata.empty()) { + if (cached_metadata.empty()) { // no cached metadata - bail return result; } @@ -684,12 +685,17 @@ bool ParquetReader::TryInitializeScan(ClientContext &context, GlobalTableFunctio return false; } // The current reader has rowgroups left to be scanned - vector group_indexes {gstate.row_group_index}; - InitializeScan(context, lstate.scan_state, group_indexes); + lstate.group_indexes = {gstate.row_group_index}; gstate.row_group_index++; return true; } +void ParquetReader::PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p, + LocalTableFunctionState &lstate_p) { + auto &lstate = lstate_p.Cast(); + InitializeScan(context, lstate.scan_state, lstate.group_indexes); +} + void ParquetReader::FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) { auto &gstate = gstate_p.Cast(); gstate.row_group_index = 0; @@ -705,7 +711,6 @@ AsyncResult ParquetReader::Scan(ClientContext &context, GlobalTableFunctionState } } #endif - auto &gstate = gstate_p.Cast(); auto &local_state = local_state_p.Cast(); local_state.scan_state.op = gstate.op; diff --git a/src/duckdb/extension/parquet/parquet_reader.cpp b/src/duckdb/extension/parquet/parquet_reader.cpp index 33b99a167..97f419c87 100644 --- a/src/duckdb/extension/parquet/parquet_reader.cpp +++ b/src/duckdb/extension/parquet/parquet_reader.cpp @@ -408,7 +408,7 @@ ParquetColumnSchema ParquetReader::ParseColumnSchema(const SchemaElement &s_ele, unique_ptr ParquetReader::CreateReaderRecursive(ClientContext &context, const vector &indexes, - const ParquetColumnSchema &schema) { + const ParquetColumnSchema &schema) const { switch (schema.schema_type) { case ParquetColumnSchemaType::FILE_ROW_NUMBER: return make_uniq(*this, schema); @@ -460,7 +460,7 @@ unique_ptr ParquetReader::CreateReaderRecursive(ClientContext &con } } -unique_ptr ParquetReader::CreateReader(ClientContext &context) { +unique_ptr ParquetReader::CreateReader(ClientContext &context) const { auto ret = CreateReaderRecursive(context, column_indexes, *root_schema); if (ret->Type().id() != LogicalTypeId::STRUCT) { throw InternalException("Root element of Parquet file must be a struct"); @@ -974,7 +974,7 @@ string ParquetReader::GetUniqueFileIdentifier(const duckdb_parquet::EncryptionAl } } -uint32_t ParquetReader::Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) { +uint32_t ParquetReader::Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) const { return object.read(&iprot); } @@ -986,7 +986,7 @@ uint32_t ParquetReader::ReadEncrypted(duckdb_apache::thrift::TBase &object, TPro } uint32_t ParquetReader::ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer, - const uint32_t buffer_size) { + const uint32_t buffer_size) const { return iprot.getTransport()->read(buffer, buffer_size); } @@ -997,7 +997,7 @@ uint32_t ParquetReader::ReadDataEncrypted(duckdb_apache::thrift::protocol::TProt *encryption_util, aad_crypto_metadata); } -static idx_t GetRowGroupOffset(ParquetReader &reader, idx_t group_idx) { +static idx_t GetRowGroupOffset(const ParquetReader &reader, idx_t group_idx) { idx_t row_group_offset = 0; auto &row_groups = reader.GetFileMetadata()->row_groups; for (idx_t i = 0; i < group_idx; i++) { @@ -1232,7 +1232,7 @@ ParquetScanFilter::~ParquetScanFilter() { } void ParquetReader::InitializeScan(ClientContext &context, ParquetReaderScanState &state, - vector groups_to_read) { + vector groups_to_read) const { state.current_group = -1; state.finished = false; state.offset_in_group = 0; diff --git a/src/duckdb/extension/parquet/reader/decimal_column_reader.cpp b/src/duckdb/extension/parquet/reader/decimal_column_reader.cpp index d145d027a..253b9238f 100644 --- a/src/duckdb/extension/parquet/reader/decimal_column_reader.cpp +++ b/src/duckdb/extension/parquet/reader/decimal_column_reader.cpp @@ -3,7 +3,8 @@ namespace duckdb { template -static unique_ptr CreateDecimalReaderInternal(ParquetReader &reader, const ParquetColumnSchema &schema) { +static unique_ptr CreateDecimalReaderInternal(const ParquetReader &reader, + const ParquetColumnSchema &schema) { switch (schema.type.InternalType()) { case PhysicalType::INT16: return make_uniq>(reader, schema); @@ -45,7 +46,8 @@ double ParquetDecimalUtils::ReadDecimalValue(const_data_ptr_t pointer, idx_t siz return res; } -unique_ptr ParquetDecimalUtils::CreateReader(ParquetReader &reader, const ParquetColumnSchema &schema) { +unique_ptr ParquetDecimalUtils::CreateReader(const ParquetReader &reader, + const ParquetColumnSchema &schema) { if (schema.parquet_type == Type::FIXED_LEN_BYTE_ARRAY) { return CreateDecimalReaderInternal(reader, schema); } else { diff --git a/src/duckdb/extension/parquet/reader/list_column_reader.cpp b/src/duckdb/extension/parquet/reader/list_column_reader.cpp index b291e1019..8937ed777 100644 --- a/src/duckdb/extension/parquet/reader/list_column_reader.cpp +++ b/src/duckdb/extension/parquet/reader/list_column_reader.cpp @@ -171,7 +171,7 @@ idx_t ListColumnReader::Read(uint64_t num_values, data_ptr_t define_out, data_pt return ReadInternal(num_values, define_out, repeat_out, result_out); } -ListColumnReader::ListColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema, +ListColumnReader::ListColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema, unique_ptr child_column_reader_p) : ColumnReader(reader, schema), child_column_reader(std::move(child_column_reader_p)), read_cache(reader.allocator, ListType::GetChildType(Type())), read_vector(read_cache), overflow_child_count(0) { diff --git a/src/duckdb/extension/parquet/reader/row_number_column_reader.cpp b/src/duckdb/extension/parquet/reader/row_number_column_reader.cpp index b4cad1373..160aee3d8 100644 --- a/src/duckdb/extension/parquet/reader/row_number_column_reader.cpp +++ b/src/duckdb/extension/parquet/reader/row_number_column_reader.cpp @@ -7,7 +7,7 @@ namespace duckdb { //===--------------------------------------------------------------------===// // Row NumberColumn Reader //===--------------------------------------------------------------------===// -RowNumberColumnReader::RowNumberColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) +RowNumberColumnReader::RowNumberColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema) { } diff --git a/src/duckdb/extension/parquet/reader/string_column_reader.cpp b/src/duckdb/extension/parquet/reader/string_column_reader.cpp index 1997e80bc..1c7d1a283 100644 --- a/src/duckdb/extension/parquet/reader/string_column_reader.cpp +++ b/src/duckdb/extension/parquet/reader/string_column_reader.cpp @@ -8,7 +8,7 @@ namespace duckdb { //===--------------------------------------------------------------------===// // String Column Reader //===--------------------------------------------------------------------===// -StringColumnReader::StringColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema) +StringColumnReader::StringColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema) : ColumnReader(reader, schema), string_column_type(GetStringColumnType(Type())) { fixed_width_string_length = 0; if (schema.parquet_type == Type::FIXED_LEN_BYTE_ARRAY) { diff --git a/src/duckdb/extension/parquet/reader/struct_column_reader.cpp b/src/duckdb/extension/parquet/reader/struct_column_reader.cpp index 996c356c9..bfc1b3ff0 100644 --- a/src/duckdb/extension/parquet/reader/struct_column_reader.cpp +++ b/src/duckdb/extension/parquet/reader/struct_column_reader.cpp @@ -5,7 +5,7 @@ namespace duckdb { //===--------------------------------------------------------------------===// // Struct Column Reader //===--------------------------------------------------------------------===// -StructColumnReader::StructColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema, +StructColumnReader::StructColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema, vector> child_readers_p) : ColumnReader(reader, schema), child_readers(std::move(child_readers_p)) { D_ASSERT(Type().InternalType() == PhysicalType::STRUCT); diff --git a/src/duckdb/extension/parquet/reader/variant_column_reader.cpp b/src/duckdb/extension/parquet/reader/variant_column_reader.cpp index c9d6b9798..1b0c93e9e 100644 --- a/src/duckdb/extension/parquet/reader/variant_column_reader.cpp +++ b/src/duckdb/extension/parquet/reader/variant_column_reader.cpp @@ -7,7 +7,7 @@ namespace duckdb { //===--------------------------------------------------------------------===// // Variant Column Reader //===--------------------------------------------------------------------===// -VariantColumnReader::VariantColumnReader(ClientContext &context, ParquetReader &reader, +VariantColumnReader::VariantColumnReader(ClientContext &context, const ParquetReader &reader, const ParquetColumnSchema &schema, vector> child_readers_p) : ColumnReader(reader, schema), context(context), child_readers(std::move(child_readers_p)) { diff --git a/src/duckdb/src/common/adbc/adbc.cpp b/src/duckdb/src/common/adbc/adbc.cpp index 338529c58..9196ba4cf 100644 --- a/src/duckdb/src/common/adbc/adbc.cpp +++ b/src/duckdb/src/common/adbc/adbc.cpp @@ -112,6 +112,7 @@ enum class IngestionMode { CREATE = 0, APPEND = 1, REPLACE = 2, CREATE_APPEND = struct DuckDBAdbcStatementWrapper { duckdb_connection connection; + duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper; duckdb_prepared_statement statement; char *ingestion_table_name; char *target_catalog; @@ -122,13 +123,23 @@ struct DuckDBAdbcStatementWrapper { uint64_t plan_length; }; +struct MaterializedData { + ArrowArray *batches; + idx_t count; + idx_t current; +}; + struct DuckDBAdbcStreamWrapper { duckdb_result result; char *last_error; AdbcStatusCode status_code; AdbcError adbc_error; + MaterializedData *materialized; + duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper; }; +static void MaterializeActiveStreams(duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper); + static bool IsInterruptError(const char *message) { if (!message) { return false; @@ -953,6 +964,15 @@ AdbcStatusCode ConnectionInit(struct AdbcConnection *connection, struct AdbcData AdbcStatusCode ConnectionRelease(struct AdbcConnection *connection, struct AdbcError *error) { if (connection && connection->private_data) { auto conn_wrapper = static_cast(connection->private_data); + // Materialize active streams before disconnecting so they remain readable + MaterializeActiveStreams(conn_wrapper); + // Detach active streams before deleting conn_wrapper to avoid dangling pointers + for (auto *stream_wrapper : conn_wrapper->active_streams) { + if (stream_wrapper) { + stream_wrapper->conn_wrapper = nullptr; + } + } + conn_wrapper->active_streams.clear(); auto conn = reinterpret_cast(conn_wrapper->connection); duckdb_disconnect(reinterpret_cast(&conn)); delete conn_wrapper; @@ -1001,6 +1021,20 @@ static int get_next(struct ArrowArrayStream *stream, struct ArrowArray *out) { } out->release = nullptr; auto result_wrapper = static_cast(stream->private_data); + + // If the stream has been materialized, return from stored batches + if (result_wrapper->materialized) { + auto mat = result_wrapper->materialized; + if (mat->current >= mat->count) { + return DuckDBSuccess; // end of stream + } + // Transfer ownership of the batch to the caller + *out = mat->batches[mat->current]; + mat->batches[mat->current].release = nullptr; + mat->current++; + return DuckDBSuccess; + } + auto duckdb_chunk = duckdb_fetch_chunk(result_wrapper->result); if (!duckdb_chunk) { // End of stream or error; distinguish by checking the result error message. @@ -1038,6 +1072,26 @@ void release(struct ArrowArrayStream *stream) { } auto result_wrapper = reinterpret_cast(stream->private_data); if (result_wrapper) { + // Unregister from connection's active_streams + if (result_wrapper->conn_wrapper) { + auto &active = result_wrapper->conn_wrapper->active_streams; + auto it = std::find(active.begin(), active.end(), result_wrapper); + if (it != active.end()) { + active.erase(it); + } + } + // Clean up materialized data if present + if (result_wrapper->materialized) { + auto mat = result_wrapper->materialized; + for (idx_t i = mat->current; i < mat->count; i++) { + if (mat->batches[i].release) { + mat->batches[i].release(&mat->batches[i]); + } + } + free(mat->batches); + free(mat); + result_wrapper->materialized = nullptr; + } duckdb_destroy_result(&result_wrapper->result); if (result_wrapper->last_error) { free(result_wrapper->last_error); @@ -1316,6 +1370,7 @@ AdbcStatusCode StatementNew(struct AdbcConnection *connection, struct AdbcStatem auto conn_wrapper = static_cast(connection->private_data); statement_wrapper->connection = conn_wrapper->connection; + statement_wrapper->conn_wrapper = conn_wrapper; statement_wrapper->statement = nullptr; statement_wrapper->ingestion_stream.release = nullptr; statement_wrapper->ingestion_table_name = nullptr; @@ -1449,6 +1504,75 @@ static AdbcStatusCode IngestToTableFromBoundStream(DuckDBAdbcStatementWrapper *s rows_affected); } +// Materialize all active streams on a connection so that a new query can execute. +// This fetches remaining data from each streaming result into memory, making the +// streams independent of the connection's active query context. +static void MaterializeActiveStreams(duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper) { + for (auto *result_wrapper : conn_wrapper->active_streams) { + if (!result_wrapper || result_wrapper->materialized) { + continue; + } + + // Collect remaining batches from the streaming result + duckdb::vector batches; + auto arrow_options = duckdb_result_get_arrow_options(&result_wrapper->result); + while (true) { + ArrowArray array; + std::memset(&array, 0, sizeof(ArrowArray)); + + auto duckdb_chunk = duckdb_fetch_chunk(result_wrapper->result); + if (!duckdb_chunk) { + break; + } + auto conversion_err = duckdb_data_chunk_to_arrow(arrow_options, duckdb_chunk, &array); + duckdb_destroy_data_chunk(&duckdb_chunk); + + if (conversion_err) { + duckdb_destroy_error_data(&conversion_err); + if (array.release) { + array.release(&array); + } + break; + } + batches.push_back(array); + } + duckdb_destroy_arrow_options(&arrow_options); + + // Store materialized data + auto mat = static_cast(malloc(sizeof(MaterializedData))); + if (!mat) { + // Allocation failed — release fetched batches and skip materialization + for (auto &batch : batches) { + if (batch.release) { + batch.release(&batch); + } + } + continue; + } + mat->current = 0; + mat->count = static_cast(batches.size()); + if (!batches.empty()) { + mat->batches = static_cast(malloc(sizeof(ArrowArray) * batches.size())); + if (!mat->batches) { + // Allocation failed — release fetched batches and skip materialization + for (auto &batch : batches) { + if (batch.release) { + batch.release(&batch); + } + } + free(mat); + continue; + } + for (idx_t i = 0; i < batches.size(); i++) { + mat->batches[i] = batches[i]; + } + } else { + mat->batches = nullptr; + } + result_wrapper->materialized = mat; + } +} + AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct ArrowArrayStream *out, int64_t *rows_affected, struct AdbcError *error) { if (!statement) { @@ -1465,6 +1589,13 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr return ADBC_STATUS_INVALID_STATE; } + // Materialize any active streams on this connection before executing a new query. + // Without materialization, executing a new query would silently invalidate any existing streaming results on the + // same connection. + if (wrapper->conn_wrapper) { + MaterializeActiveStreams(wrapper->conn_wrapper); + } + // TODO: Set affected rows, careful with early return if (rows_affected) { *rows_affected = 0; @@ -1499,6 +1630,8 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr } stream_wrapper->last_error = nullptr; stream_wrapper->status_code = ADBC_STATUS_OK; + stream_wrapper->materialized = nullptr; + stream_wrapper->conn_wrapper = wrapper->conn_wrapper; std::memset(&stream_wrapper->adbc_error, 0, sizeof(stream_wrapper->adbc_error)); std::memset(&stream_wrapper->result, 0, sizeof(stream_wrapper->result)); // Only process the stream if there are parameters to bind @@ -1616,6 +1749,10 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr out->get_next = get_next; out->release = release; out->get_last_error = get_last_error; + // Register this stream wrapper so it can be materialized if another query runs + if (wrapper->conn_wrapper) { + wrapper->conn_wrapper->active_streams.push_back(stream_wrapper); + } } else { // Caller didn't request a stream; clean up resources duckdb_destroy_result(&stream_wrapper->result); @@ -1659,6 +1796,12 @@ AdbcStatusCode StatementSetSqlQuery(struct AdbcStatement *statement, const char } auto wrapper = static_cast(statement->private_data); + + // Materialize any active streams before preparing + if (wrapper->conn_wrapper) { + MaterializeActiveStreams(wrapper->conn_wrapper); + } + if (wrapper->ingestion_stream.release) { // Release any resources currently held by the ingestion stream before we overwrite it wrapper->ingestion_stream.release(&wrapper->ingestion_stream); diff --git a/src/duckdb/src/common/box_renderer.cpp b/src/duckdb/src/common/box_renderer.cpp index 861d59c14..45d102606 100644 --- a/src/duckdb/src/common/box_renderer.cpp +++ b/src/duckdb/src/common/box_renderer.cpp @@ -1075,6 +1075,7 @@ bool JSONParser::Process(const string &value) { state = JSONState::REGULAR; char quote_char = '"'; bool can_parse_value = false; + bool in_unquoted_value = false; pos = 0; for (; success && pos < value.size(); pos++) { auto c = value[pos]; @@ -1107,6 +1108,7 @@ bool JSONParser::Process(const string &value) { } separators.pop_back(); HandleBracketClose(c); + in_unquoted_value = false; break; } case '"': @@ -1118,6 +1120,7 @@ bool JSONParser::Process(const string &value) { case ',': // comma - move to next line HandleComma(c); + in_unquoted_value = false; break; case ':': HandleColon(); @@ -1133,11 +1136,16 @@ bool JSONParser::Process(const string &value) { HandleCharacter(c); break; case ' ': + if (in_unquoted_value) { + HandleCharacter(c); + } + break; case '\t': case '\n': // skip whitespace break; default: + in_unquoted_value = true; HandleCharacter(c); break; } diff --git a/src/duckdb/src/common/local_file_system.cpp b/src/duckdb/src/common/local_file_system.cpp index db5352f9e..7dc38f008 100644 --- a/src/duckdb/src/common/local_file_system.cpp +++ b/src/duckdb/src/common/local_file_system.cpp @@ -699,6 +699,17 @@ void LocalFileSystem::RemoveFile(const string &filename, optional_ptr(s.st_dev), data_ptr_cast(&version_tag[0])); + Store(NumericCast(s.st_ino), data_ptr_cast(&version_tag[1])); + Store(NumericCast(s.st_size), data_ptr_cast(&version_tag[2])); + Store(Timestamp::FromEpochSeconds(s.st_mtime).value, data_ptr_cast(&version_tag[3])); + + return string(char_ptr_cast(version_tag), sizeof(uint64_t) * 4); +} + bool LocalFileSystem::ListFilesExtended(const string &directory, const std::function &callback, optional_ptr opener) { @@ -741,6 +752,8 @@ bool LocalFileSystem::ListFilesExtended(const string &directory, options.emplace("file_size", Value::BIGINT(UnsafeNumericCast(status.st_size))); // last modified time options.emplace("last_modified", Value::TIMESTAMP(Timestamp::FromTimeT(status.st_mtime))); + // version tag + options.emplace("etag", Value::BLOB_RAW(GetPosixVersionTag(status))); // invoke callback callback(info); @@ -1510,15 +1523,7 @@ string LocalFileSystem::GetVersionTag(FileHandle &handle) { if (fstat(fd, &s) == -1) { throw IOException("Failed to get file size for file \"%s\": %s", handle.path, strerror(errno)); } - - // dev/ino should be enough, but to guard against in-place writes we also add file size and modification time - uint64_t version_tag[4]; - Store(NumericCast(s.st_dev), data_ptr_cast(&version_tag[0])); - Store(NumericCast(s.st_ino), data_ptr_cast(&version_tag[1])); - Store(NumericCast(s.st_size), data_ptr_cast(&version_tag[2])); - Store(Timestamp::FromEpochSeconds(s.st_mtime).value, data_ptr_cast(&version_tag[3])); - - return string(char_ptr_cast(version_tag), sizeof(uint64_t) * 4); + return GetPosixVersionTag(s); #endif } diff --git a/src/duckdb/src/common/multi_file/base_file_reader.cpp b/src/duckdb/src/common/multi_file/base_file_reader.cpp index a13dc5a58..203e93e45 100644 --- a/src/duckdb/src/common/multi_file/base_file_reader.cpp +++ b/src/duckdb/src/common/multi_file/base_file_reader.cpp @@ -11,6 +11,9 @@ shared_ptr BaseFileReader::GetUnionData(idx_t file_idx) { throw NotImplementedException("Union by name not supported for reader of type %s", GetReaderType()); } +void BaseFileReader::PrepareScan(ClientContext &, GlobalTableFunctionState &, LocalTableFunctionState &) { +} + void BaseFileReader::PrepareReader(ClientContext &context, GlobalTableFunctionState &) { } diff --git a/src/duckdb/src/function/scalar/create_sort_key.cpp b/src/duckdb/src/function/scalar/create_sort_key.cpp index 9c043d4e6..bb7bb4a02 100644 --- a/src/duckdb/src/function/scalar/create_sort_key.cpp +++ b/src/duckdb/src/function/scalar/create_sort_key.cpp @@ -45,10 +45,6 @@ unique_ptr CreateSortKeyBind(ClientContext &context, ScalarFunctio auto sort_specifier_str = sort_specifier.ToString(); result->modifiers.push_back(OrderModifiers::Parse(sort_specifier_str)); } - // push collations - for (idx_t i = 0; i < arguments.size(); i += 2) { - ExpressionBinder::PushCollation(context, arguments[i], arguments[i]->return_type); - } // check if all types are constant bool all_constant = true; idx_t constant_size = 0; diff --git a/src/duckdb/src/function/table/read_duckdb.cpp b/src/duckdb/src/function/table/read_duckdb.cpp index 9f870547b..051e107f0 100644 --- a/src/duckdb/src/function/table/read_duckdb.cpp +++ b/src/duckdb/src/function/table/read_duckdb.cpp @@ -198,6 +198,10 @@ DuckDBReader::DuckDBReader(ClientContext &context_p, OpenFileInfo file_p, const : BaseFileReader(std::move(file_p)), context(context_p), finished(false) { auto &attached = GetAttachedDatabase(); auto &catalog = attached.GetCatalog(); + if (!catalog.IsDuckCatalog()) { + throw NotImplementedException("read_duckdb can only be used to read DuckDB files - \"%s\" is of type \"%s\"", + catalog.GetDBPath(), catalog.GetCatalogType()); + } vector> tables; vector> candidate_tables; catalog.ScanSchemas(context, [&](SchemaCatalogEntry &schema) { diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 041a4804f..703375990 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "1-dev204" +#define DUCKDB_PATCH_VERSION "1-dev332" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 5 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.5.1-dev204" +#define DUCKDB_VERSION "v1.5.1-dev332" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "3ada6fd54b" +#define DUCKDB_SOURCE_ID "31ea662805" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp b/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp index 01d59dc90..c34ccb54f 100644 --- a/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp +++ b/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp @@ -11,11 +11,18 @@ #include "duckdb.h" #include "duckdb/common/string.hpp" #include "duckdb/common/unordered_map.hpp" +#include "duckdb/common/vector.hpp" + +namespace duckdb_adbc { +struct DuckDBAdbcStreamWrapper; +} // namespace duckdb_adbc namespace duckdb { struct DuckDBAdbcConnectionWrapper { duckdb_connection connection; unordered_map options; + //! Active stream wrappers on this connection (for materialization on concurrent execute) + vector active_streams; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/common/multi_file/base_file_reader.hpp b/src/duckdb/src/include/duckdb/common/multi_file/base_file_reader.hpp index 4ee104e2d..333d838db 100644 --- a/src/duckdb/src/include/duckdb/common/multi_file/base_file_reader.hpp +++ b/src/duckdb/src/include/duckdb/common/multi_file/base_file_reader.hpp @@ -57,7 +57,7 @@ class BaseFileReader : public enable_shared_from_this { const vector &GetColumns() const { return columns; } - const string &GetFileName() { + const string &GetFileName() const { return file.path; } virtual bool UseCastMap() const { @@ -76,8 +76,11 @@ class BaseFileReader : public enable_shared_from_this { //! Prepare reader for scanning DUCKDB_API virtual void PrepareReader(ClientContext &context, GlobalTableFunctionState &); + //! Try to initialize a scan over the reader - this is done while the global lock is held virtual bool TryInitializeScan(ClientContext &context, GlobalTableFunctionState &gstate, LocalTableFunctionState &lstate) = 0; + //! Prepare a scan - called after TryInitializeScan succeeds - this is done without any lock held + virtual void PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate, LocalTableFunctionState &lstate); //! Scan a chunk from the read state virtual AsyncResult Scan(ClientContext &context, GlobalTableFunctionState &global_state, LocalTableFunctionState &local_state, DataChunk &chunk) = 0; diff --git a/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp b/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp index eb15df203..8e5e64c0e 100644 --- a/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp +++ b/src/duckdb/src/include/duckdb/common/multi_file/multi_file_function.hpp @@ -361,7 +361,6 @@ class MultiFileFunction : public TableFunction { static void InitializeFileScanState(ClientContext &context, MultiFileReaderData &reader_data, MultiFileLocalState &lstate, vector &projection_ids) { - lstate.reader = reader_data.reader; lstate.reader_data = reader_data; auto &reader = *lstate.reader; //! Initialize the intermediate chunk to be used by the underlying reader before being finalized @@ -419,13 +418,16 @@ class MultiFileFunction : public TableFunction { if (current_reader_data.file_state == MultiFileFileState::OPEN) { if (current_reader_data.reader->TryInitializeScan(context, *gstate.global_state, *scan_data.local_state)) { - if (!current_reader_data.reader) { + scan_data.reader = current_reader_data.reader; + if (!scan_data.reader) { throw InternalException("MultiFileReader was moved"); } // The current reader has data left to be scanned scan_data.batch_index = gstate.batch_index++; auto old_file_index = scan_data.file_index; scan_data.file_index = gstate.file_index; + parallel_lock.unlock(); + scan_data.reader->PrepareScan(context, *gstate.global_state, *scan_data.local_state); if (old_file_index != scan_data.file_index) { InitializeFileScanState(context, current_reader_data, scan_data, gstate.projection_ids); } diff --git a/src/duckdb/src/include/duckdb/main/config.hpp b/src/duckdb/src/include/duckdb/main/config.hpp index 371bc86f2..74fcbe6c7 100644 --- a/src/duckdb/src/include/duckdb/main/config.hpp +++ b/src/duckdb/src/include/duckdb/main/config.hpp @@ -159,6 +159,8 @@ struct DBConfigOptions { unordered_set allowed_paths; //! Directories that are explicitly allowed, even if enable_external_access is false set allowed_directories; + //! Additional configuration options that are allowed to be changed even when the configuration is locked + case_insensitive_set_t allowed_configs; //! The log configuration LogConfig log_config = LogConfig(); //! Physical memory that the block allocator is allowed to use (this memory is never freed and cannot be reduced) @@ -301,6 +303,7 @@ struct DBConfig { static SettingLookupResult TryGetDefaultValue(optional_ptr option, Value &result); bool CanAccessFile(const string &path, FileType type); + void AddAllowedConfig(const string &config_name); void AddAllowedDirectory(const string &path); void AddAllowedPath(const string &path); string SanitizeAllowedPath(const string &path) const; diff --git a/src/duckdb/src/include/duckdb/main/extension_entries.hpp b/src/duckdb/src/include/duckdb/main/extension_entries.hpp index e5decbd1a..6ad14a255 100644 --- a/src/duckdb/src/include/duckdb/main/extension_entries.hpp +++ b/src/duckdb/src/include/duckdb/main/extension_entries.hpp @@ -1066,6 +1066,7 @@ static constexpr ExtensionEntry EXTENSION_SETTINGS[] = { {"enable_global_s3_configuration", "httpfs"}, {"enable_server_cert_verification", "httpfs"}, {"force_download", "httpfs"}, + {"force_download_threshold", "httpfs"}, {"geometry_always_xy", "spatial"}, {"hf_max_per_page", "httpfs"}, {"hnsw_ef_search", "vss"}, @@ -1080,11 +1081,19 @@ static constexpr ExtensionEntry EXTENSION_SETTINGS[] = { {"iceberg_via_aws_sdk_for_catalog_interactions", "iceberg"}, {"merge_http_secret_into_s3_request", "httpfs"}, {"mysql_bit1_as_boolean", "mysql_scanner"}, + {"mysql_compression_aware_costs", "mysql_scanner"}, + {"mysql_compression_ratio", "mysql_scanner"}, {"mysql_debug_show_queries", "mysql_scanner"}, {"mysql_enable_transactions", "mysql_scanner"}, {"mysql_experimental_filter_pushdown", "mysql_scanner"}, {"mysql_incomplete_dates_as_nulls", "mysql_scanner"}, + {"mysql_pool_acquire_mode", "mysql_scanner"}, + {"mysql_pool_size", "mysql_scanner"}, + {"mysql_pool_timeout_ms", "mysql_scanner"}, + {"mysql_push_threshold_no_index", "mysql_scanner"}, + {"mysql_push_threshold_with_index", "mysql_scanner"}, {"mysql_session_time_zone", "mysql_scanner"}, + {"mysql_thread_local_cache", "mysql_scanner"}, {"mysql_time_as_time", "mysql_scanner"}, {"mysql_tinyint1_as_boolean", "mysql_scanner"}, {"parquet_metadata_cache", "parquet"}, @@ -1116,6 +1125,7 @@ static constexpr ExtensionEntry EXTENSION_SETTINGS[] = { {"s3_version_id_pinning", "httpfs"}, {"sqlite_all_varchar", "sqlite_scanner"}, {"sqlite_debug_show_queries", "sqlite_scanner"}, + {"sqlite_disable_multithreaded_scans", "sqlite_scanner"}, {"timezone", "icu"}, {"ui_local_port", "ui"}, {"ui_polling_interval", "ui"}, diff --git a/src/duckdb/src/include/duckdb/main/settings.hpp b/src/duckdb/src/include/duckdb/main/settings.hpp index b85b00db9..812565dcd 100644 --- a/src/duckdb/src/include/duckdb/main/settings.hpp +++ b/src/duckdb/src/include/duckdb/main/settings.hpp @@ -196,6 +196,17 @@ struct AllowUnsignedExtensionsSetting { static void OnSet(SettingCallbackInfo &info, Value &input); }; +struct AllowedConfigsSetting { + using RETURN_TYPE = vector; + static constexpr const char *Name = "allowed_configs"; + static constexpr const char *Description = + "List of configuration options that are ALWAYS allowed to be changed - even when lock_configuration is true"; + static constexpr const char *InputType = "VARCHAR[]"; + static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value ¶meter); + static void ResetGlobal(DatabaseInstance *db, DBConfig &config); + static Value GetSetting(const ClientContext &context); +}; + struct AllowedDirectoriesSetting { using RETURN_TYPE = vector; static constexpr const char *Name = "allowed_directories"; @@ -1076,7 +1087,7 @@ struct LateMaterializationMaxRowsSetting { struct LockConfigurationSetting { using RETURN_TYPE = bool; static constexpr const char *Name = "lock_configuration"; - static constexpr const char *Description = "Whether or not the configuration can be altered"; + static constexpr const char *Description = "Whether or not configurations can be altered"; static constexpr const char *InputType = "BOOLEAN"; static constexpr const char *DefaultValue = "false"; static constexpr SettingScopeTarget Scope = SettingScopeTarget::GLOBAL_ONLY; diff --git a/src/duckdb/src/include/duckdb/optimizer/remove_unused_columns.hpp b/src/duckdb/src/include/duckdb/optimizer/remove_unused_columns.hpp index a260b10f5..eb5c5d826 100644 --- a/src/duckdb/src/include/duckdb/optimizer/remove_unused_columns.hpp +++ b/src/duckdb/src/include/duckdb/optimizer/remove_unused_columns.hpp @@ -22,6 +22,7 @@ namespace duckdb { class Binder; class BoundColumnRefExpression; class ClientContext; +class Optimizer; struct ReferencedExtractComponent { public: @@ -76,6 +77,8 @@ enum class BaseColumnPrunerMode : uint8_t { struct MaterializedCTEInfo { public: column_binding_map_t column_references; + unordered_set expected_readers; + unordered_set seen_readers; bool everything_referenced = true; }; @@ -126,22 +129,21 @@ class BaseColumnPruner : public LogicalOperatorVisitor { //! The RemoveUnusedColumns optimizer traverses the logical operator tree and removes any columns that are not required class RemoveUnusedColumns : public BaseColumnPruner { public: - RemoveUnusedColumns(Binder &binder, ClientContext &context, bool is_root = false, - shared_ptr> cte_info_map = nullptr) - : binder(binder), context(context), everything_referenced(is_root), cte_info_map(std::move(cte_info_map)) { - } + explicit RemoveUnusedColumns(Optimizer &optimizer); + RemoveUnusedColumns(RemoveUnusedColumns &parent, bool is_root); void VisitOperator(LogicalOperator &op) override; private: + Optimizer &optimizer; Binder &binder; ClientContext &context; //! Whether or not all the columns are referenced. This happens in the case of the root expression (because the //! output implicitly refers all the columns below it) bool everything_referenced; - shared_ptr> cte_info_map; - RemoveUnusedColumns CreateChildOptimizer(); + RemoveUnusedColumns &root; + unique_ptr> root_cte_map; private: template @@ -152,6 +154,8 @@ class RemoveUnusedColumns : public BaseColumnPruner { void WritePushdownExtractColumns( const ColumnBinding &binding, ReferencedColumn &col, idx_t original_idx, const LogicalType &column_type, const std::function cast_type)> &callback); + unordered_map &GetCTEMap(); + optional_ptr> TryGetCTEMap(); }; class CTERefPruner : public LogicalOperatorVisitor { diff --git a/src/duckdb/src/include/duckdb/optimizer/rule/predicate_factoring.hpp b/src/duckdb/src/include/duckdb/optimizer/rule/predicate_factoring.hpp new file mode 100644 index 000000000..fee63a097 --- /dev/null +++ b/src/duckdb/src/include/duckdb/optimizer/rule/predicate_factoring.hpp @@ -0,0 +1,24 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/optimizer/rule/predicate_factoring.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/optimizer/rule.hpp" + +namespace duckdb { + +//! The Predicate Factoring rule extracts predicates on a common column from disjunctive or conjunctive clauses +class PredicateFactoringRule : public Rule { +public: + explicit PredicateFactoringRule(ExpressionRewriter &rewriter); + + unique_ptr Apply(LogicalOperator &op, vector> &bindings, bool &changes_made, + bool is_root) override; +}; + +} // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/optimizer/rule/timestamp_comparison.hpp b/src/duckdb/src/include/duckdb/optimizer/rule/timestamp_comparison.hpp index a38ed64a8..cd873b167 100644 --- a/src/duckdb/src/include/duckdb/optimizer/rule/timestamp_comparison.hpp +++ b/src/duckdb/src/include/duckdb/optimizer/rule/timestamp_comparison.hpp @@ -15,7 +15,7 @@ namespace duckdb { class TimeStampComparison : public Rule { public: - explicit TimeStampComparison(ClientContext &context, ExpressionRewriter &rewriter); + explicit TimeStampComparison(ExpressionRewriter &rewriter); unique_ptr Apply(LogicalOperator &op, vector> &bindings, bool &changes_made, bool is_root) override; diff --git a/src/duckdb/src/include/duckdb/storage/table/row_group_reorderer.hpp b/src/duckdb/src/include/duckdb/storage/table/row_group_reorderer.hpp index d145a94dd..665b9392e 100644 --- a/src/duckdb/src/include/duckdb/storage/table/row_group_reorderer.hpp +++ b/src/duckdb/src/include/duckdb/storage/table/row_group_reorderer.hpp @@ -22,9 +22,11 @@ enum class OrderByColumnType : uint8_t { NUMERIC, STRING }; struct RowGroupOrderOptions { RowGroupOrderOptions(const StorageIndex &column_idx_p, OrderByStatistics order_by_p, OrderType order_type_p, OrderByNullType null_order_p, OrderByColumnType column_type_p, - optional_idx row_limit_p = optional_idx(), idx_t row_group_offset_p = 0) + optional_idx row_limit_p = optional_idx(), idx_t row_group_offset_p = 0, + idx_t leading_null_group_offset_p = 0) : column_idx(column_idx_p), order_by(order_by_p), order_type(order_type_p), null_order(null_order_p), - column_type(column_type_p), row_limit(row_limit_p), row_group_offset(row_group_offset_p) { + column_type(column_type_p), row_limit(row_limit_p), row_group_offset(row_group_offset_p), + leading_null_group_offset(leading_null_group_offset_p) { } const StorageIndex column_idx; @@ -34,6 +36,7 @@ struct RowGroupOrderOptions { const OrderByColumnType column_type; const optional_idx row_limit; const idx_t row_group_offset; + const idx_t leading_null_group_offset; void Serialize(Serializer &serializer) const; static unique_ptr Deserialize(Deserializer &deserializer); @@ -42,6 +45,7 @@ struct RowGroupOrderOptions { struct OffsetPruningResult { idx_t offset_remainder; idx_t pruned_row_group_count; + idx_t leading_null_group_offset; }; class RowGroupReorderer { diff --git a/src/duckdb/src/include/duckdb/transaction/update_info.hpp b/src/duckdb/src/include/duckdb/transaction/update_info.hpp index cde47e2b4..b62c066f7 100644 --- a/src/duckdb/src/include/duckdb/transaction/update_info.hpp +++ b/src/duckdb/src/include/duckdb/transaction/update_info.hpp @@ -60,6 +60,10 @@ struct UpdateInfo { bool AppliesToTransaction(transaction_t start_time, transaction_t transaction_id) { // these tuples were either committed AFTER this transaction started or are not committed yet, use // tuples stored in this version + if (version_number == TRANSACTION_ID_START - 1) { + // dummy transaction number for the root element - should always match + return true; + } return version_number > start_time && version_number != transaction_id; } diff --git a/src/duckdb/src/main/config.cpp b/src/duckdb/src/main/config.cpp index 8d55ae664..0401d62f9 100644 --- a/src/duckdb/src/main/config.cpp +++ b/src/duckdb/src/main/config.cpp @@ -7,6 +7,7 @@ #include "duckdb/common/string_util.hpp" #include "duckdb/main/database.hpp" #include "duckdb/main/settings.hpp" +#include "duckdb/main/extension_helper.hpp" #include "duckdb/storage/storage_extension.hpp" #include "duckdb/common/serializer/serializer.hpp" #include "duckdb/common/exception/parser_exception.hpp" @@ -73,6 +74,7 @@ static const ConfigurationOption internal_options[] = { DUCKDB_GLOBAL(AllowPersistentSecretsSetting), DUCKDB_SETTING_CALLBACK(AllowUnredactedSecretsSetting), DUCKDB_SETTING_CALLBACK(AllowUnsignedExtensionsSetting), + DUCKDB_GLOBAL(AllowedConfigsSetting), DUCKDB_GLOBAL(AllowedDirectoriesSetting), DUCKDB_GLOBAL(AllowedPathsSetting), DUCKDB_SETTING(ArrowLargeBufferSizeSetting), @@ -202,12 +204,12 @@ static const ConfigurationOption internal_options[] = { DUCKDB_SETTING(ZstdMinStringLengthSetting), FINAL_SETTING}; -static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 96), - DUCKDB_SETTING_ALIAS("null_order", 40), - DUCKDB_SETTING_ALIAS("profiling_output", 115), - DUCKDB_SETTING_ALIAS("user", 130), - DUCKDB_SETTING_ALIAS("wal_autocheckpoint", 23), - DUCKDB_SETTING_ALIAS("worker_threads", 129), +static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 97), + DUCKDB_SETTING_ALIAS("null_order", 41), + DUCKDB_SETTING_ALIAS("profiling_output", 116), + DUCKDB_SETTING_ALIAS("user", 131), + DUCKDB_SETTING_ALIAS("wal_autocheckpoint", 24), + DUCKDB_SETTING_ALIAS("worker_threads", 130), FINAL_ALIAS}; vector DBConfig::GetOptions() { @@ -561,6 +563,14 @@ void DBConfig::CheckLock(const String &name) { // we are always allowed to change these settings return; } + if (!options.allowed_configs.empty()) { + auto option = GetOptionByName(name); + auto canonical_name = option ? string(option->name) : name.ToStdString(); + if (options.allowed_configs.find(canonical_name) != options.allowed_configs.end()) { + // settings that are allowed through allowed_configs + return; + } + } // not allowed! throw InvalidInputException("Cannot change configuration option \"%s\" - the configuration has been locked", name); } @@ -793,6 +803,37 @@ string DBConfig::SanitizeAllowedPath(const string &path_p) const { return result; } +void DBConfig::AddAllowedConfig(const string &config_name) { + if (config_name.empty()) { + throw InvalidInputException("Cannot provide an empty string for allowed_configs"); + } + duckdb::case_insensitive_set_t always_disallowed_config {"allowed_configs", "lock_configuration"}; + if (always_disallowed_config.find(config_name) != always_disallowed_config.end()) { + throw InvalidInputException("Cannot include '%s' in allowed_configs", config_name); + } + // Validate that the config name refers to a known setting (built-in or extension) + // and resolve aliases to canonical names + auto option = GetOptionByName(config_name); + if (option) { + // Store the canonical name so alias lookups work in CheckLock + options.allowed_configs.insert(option->name); + return; + } + ExtensionOption extension_option; + if (TryGetExtensionOption(config_name, extension_option)) { + options.allowed_configs.insert(config_name); + return; + } + // Check if the setting belongs to a known extension (even if not yet loaded) + auto extension_name = ExtensionHelper::FindExtensionInEntries(config_name, EXTENSION_SETTINGS); + if (!extension_name.empty()) { + // Accept the setting - the extension may be autoloaded later when the setting is used + options.allowed_configs.insert(config_name); + return; + } + throw InvalidInputException("Unknown configuration option '%s' in allowed_configs", config_name); +} + void DBConfig::AddAllowedDirectory(const string &path) { auto allowed_directory = SanitizeAllowedPath(path); if (allowed_directory.empty()) { diff --git a/src/duckdb/src/main/settings/custom_settings.cpp b/src/duckdb/src/main/settings/custom_settings.cpp index 67f6d9a45..853abd69f 100644 --- a/src/duckdb/src/main/settings/custom_settings.cpp +++ b/src/duckdb/src/main/settings/custom_settings.cpp @@ -183,6 +183,30 @@ void AllowUnsignedExtensionsSetting::OnSet(SettingCallbackInfo &info, Value &inp } } +//===----------------------------------------------------------------------===// +// Allowed Configs +//===----------------------------------------------------------------------===// +void AllowedConfigsSetting::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) { + config.options.allowed_configs.clear(); + auto &list = ListValue::GetChildren(input); + for (auto &val : list) { + config.AddAllowedConfig(val.GetValue()); + } +} + +void AllowedConfigsSetting::ResetGlobal(DatabaseInstance *db, DBConfig &config) { + config.options.allowed_configs = DBConfigOptions().allowed_configs; +} + +Value AllowedConfigsSetting::GetSetting(const ClientContext &context) { + auto &config = DBConfig::GetConfig(context); + vector configs; + for (auto &cfg : config.options.allowed_configs) { + configs.emplace_back(cfg); + } + return Value::LIST(LogicalType::VARCHAR, std::move(configs)); +} + //===----------------------------------------------------------------------===// // Allowed Directories //===----------------------------------------------------------------------===// diff --git a/src/duckdb/src/main/user_settings.cpp b/src/duckdb/src/main/user_settings.cpp index d8cc72d44..43f4b1978 100644 --- a/src/duckdb/src/main/user_settings.cpp +++ b/src/duckdb/src/main/user_settings.cpp @@ -2,6 +2,9 @@ #include "duckdb/main/settings.hpp" #include "duckdb/common/types/string.hpp" #include "duckdb/common/types/uuid.hpp" +#ifdef __MVS__ +#include +#endif namespace duckdb { @@ -135,8 +138,13 @@ bool GlobalUserSettings::TryGetExtensionOption(const String &name, ExtensionOpti #ifndef __MINGW32__ CachedGlobalSettings &GlobalUserSettings::GetSettings() const { - // Cache of global settings - used to allow lock-free access to global settings in a thread-safe manner +// Cache of global settings - used to allow lock-free access to global settings in a thread-safe manner +#ifdef __MVS__ + static __tlssim current_cache_impl; +#define current_cache (*current_cache_impl.access()) +#else thread_local CachedGlobalSettings current_cache; +#endif const auto current_version = settings_version.load(std::memory_order_relaxed); if (!current_cache.global_user_settings || this != current_cache.global_user_settings.get() || @@ -146,6 +154,9 @@ CachedGlobalSettings &GlobalUserSettings::GetSettings() const { current_cache = CachedGlobalSettings(*this, settings_version, settings_map); } return current_cache; +#ifdef __MVS__ +#undef current_cache +#endif } hugeint_t GlobalUserSettings::GetUUID() const { diff --git a/src/duckdb/src/optimizer/common_subplan_optimizer.cpp b/src/duckdb/src/optimizer/common_subplan_optimizer.cpp index 15f823178..697b5d43f 100644 --- a/src/duckdb/src/optimizer/common_subplan_optimizer.cpp +++ b/src/duckdb/src/optimizer/common_subplan_optimizer.cpp @@ -8,6 +8,7 @@ #include "duckdb/common/serializer/binary_serializer.hpp" #include "duckdb/common/arena_containers/arena_unordered_map.hpp" #include "duckdb/common/arena_containers/arena_vector.hpp" +#include "duckdb/planner/column_binding_map.hpp" namespace duckdb { @@ -775,6 +776,28 @@ class CommonSubplanFinder { col_names.emplace_back(StringUtil::Format("%s_col_%llu", cte_name, i + 1)); } const auto &primary_subplan_bindings = primary_subplan.canonical_bindings; + column_binding_map_t primary_binding_index; + for (idx_t i = 0; i < primary_subplan_bindings.size(); i++) { + primary_binding_index.emplace(primary_subplan_bindings[i], i); + } + vector> cte_column_indexes(subplan_info.subplans.size()); + vector needs_projection(subplan_info.subplans.size(), false); + for (idx_t subplan_idx = 0; subplan_idx < subplan_info.subplans.size(); subplan_idx++) { + const auto &subplan = subplan_info.subplans[subplan_idx]; + const auto &canonical_bindings = subplan.canonical_bindings; + cte_column_indexes[subplan_idx].reserve(canonical_bindings.size()); + needs_projection[subplan_idx] = canonical_bindings.size() != types.size(); + for (idx_t i = 0; i < canonical_bindings.size(); i++) { + const auto &cb = canonical_bindings[i]; + const auto entry = primary_binding_index.find(cb); + D_ASSERT(entry != primary_binding_index.end()); // guaranteed by FilterSubplans + const auto cte_col_idx = entry->second; + // Types must match: same canonical binding = same base column = same type + D_ASSERT(subplan.op.get()->types[i] == types[cte_col_idx]); + cte_column_indexes[subplan_idx].push_back(cte_col_idx); + needs_projection[subplan_idx] = needs_projection[subplan_idx] || cte_col_idx != i; + } + } // Create CTE refs and figure out column binding replacements vector> cte_refs; @@ -788,18 +811,11 @@ class CommonSubplanFinder { } const auto old_bindings = subplan.op.get()->GetColumnBindings(); auto new_bindings = cte_refs.back()->GetColumnBindings(); - if (old_bindings.size() != new_bindings.size()) { - // Different number of output columns - project columns out - const auto &canonical_bindings = subplan.canonical_bindings; + if (needs_projection[subplan_idx]) { + // Preserve each subplan's original output order when it differs from the + // primary materialized CTE. vector> select_list; - for (auto &cb : canonical_bindings) { - idx_t cte_col_idx = 0; - for (; cte_col_idx < primary_subplan_bindings.size(); cte_col_idx++) { - if (cb == primary_subplan_bindings[cte_col_idx]) { - break; - } - } - D_ASSERT(cte_col_idx < primary_subplan_bindings.size()); + for (auto cte_col_idx : cte_column_indexes[subplan_idx]) { select_list.emplace_back(make_uniq( types[cte_col_idx], ColumnBinding(cte_ref_index, cte_col_idx))); } diff --git a/src/duckdb/src/optimizer/cte_filter_pusher.cpp b/src/duckdb/src/optimizer/cte_filter_pusher.cpp index b930cae00..143784821 100644 --- a/src/duckdb/src/optimizer/cte_filter_pusher.cpp +++ b/src/duckdb/src/optimizer/cte_filter_pusher.cpp @@ -1,5 +1,6 @@ #include "duckdb/optimizer/cte_filter_pusher.hpp" +#include "duckdb/optimizer/optimizer.hpp" #include "duckdb/optimizer/column_binding_replacer.hpp" #include "duckdb/optimizer/filter_pushdown.hpp" #include "duckdb/planner/expression/bound_conjunction_expression.hpp" @@ -108,11 +109,18 @@ void CTEFilterPusher::PushFilterIntoCTE(MaterializedCTEInfo &info) { } } - // Add the filter on top of the CTE definition and push it down + // Add the filter on top of the CTE definition and split the predicates auto new_cte = make_uniq_base(std::move(outer_expr)); + LogicalFilter::SplitPredicates(new_cte->Cast().expressions); + + // Rewrite the operator expressions before adding the child op (children should be rewritten already) + optimizer.rewriter.VisitOperator(*new_cte); new_cte->children.push_back(std::move(info.materialized_cte.children[0])); + + // Push down the filter FilterPushdown pushdown(optimizer); new_cte = pushdown.Rewrite(std::move(new_cte)); + info.materialized_cte.children[0] = std::move(new_cte); } diff --git a/src/duckdb/src/optimizer/cte_inlining.cpp b/src/duckdb/src/optimizer/cte_inlining.cpp index 116d64768..65d7ad19e 100644 --- a/src/duckdb/src/optimizer/cte_inlining.cpp +++ b/src/duckdb/src/optimizer/cte_inlining.cpp @@ -25,6 +25,18 @@ unique_ptr CTEInlining::Optimize(unique_ptr op return op; } +static idx_t CountBaseTableReferences(const LogicalOperator &op) { + idx_t number_of_references = 0; + if (op.type == LogicalOperatorType::LOGICAL_GET) { + number_of_references++; + } + for (auto &child : op.children) { + number_of_references += CountBaseTableReferences(*child); + } + + return number_of_references; +} + static idx_t CountCTEReferences(const LogicalOperator &op, idx_t cte_index) { if (op.type == LogicalOperatorType::LOGICAL_CTE_REF) { auto &cte = op.Cast(); @@ -133,6 +145,13 @@ void CTEInlining::TryInlining(unique_ptr &op) { return; } + // Check how many base table references the CTE has + auto base_table_references = CountBaseTableReferences(*op->children[0]); + + if (base_table_references > 2 && base_table_references * ref_count > 10) { + return; + } + // CTEs require full materialization before the CTE scans begin, // LIMIT and TOP_N operators cannot abort the materialization, // even if only a part of the CTE result is needed. diff --git a/src/duckdb/src/optimizer/late_materialization.cpp b/src/duckdb/src/optimizer/late_materialization.cpp index 6d1a06d9b..9c976a417 100644 --- a/src/duckdb/src/optimizer/late_materialization.cpp +++ b/src/duckdb/src/optimizer/late_materialization.cpp @@ -365,7 +365,7 @@ bool LateMaterialization::TryLateMaterialization(unique_ptr &op } // run the RemoveUnusedColumns optimizer to prune the (now) unused columns the plan - RemoveUnusedColumns unused_optimizer(optimizer.binder, optimizer.context, true); + RemoveUnusedColumns unused_optimizer(optimizer); unused_optimizer.VisitOperator(*op); return true; } diff --git a/src/duckdb/src/optimizer/optimizer.cpp b/src/duckdb/src/optimizer/optimizer.cpp index c2c4540c1..12102856a 100644 --- a/src/duckdb/src/optimizer/optimizer.cpp +++ b/src/duckdb/src/optimizer/optimizer.cpp @@ -40,6 +40,7 @@ #include "duckdb/optimizer/common_subplan_optimizer.hpp" #include "duckdb/optimizer/window_self_join.hpp" #include "duckdb/optimizer/optimizer_extension.hpp" +#include "duckdb/optimizer/rule/predicate_factoring.hpp" #include "duckdb/planner/binder.hpp" #include "duckdb/planner/planner.hpp" @@ -66,7 +67,8 @@ Optimizer::Optimizer(Binder &binder, ClientContext &context) : context(context), rewriter.rules.push_back(make_uniq(rewriter)); rewriter.rules.push_back(make_uniq(rewriter)); rewriter.rules.push_back(make_uniq(rewriter)); - rewriter.rules.push_back(make_uniq(context, rewriter)); + rewriter.rules.push_back(make_uniq(rewriter)); + rewriter.rules.push_back(make_uniq(rewriter)); #ifdef DEBUG for (auto &rule : rewriter.rules) { @@ -218,7 +220,7 @@ void Optimizer::RunBuiltInOptimizers() { // removes unused columns RunOptimizer(OptimizerType::UNUSED_COLUMNS, [&]() { - RemoveUnusedColumns unused(binder, context, true); + RemoveUnusedColumns unused(*this); unused.VisitOperator(*plan); }); diff --git a/src/duckdb/src/optimizer/pushdown/pushdown_left_join.cpp b/src/duckdb/src/optimizer/pushdown/pushdown_left_join.cpp index 1ebf3cedd..7fab6c580 100644 --- a/src/duckdb/src/optimizer/pushdown/pushdown_left_join.cpp +++ b/src/duckdb/src/optimizer/pushdown/pushdown_left_join.cpp @@ -77,10 +77,6 @@ unique_ptr FilterPushdown::PushdownLeftJoin(unique_ptr &left_bindings, unordered_set &right_bindings) { auto &join = op->Cast(); - if (op->type == LogicalOperatorType::LOGICAL_DELIM_JOIN) { - op = PushFiltersIntoDelimJoin(std::move(op)); - return FinishPushdown(std::move(op)); - } FilterPushdown left_pushdown(optimizer, convert_mark_joins), right_pushdown(optimizer, convert_mark_joins); // for a comparison join we create a FilterCombiner that checks if we can push conditions on LHS join conditions // into the RHS of the join diff --git a/src/duckdb/src/optimizer/pushdown/pushdown_semi_anti_join.cpp b/src/duckdb/src/optimizer/pushdown/pushdown_semi_anti_join.cpp index 0b937fe25..b0ff8cb7c 100644 --- a/src/duckdb/src/optimizer/pushdown/pushdown_semi_anti_join.cpp +++ b/src/duckdb/src/optimizer/pushdown/pushdown_semi_anti_join.cpp @@ -11,10 +11,6 @@ using Filter = FilterPushdown::Filter; unique_ptr FilterPushdown::PushdownSemiAntiJoin(unique_ptr op) { auto &join = op->Cast(); - if (op->type == LogicalOperatorType::LOGICAL_DELIM_JOIN) { - op = PushFiltersIntoDelimJoin(std::move(op)); - return FinishPushdown(std::move(op)); - } // push all current filters down the left side op->children[0] = Rewrite(std::move(op->children[0])); diff --git a/src/duckdb/src/optimizer/remove_unused_columns.cpp b/src/duckdb/src/optimizer/remove_unused_columns.cpp index 7c8e15560..3f8ef5630 100644 --- a/src/duckdb/src/optimizer/remove_unused_columns.cpp +++ b/src/duckdb/src/optimizer/remove_unused_columns.cpp @@ -27,12 +27,43 @@ #include "duckdb/function/scalar/struct_utils.hpp" #include "duckdb/function/scalar/variant_utils.hpp" #include "duckdb/function/scalar/nested_functions.hpp" +#include "duckdb/optimizer/optimizer.hpp" #include namespace duckdb { -RemoveUnusedColumns RemoveUnusedColumns::CreateChildOptimizer() { - return RemoveUnusedColumns(binder, context, true, cte_info_map); +static void GatherCTEScans(const idx_t cte_index, const LogicalOperator &op, unordered_set &expected_readers) { + if (op.type == LogicalOperatorType::LOGICAL_CTE_REF) { + auto &cte_scan = op.Cast(); + if (cte_scan.cte_index != cte_index) { + return; + } + expected_readers.insert(cte_scan.table_index); + } + for (auto &child : op.children) { + GatherCTEScans(cte_index, *child, expected_readers); + } +} + +RemoveUnusedColumns::RemoveUnusedColumns(Optimizer &optimizer) + : optimizer(optimizer), binder(optimizer.binder), context(optimizer.context), everything_referenced(true), + root(*this) { +} + +RemoveUnusedColumns::RemoveUnusedColumns(RemoveUnusedColumns &parent, bool is_root) + : optimizer(parent.optimizer), binder(parent.binder), context(parent.context), everything_referenced(is_root), + root(parent.root) { +} + +unordered_map &RemoveUnusedColumns::GetCTEMap() { + if (!root.root_cte_map) { + root.root_cte_map = make_uniq>(); + } + return *root.root_cte_map; +} + +optional_ptr> RemoveUnusedColumns::TryGetCTEMap() { + return root.root_cte_map.get(); } idx_t BaseColumnPruner::ReplaceBinding(ColumnBinding current_binding, ColumnBinding new_binding) { @@ -117,7 +148,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { // Note: We allow all optimizations (join column replacement, column pruning) to run below ROLLUP // The duplicate groups optimizer will be responsible for not breaking ROLLUP by skipping when // multiple grouping sets are present - RemoveUnusedColumns remove(binder, context, everything_referenced, cte_info_map); + RemoveUnusedColumns remove(*this, everything_referenced); remove.VisitOperatorExpressions(op); remove.VisitOperator(*op.children[0]); return; @@ -185,6 +216,12 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { } ClearUnusedExpressions(entries, setop.table_index); if (entries.size() >= setop.column_count) { + // We still need to recurse into the children to populate CTE info, etc. + for (auto &child : op.children) { + RemoveUnusedColumns remove(*this, true); + remove.VisitOperator(*child); + } + return; } if (entries.empty()) { @@ -196,7 +233,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { setop.column_count = entries.size(); for (idx_t child_idx = 0; child_idx < op.children.size(); child_idx++) { - RemoveUnusedColumns remove(binder, context, true, cte_info_map); + RemoveUnusedColumns remove(*this, true); auto &child = op.children[child_idx]; // we push a projection under this child that references the required columns of the union @@ -208,6 +245,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { expressions.push_back( make_uniq(child->types[column_idx], bindings[column_idx])); } + auto new_projection = make_uniq(binder.GenerateTableIndex(), std::move(expressions)); if (child->has_estimated_cardinality) { new_projection->SetEstimatedCardinality(child->estimated_cardinality); @@ -220,7 +258,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { return; } for (auto &child : op.children) { - RemoveUnusedColumns remove(binder, context, true, cte_info_map); + RemoveUnusedColumns remove(*this, true); remove.VisitOperator(*child); } return; @@ -229,7 +267,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { case LogicalOperatorType::LOGICAL_INTERSECT: { // for INTERSECT/EXCEPT operations we can't remove anything, just recursively visit the children for (auto &child : op.children) { - RemoveUnusedColumns remove(binder, context, true, cte_info_map); + RemoveUnusedColumns remove(*this, true); remove.VisitOperator(*child); } return; @@ -250,7 +288,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { } } // then recurse into the children of this projection - RemoveUnusedColumns remove(binder, context, false, cte_info_map); + RemoveUnusedColumns remove(*this, false); remove.VisitOperatorExpressions(op); remove.VisitOperator(*op.children[0]); return; @@ -264,7 +302,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { //! on top of them can select from only the table values being inserted. //! TODO: Push down the projections from the returning statement //! TODO: Be careful because you might be adding expressions when a user returns * - RemoveUnusedColumns remove(binder, context, true, cte_info_map); + RemoveUnusedColumns remove(*this, true); remove.VisitOperatorExpressions(op); remove.VisitOperator(*op.children[0]); return; @@ -276,7 +314,7 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { if (!op.children.empty()) { // Some LOGICAL_GET operators (e.g., table in out functions) may have a // child operator. So we recurse into it if it exists. - RemoveUnusedColumns remove(binder, context, true, cte_info_map); + RemoveUnusedColumns remove(*this, true); remove.VisitOperator(*op.children[0]); } return; @@ -299,23 +337,20 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { // to the children. However, we still need to create the cte_info_map for the recursive CTE so that column // references in the CTE body can find the correct CTE entry and mark columns as referenced. auto &rec = op.Cast(); - if (!cte_info_map) { - cte_info_map = make_shared_ptr>(); - } - (*cte_info_map).insert({rec.table_index, MaterializedCTEInfo()}); + auto &cte_info_map = GetCTEMap(); + cte_info_map.insert({rec.table_index, MaterializedCTEInfo()}); everything_referenced = true; break; } case LogicalOperatorType::LOGICAL_MATERIALIZED_CTE: { - if (!cte_info_map) { - cte_info_map = make_shared_ptr>(); - } - - auto &cte_map_ref = *cte_info_map; + auto &cte_map_ref = GetCTEMap(); auto &cte = op.Cast(); auto &cte_map_entry = cte_map_ref[cte.table_index]; - auto rhs_child_optimizer = CreateChildOptimizer(); + // Gather all scans of this CTE in the query and mark them as expected readers of this CTE + GatherCTEScans(cte.table_index, *cte.children[1], cte_map_entry.expected_readers); + cte_map_entry.everything_referenced = false; + RemoveUnusedColumns rhs_child_optimizer(*this, true); rhs_child_optimizer.VisitOperator(*cte.children[1]); unordered_set referenced_columns_in_rhs; @@ -323,26 +358,33 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { referenced_columns_in_rhs.insert(entry.first.column_index); } - if (!cte_map_entry.everything_referenced) { - auto lhs_child_optimizer = CreateChildOptimizer(); + // If we have seen all readers of this CTE, and not all columns are referenced, we can prune the left-hand side + // of the CTE. However, if we have not seen all readers, we opt to not prune, because we might miss column + // references, resulting in incorrect query results. + auto have_seen_all_readers = cte_map_entry.expected_readers == cte_map_entry.seen_readers; + + if (!have_seen_all_readers) { + // We did not traverse the entire plan. Something is wrong. + throw InternalException("CTE pruning did not traverse the entire plan. This is a bug in the optimizer."); + } + if (!cte_map_entry.everything_referenced && have_seen_all_readers) { + RemoveUnusedColumns lhs_child_optimizer(*this, true); // Construct a projection on top of the left-hand side of the CTE // that only projects the columns that are referenced in the right-hand side of the CTE cte.children[0]->ResolveOperatorTypes(); auto bindings = cte.children[0]->GetColumnBindings(); vector> expressions; + if (referenced_columns_in_rhs.empty()) { + // if we have no columns selected just select the first column + referenced_columns_in_rhs.insert(0); + } for (idx_t i = 0; i < bindings.size(); i++) { if (referenced_columns_in_rhs.find(i) != referenced_columns_in_rhs.end()) { expressions.push_back(make_uniq(cte.children[0]->types[i], bindings[i])); } } - if (expressions.empty()) { - // no columns referenced, but we can not have an empty projection, as this would - // result in an empty left-hand side of the cte - break; - } - auto projection = make_uniq(binder.GenerateTableIndex(), std::move(expressions)); projection->children.push_back(std::move(cte.children[0])); cte.children[0] = std::move(projection); @@ -363,10 +405,15 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { return; } everything_referenced = true; - break; + // We may opt out here, but we still need to traverse the left-hand side of the CTE + RemoveUnusedColumns remove(*this, true); + remove.VisitOperator(*cte.children[0]); + return; } case LogicalOperatorType::LOGICAL_CTE_REF: { auto &cte_ref = op.Cast(); + + auto cte_info_map = TryGetCTEMap(); if (!cte_info_map) { everything_referenced = true; break; @@ -376,15 +423,19 @@ void RemoveUnusedColumns::VisitOperator(LogicalOperator &op) { if (it == cte_map_ref.end()) { throw InternalException("Could not find CTE definition for CTE reference"); } + auto &cte_entry = it->second; + auto &referenced_columns = cte_entry.column_references; + + // Mark this CTE reference as a seen reader of the CTE + it->second.seen_readers.insert(cte_ref.table_index); for (auto &entry : column_references) { if (entry.first.table_index == cte_ref.table_index) { - auto &test = cte_map_ref[cte_ref.cte_index].column_references; - test.insert(entry); + referenced_columns.insert(entry); } } - cte_map_ref[cte_ref.cte_index].everything_referenced = + cte_map_ref[cte_ref.cte_index].everything_referenced |= cte_ref.chunk_types.size() == cte_map_ref[cte_ref.cte_index].column_references.size(); break; @@ -495,7 +546,6 @@ void RemoveUnusedColumns::WritePushdownExtractColumns( auto &component = struct_extract.components[depth]; auto &expr = component.cast ? *component.cast : component.extract; - optional_ptr cast_type; auto return_type = expr->return_type; auto &colref = col.bindings[struct_extract.bindings_idx]; diff --git a/src/duckdb/src/optimizer/row_group_pruner.cpp b/src/duckdb/src/optimizer/row_group_pruner.cpp index 386bbf4bf..5b1cb5ded 100644 --- a/src/duckdb/src/optimizer/row_group_pruner.cpp +++ b/src/duckdb/src/optimizer/row_group_pruner.cpp @@ -113,14 +113,6 @@ optional_ptr RowGroupPruner::FindLogicalOrder(const LogicalLimit & } auto &logical_order = current_op.get().Cast(); - for (const auto &order : logical_order.orders) { - // FIXME: the logic to handle this has now been implemented in the row group reorderer, - // but we do not enable the optimization until more thorough testing - if (order.null_order == OrderByNullType::NULLS_FIRST) { - return nullptr; - } - } - auto order_column_type = logical_order.orders[0].expression->return_type; if (!order_column_type.IsNumeric() && !order_column_type.IsTemporal() && order_column_type != LogicalType::VARCHAR) { @@ -183,8 +175,8 @@ RowGroupPruner::CreateRowGroupReordererOptions(const optional_idx row_limit, con if (!partition_stats.empty()) { auto offset_puning_result = RowGroupReorderer::GetOffsetAfterPruning( order_by, column_type, order_type, null_order, storage_index, row_offset.GetIndex(), partition_stats); - if (offset_puning_result.pruned_row_group_count > 0) { - // We can prune row groups and reduce the offset + if (offset_puning_result.pruned_row_group_count > 0 || offset_puning_result.leading_null_group_offset > 0) { + // We can prune row groups and/or reduce the offset by consuming definite NULL-only groups logical_limit.offset_val = BoundLimitNode::ConstantValue(NumericCast(offset_puning_result.offset_remainder)); @@ -193,7 +185,8 @@ RowGroupPruner::CreateRowGroupReordererOptions(const optional_idx row_limit, con } return make_uniq(storage_index, order_by, order_type, null_order, column_type, - combined_limit, offset_puning_result.pruned_row_group_count); + combined_limit, offset_puning_result.pruned_row_group_count, + offset_puning_result.leading_null_group_offset); } } } diff --git a/src/duckdb/src/optimizer/rule/predicate_factoring.cpp b/src/duckdb/src/optimizer/rule/predicate_factoring.cpp new file mode 100644 index 000000000..f9f06bd07 --- /dev/null +++ b/src/duckdb/src/optimizer/rule/predicate_factoring.cpp @@ -0,0 +1,190 @@ +#include "duckdb/optimizer/rule/predicate_factoring.hpp" + +#include "duckdb/planner/expression/bound_conjunction_expression.hpp" +#include "duckdb/planner/expression/bound_columnref_expression.hpp" +#include "duckdb/planner/expression_iterator.hpp" +#include "duckdb/planner/column_binding_map.hpp" + +namespace duckdb { + +PredicateFactoringRule::PredicateFactoringRule(ExpressionRewriter &rewriter) : Rule(rewriter) { + // Match on a ConjunctionExpression that has at least one ConjunctionExpression as a child + auto op = make_uniq(); + op->matchers.push_back(make_uniq()); + op->policy = SetMatcher::Policy::SOME; + root = std::move(op); +} + +static bool ExpressionIsDisjunction(const Expression &expr) { + return expr.GetExpressionClass() == ExpressionClass::BOUND_CONJUNCTION && + expr.GetExpressionType() == ExpressionType::CONJUNCTION_OR; +} + +static void ExtractDisjunctedPredicates(Expression &expression, vector> &disjuncted_children) { + if (ExpressionIsDisjunction(expression)) { + auto &disjunction = expression.Cast(); + for (auto &child : disjunction.children) { + ExtractDisjunctedPredicates(*child, disjuncted_children); + } + } else { + disjuncted_children.push_back(expression); + } +} + +static bool ColumnBindingIsvalid(const ColumnBinding &column_binding) { + return column_binding.table_index != DConstants::INVALID_INDEX && + column_binding.column_index != DConstants::INVALID_INDEX; +} + +static bool GetSingleColumnBinding(const Expression &expr, ColumnBinding &column_binding) { + if (expr.IsVolatile()) { + return false; + } + + column_binding.table_index = DConstants::INVALID_INDEX; + column_binding.column_index = DConstants::INVALID_INDEX; + + bool found_multiple = false; + ExpressionIterator::VisitExpression(expr, [&](const BoundColumnRefExpression &colref) { + if (!ColumnBindingIsvalid(column_binding)) { + column_binding = colref.binding; + } else if (column_binding != colref.binding) { + found_multiple = true; + } + }); + + return ColumnBindingIsvalid(column_binding) && !found_multiple; +} + +static void ExtractDisjunctedPredicates(Expression &expression, + column_binding_map_t>> &binding_map); + +static void ExtractConjunctedPredicates(BoundConjunctionExpression &conjunction, + column_binding_map_t>> &binding_map) { + D_ASSERT(conjunction.GetExpressionType() == ExpressionType::CONJUNCTION_AND); + for (auto &conjunction_child : conjunction.children) { + if (conjunction_child->GetExpressionClass() == ExpressionClass::BOUND_CONJUNCTION && + conjunction_child->GetExpressionType() == ExpressionType::CONJUNCTION_AND) { + ExtractConjunctedPredicates(conjunction_child->Cast(), binding_map); + } else { + ExtractDisjunctedPredicates(*conjunction_child, binding_map); + } + } +} + +static void ExtractDisjunctedPredicates(Expression &expression, + column_binding_map_t>> &binding_map) { + if (expression.GetExpressionClass() == ExpressionClass::BOUND_CONJUNCTION && + expression.GetExpressionType() == ExpressionType::CONJUNCTION_AND) { + ExtractConjunctedPredicates(expression.Cast(), binding_map); + } else { + ColumnBinding single_binding; + if (GetSingleColumnBinding(expression, single_binding)) { + binding_map[single_binding].push_back(expression); + } + } +} + +static column_binding_map_t>> GetDisjunctedPredicateMap(Expression &expression) { + vector> disjuncted_children; + ExtractDisjunctedPredicates(expression, disjuncted_children); + D_ASSERT(disjuncted_children.size() > 1); + + // Extract predicates of the first child + auto &first_child = disjuncted_children[0].get(); + D_ASSERT(!ExpressionIsDisjunction(first_child)); + column_binding_map_t>> remaining_binding_map; + ExtractDisjunctedPredicates(first_child, remaining_binding_map); + + for (idx_t child_idx = 1; child_idx < disjuncted_children.size(); child_idx++) { + auto &child = disjuncted_children[child_idx].get(); + D_ASSERT(!ExpressionIsDisjunction(child)); + column_binding_map_t>> child_binding_map; + ExtractDisjunctedPredicates(child, child_binding_map); + + // Bindings must appear in both maps to be considered for predicate factoring + for (auto it = remaining_binding_map.begin(); it != remaining_binding_map.end();) { + auto child_it = child_binding_map.find(it->first); + if (child_it == child_binding_map.end()) { + it = remaining_binding_map.erase(it); + } else { + for (auto &new_predicate : child_it->second) { + bool found = false; + for (auto &existing_predicate : it->second) { + if (new_predicate.get().Equals(existing_predicate.get())) { + found = true; + break; + } + } + if (!found) { + it->second.push_back(new_predicate.get()); + } + } + it++; + } + } + } + + return remaining_binding_map; +} + +unique_ptr PredicateFactoringRule::Apply(LogicalOperator &op, vector> &bindings, + bool &changes_made, bool is_root) { + // Only applies to top-level FILTER expressions + if ((op.type != LogicalOperatorType::LOGICAL_FILTER && op.type != LogicalOperatorType::LOGICAL_ANY_JOIN) || + !is_root) { + return nullptr; + } + + // The expression must be an OR TODO: could also implement some common expression extraction for AND + if (!ExpressionIsDisjunction(bindings[0])) { + return nullptr; + } + + ColumnBinding column_binding; + if (GetSingleColumnBinding(bindings[0], column_binding)) { + return nullptr; // If it only applies to one column binding it can be pushed down already + } + + // Extract map from single column binding to expression it is contained in + const auto binding_map = GetDisjunctedPredicateMap(bindings[0].get()); + if (binding_map.empty()) { + return nullptr; // None qualify + } + + unique_ptr derived_filter; + for (auto &entry : binding_map) { + D_ASSERT(!entry.second.empty()); + + // Create disjunction on single-column predicates + unique_ptr column_filter; + for (auto &expr : entry.second) { + if (!column_filter) { + column_filter = expr.get().Copy(); + } else { + auto new_disjunction = make_uniq(ExpressionType::CONJUNCTION_OR); + new_disjunction->children.push_back(std::move(column_filter)); + new_disjunction->children.push_back(expr.get().Copy()); + column_filter = std::move(new_disjunction); + } + } + + // Conjunct each single-column predicate together + if (!derived_filter) { + derived_filter = std::move(column_filter); + } else { + auto new_conjunction = make_uniq(ExpressionType::CONJUNCTION_AND); + new_conjunction->children.push_back(std::move(column_filter)); + new_conjunction->children.push_back(std::move(derived_filter)); + derived_filter = std::move(new_conjunction); + } + } + + // Now add the derived filter as an AND to the original expression + auto result = make_uniq(ExpressionType::CONJUNCTION_AND); + result->children.push_back(bindings[0].get().Copy()); + result->children.push_back(std::move(derived_filter)); + return std::move(result); +} + +} // namespace duckdb diff --git a/src/duckdb/src/optimizer/rule/timestamp_comparison.cpp b/src/duckdb/src/optimizer/rule/timestamp_comparison.cpp index 2a0f67ebf..89e9c13bb 100644 --- a/src/duckdb/src/optimizer/rule/timestamp_comparison.cpp +++ b/src/duckdb/src/optimizer/rule/timestamp_comparison.cpp @@ -16,8 +16,7 @@ namespace duckdb { -TimeStampComparison::TimeStampComparison(ClientContext &context, ExpressionRewriter &rewriter) - : Rule(rewriter), context(context) { +TimeStampComparison::TimeStampComparison(ExpressionRewriter &rewriter) : Rule(rewriter), context(rewriter.context) { // match on a ComparisonExpression that is an Equality and has a VARCHAR and ENUM as its children auto op = make_uniq(); op->policy = SetMatcher::Policy::UNORDERED; diff --git a/src/duckdb/src/optimizer/statistics/operator/propagate_aggregate.cpp b/src/duckdb/src/optimizer/statistics/operator/propagate_aggregate.cpp index b44a9f5d5..53b50ad21 100644 --- a/src/duckdb/src/optimizer/statistics/operator/propagate_aggregate.cpp +++ b/src/duckdb/src/optimizer/statistics/operator/propagate_aggregate.cpp @@ -74,6 +74,9 @@ bool TryGetValueFromStats(const PartitionStatistics &stats, const StorageIndex & return false; } auto column_stats = stats.partition_row_group->GetColumnStatistics(storage_index); + if (!column_stats) { + return false; + } if (!stats.partition_row_group->MinMaxIsExact(*column_stats, storage_index)) { return false; } diff --git a/src/duckdb/src/optimizer/topn_window_elimination.cpp b/src/duckdb/src/optimizer/topn_window_elimination.cpp index be5f194ee..6dbf692a3 100644 --- a/src/duckdb/src/optimizer/topn_window_elimination.cpp +++ b/src/duckdb/src/optimizer/topn_window_elimination.cpp @@ -223,7 +223,7 @@ unique_ptr TopNWindowElimination::OptimizeInternal(unique_ptr(std::move(op)); @@ -673,7 +673,9 @@ void TopNWindowElimination::UpdateTopmostBindings(const idx_t window_idx, const const vector &topmost_bindings, vector &new_bindings, ColumnBindingReplacer &replacer) { - // The top-most operator's column order is [group][aggregate args][row number]. Now, set the new resulting bindings. + // The top-most operator's column order is: + // [projected groups][aggregate args/value][row number] + // Now set the new bindings according to this order and remember replacements in replacer D_ASSERT(topmost_bindings.size() == new_bindings.size()); replacer.replacement_bindings.reserve(new_bindings.size()); set row_id_binding_idxs; @@ -682,17 +684,20 @@ void TopNWindowElimination::UpdateTopmostBindings(const idx_t window_idx, const const idx_t aggregate_table_idx = GetAggregateIdx(op); // Project the group columns + const auto compact_group_columns = group_table_idx == aggregate_table_idx; idx_t current_column_idx = 0; for (auto group_idx : group_idxs) { - const idx_t group_referencing_idx = group_idx.first; + const auto group_referencing_idx = group_idx.first; + const auto column_idx = compact_group_columns ? current_column_idx : group_idx.second; new_bindings[group_referencing_idx].table_index = group_table_idx; - new_bindings[group_referencing_idx].column_index = group_idx.second; + new_bindings[group_referencing_idx].column_index = column_idx; + replacer.replacement_bindings.emplace_back(topmost_bindings[group_referencing_idx], new_bindings[group_referencing_idx]); current_column_idx++; } - if (group_table_idx != aggregate_table_idx) { + if (!compact_group_columns) { // If the topmost operator is an aggregate, the table indexes are different, and we start back from 0 current_column_idx = 0; } diff --git a/src/duckdb/src/optimizer/window_self_join.cpp b/src/duckdb/src/optimizer/window_self_join.cpp index c0c8d2a51..adcb7715e 100644 --- a/src/duckdb/src/optimizer/window_self_join.cpp +++ b/src/duckdb/src/optimizer/window_self_join.cpp @@ -192,9 +192,15 @@ unique_ptr WindowSelfJoinOptimizer::OptimizeInternal(unique_ptr auto &partitions = w_expr0.partitions; // --- Transformation --- - + // try to copy the LHS + unique_ptr copy_child; + try { + copy_child = window.children[0]->Copy(optimizer.context); + } catch (...) { + // failed to copy the LHS - cannot run this optimizer + return op; + } auto original_child = std::move(window.children[0]); - auto copy_child = original_child->Copy(optimizer.context); // Rebind copy_child to avoid duplicate table indices WindowSelfJoinTableRebinder rebinder(optimizer); diff --git a/src/duckdb/src/parallel/async_result.cpp b/src/duckdb/src/parallel/async_result.cpp index a32086b84..150e791ef 100644 --- a/src/duckdb/src/parallel/async_result.cpp +++ b/src/duckdb/src/parallel/async_result.cpp @@ -66,11 +66,15 @@ AsyncResult::AsyncResult(vector> &&tasks) } AsyncResult &AsyncResult::operator=(duckdb::SourceResultType t) { - return operator=(AsyncResult(t)); + result_type = GetAsyncResultType(t); + async_tasks.clear(); + return *this; } AsyncResult &AsyncResult::operator=(duckdb::AsyncResultType t) { - return operator=(AsyncResult(t)); + result_type = t; + async_tasks.clear(); + return *this; } AsyncResult &AsyncResult::operator=(AsyncResult &&other) noexcept { diff --git a/src/duckdb/src/parallel/pipeline_executor.cpp b/src/duckdb/src/parallel/pipeline_executor.cpp index 7b1ad5424..cb37fdb31 100644 --- a/src/duckdb/src/parallel/pipeline_executor.cpp +++ b/src/duckdb/src/parallel/pipeline_executor.cpp @@ -111,14 +111,12 @@ bool PipelineExecutor::TryFlushCachingOperators(ExecutionBudget &chunk_budget) { return false; } case OperatorResultType::NEED_MORE_INPUT: - continue; case OperatorResultType::FINISHED: break; default: throw InternalException("Unexpected OperatorResultType (%s) in TryFlushCachingOperators", EnumUtil::ToString(push_result)); } - break; } return true; } diff --git a/src/duckdb/src/planner/binder/statement/bind_insert.cpp b/src/duckdb/src/planner/binder/statement/bind_insert.cpp index 0d44ac5ac..5579d73d7 100644 --- a/src/duckdb/src/planner/binder/statement/bind_insert.cpp +++ b/src/duckdb/src/planner/binder/statement/bind_insert.cpp @@ -174,34 +174,36 @@ void DoUpdateSetQualify(unique_ptr &expr, const string &table_ } unique_ptr CreateSetInfoForReplace(TableCatalogEntry &table, InsertStatement &insert, - TableStorageInfo &storage_info) { + const TableStorageInfo &storage_info) { auto set_info = make_uniq(); auto &columns = set_info->columns; - // Figure out which columns are indexed on - - unordered_set indexed_columns; + // REPLACE is rewritten to UPDATE. Conflict-key columns are used to match existing rows and + // should not be part of the SET list. + unordered_set conflict_columns; for (auto &index : storage_info.index_info) { + if (!index.is_unique) { + continue; + } for (auto &column_id : index.column_set) { - indexed_columns.insert(column_id); + conflict_columns.insert(column_id); } } auto &column_list = table.GetColumns(); if (insert.columns.empty()) { for (auto &column : column_list.Physical()) { - auto &name = column.Name(); // FIXME: can these column names be aliased somehow? - if (indexed_columns.count(column.Oid())) { + if (conflict_columns.count(column.Oid())) { continue; } - columns.push_back(name); + columns.push_back(column.Name()); } } else { // a list of columns was explicitly supplied, only update those for (auto &name : insert.columns) { auto &column = column_list.GetColumn(name); - if (indexed_columns.count(column.Oid())) { + if (conflict_columns.count(column.Oid())) { continue; } columns.push_back(name); diff --git a/src/duckdb/src/storage/serialization/serialize_nodes.cpp b/src/duckdb/src/storage/serialization/serialize_nodes.cpp index f6e4f714e..b38fcc7f1 100644 --- a/src/duckdb/src/storage/serialization/serialize_nodes.cpp +++ b/src/duckdb/src/storage/serialization/serialize_nodes.cpp @@ -467,6 +467,7 @@ void RowGroupOrderOptions::Serialize(Serializer &serializer) const { serializer.WriteProperty(104, "column_type", column_type); serializer.WriteProperty(105, "row_limit", row_limit); serializer.WritePropertyWithDefault(106, "row_group_offset", row_group_offset); + serializer.WritePropertyWithDefault(107, "leading_null_group_offset", leading_null_group_offset); } unique_ptr RowGroupOrderOptions::Deserialize(Deserializer &deserializer) { @@ -477,7 +478,8 @@ unique_ptr RowGroupOrderOptions::Deserialize(Deserializer auto column_type = deserializer.ReadProperty(104, "column_type"); auto row_limit = deserializer.ReadProperty(105, "row_limit"); auto row_group_offset = deserializer.ReadPropertyWithDefault(106, "row_group_offset"); - auto result = duckdb::unique_ptr(new RowGroupOrderOptions(column_idx, order_by, order_type, null_order, column_type, row_limit, row_group_offset)); + auto leading_null_group_offset = deserializer.ReadPropertyWithDefault(107, "leading_null_group_offset"); + auto result = duckdb::unique_ptr(new RowGroupOrderOptions(column_idx, order_by, order_type, null_order, column_type, row_limit, row_group_offset, leading_null_group_offset)); return result; } diff --git a/src/duckdb/src/storage/table/row_group_reorderer.cpp b/src/duckdb/src/storage/table/row_group_reorderer.cpp index 7c093a0ff..314848d59 100644 --- a/src/duckdb/src/storage/table/row_group_reorderer.cpp +++ b/src/duckdb/src/storage/table/row_group_reorderer.cpp @@ -14,6 +14,10 @@ struct RowGroupOffsetEntry { unique_ptr stats; }; +bool IsNullOnly(const BaseStatistics &stats) { + return !stats.CanHaveNoNull(); +} + bool CompareValues(const Value &v1, const Value &v2, const OrderByStatistics order) { return (order == OrderByStatistics::MAX && v1 < v2) || (order == OrderByStatistics::MIN && v1 > v2); } @@ -88,10 +92,11 @@ void AddRowGroups(multimap &row_group_map, It i } } -template -It SkipOffsetPrunedRowGroups(It it, const idx_t row_group_offset) { - for (idx_t i = 0; i < row_group_offset; i++) { +template +It SkipOffsetPrunedRowGroups(It it, End end, idx_t row_group_offset) { + while (row_group_offset > 0 && it != end) { ++it; + row_group_offset--; } return it; } @@ -107,20 +112,23 @@ void SetRowGroupVector(multimap &row_group_map, const idx_t row_group_offset, const OrderType order_type, const OrderByColumnType column_type, vector>> &ordered_row_groups) { const auto stat_type = order_type == OrderType::ASCENDING ? OrderByStatistics::MIN : OrderByStatistics::MAX; - ordered_row_groups.reserve(row_group_map.size()); - - Value previous_key; if (order_type == OrderType::ASCENDING) { - auto it = SkipOffsetPrunedRowGroups(row_group_map.begin(), row_group_offset); auto end = row_group_map.end(); + auto it = SkipOffsetPrunedRowGroups(row_group_map.begin(), end, row_group_offset); + if (it == end) { + return; + } if (row_limit.IsValid()) { AddRowGroups(row_group_map, it, end, ordered_row_groups, row_limit.GetIndex(), column_type, stat_type); } else { InsertAllRowGroups(it, end, ordered_row_groups); } } else { - auto it = SkipOffsetPrunedRowGroups(row_group_map.rbegin(), row_group_offset); auto end = row_group_map.rend(); + auto it = SkipOffsetPrunedRowGroups(row_group_map.rbegin(), end, row_group_offset); + if (it == end) { + return; + } if (row_limit.IsValid()) { AddRowGroups(row_group_map, it, end, ordered_row_groups, row_limit.GetIndex(), column_type, stat_type); } else { @@ -129,9 +137,19 @@ void SetRowGroupVector(multimap &row_group_map, } } +void AppendRowGroups(const vector>> &source, idx_t offset, + vector>> &target) { + for (idx_t i = offset; i < source.size(); i++) { + target.push_back(source[i]); + } +} + template OffsetPruningResult FindOffsetPrunableChunks(It it, End end, const OrderByStatistics order_by, const OrderByColumnType column_type, const idx_t row_offset) { + if (it == end) { + return {row_offset, 0, 0}; + } const auto opposite_stat_type = order_by == OrderByStatistics::MAX ? OrderByStatistics::MIN : OrderByStatistics::MAX; @@ -164,6 +182,9 @@ OffsetPruningResult FindOffsetPrunableChunks(It it, End end, const OrderByStatis new_row_offset -= last_unresolved_entry->second.count; ++last_unresolved_entry; + if (last_unresolved_entry == end) { + return {new_row_offset, pruned_row_group_count, 0}; + } auto &upcoming_stats = *last_unresolved_entry->second.stats; last_unresolved_boundary = RowGroupReorderer::RetrieveStat(upcoming_stats, opposite_stat_type, column_type); } @@ -229,35 +250,64 @@ OffsetPruningResult RowGroupReorderer::GetOffsetAfterPruning(const OrderByStatis const StorageIndex &storage_index, const idx_t row_offset, vector &stats) { multimap ordered_row_groups; + idx_t new_row_offset = row_offset; + idx_t leading_null_group_offset = 0; + bool encountered_maybe_null_group = false; for (auto &partition_stats : stats) { if (partition_stats.count_type == CountType::COUNT_APPROXIMATE || !partition_stats.partition_row_group) { - return {row_offset, 0}; + return {row_offset, 0, 0}; } auto column_stats = partition_stats.partition_row_group->GetColumnStatistics(storage_index); + if (null_order == OrderByNullType::NULLS_FIRST && IsNullOnly(*column_stats)) { + if (new_row_offset < partition_stats.count) { + return {new_row_offset, 0, leading_null_group_offset}; + } + new_row_offset -= partition_stats.count; + leading_null_group_offset++; + continue; + } Value comparison_value = RetrieveStat(*column_stats, order_by, column_type); if (comparison_value.IsNull()) { - if (null_order == OrderByNullType::NULLS_LAST) { + if (null_order == OrderByNullType::NULLS_LAST && IsNullOnly(*column_stats)) { // With NULLS_LAST, null-stats row groups are scanned after all non-null row groups. // They fall outside the offset range being pruned here, so skip them. continue; } - // With NULLS_FIRST, null-stats row groups come first and would need to be counted - // toward the offset before non-null groups. This case is not yet supported. - return {row_offset, 0}; + // The row group might still contribute ordered values, but we cannot position it safely. + if (null_order == OrderByNullType::NULLS_FIRST) { + encountered_maybe_null_group = true; + continue; + } + return {new_row_offset, 0, leading_null_group_offset}; + } + if (null_order == OrderByNullType::NULLS_FIRST && column_stats->CanHaveNull()) { + // Groups that might contain NULLs are scanned before definitely non-null groups with NULLS_FIRST. + // Without exact NULL counts, they block further offset pruning into the non-null region. + encountered_maybe_null_group = true; + continue; } auto entry = RowGroupOffsetEntry {partition_stats.count, std::move(column_stats)}; ordered_row_groups.emplace(comparison_value, std::move(entry)); } + if (null_order == OrderByNullType::NULLS_FIRST && encountered_maybe_null_group) { + return {new_row_offset, 0, leading_null_group_offset}; + } switch (order_type) { - case OrderType::ASCENDING: - return FindOffsetPrunableChunks(ordered_row_groups.begin(), ordered_row_groups.end(), order_by, column_type, - row_offset); - case OrderType::DESCENDING: - return FindOffsetPrunableChunks(ordered_row_groups.rbegin(), ordered_row_groups.rend(), order_by, column_type, - row_offset); + case OrderType::ASCENDING: { + auto result = FindOffsetPrunableChunks(ordered_row_groups.begin(), ordered_row_groups.end(), order_by, + column_type, new_row_offset); + result.leading_null_group_offset = leading_null_group_offset; + return result; + } + case OrderType::DESCENDING: { + auto result = FindOffsetPrunableChunks(ordered_row_groups.rbegin(), ordered_row_groups.rend(), order_by, + column_type, new_row_offset); + result.leading_null_group_offset = leading_null_group_offset; + return result; + } default: throw InternalException("Unsupported order type in GetOffsetAfterPruning"); } @@ -273,42 +323,40 @@ optional_ptr> RowGroupReorderer::GetRootSegment(RowGroupSe initialized = true; - vector>> remaining_row_groups; + vector>> null_only_groups; + vector>> ambiguous_groups; multimap row_group_map; for (auto &row_group : row_groups.SegmentNodes()) { auto stats = row_group.GetNode().GetStatistics(options.column_idx); + if (IsNullOnly(*stats)) { + null_only_groups.push_back(row_group); + continue; + } Value comparison_value = RetrieveStat(*stats, options.order_by, options.column_type); - if (comparison_value.IsNull()) { - // no stats for this row group - push to remaining - remaining_row_groups.push_back(row_group); + if (comparison_value.IsNull() || (options.null_order == OrderByNullType::NULLS_FIRST && stats->CanHaveNull())) { + // No trustworthy ordering statistics, or the row group might still contribute NULLs that must be scanned + // before definitely non-null groups with NULLS_FIRST. + ambiguous_groups.push_back(row_group); continue; } auto entry = RowGroupSegmentNodeEntry {row_group, std::move(stats)}; row_group_map.emplace(comparison_value, std::move(entry)); } - if (row_group_map.empty()) { - // All row groups have unknown/null stats - scan them all without reordering - for (auto &remaining_row_group : remaining_row_groups) { - ordered_row_groups.push_back(remaining_row_group); - } - if (ordered_row_groups.empty()) { - return nullptr; - } - return ordered_row_groups[0].get(); - } - - D_ASSERT(row_group_map.size() > options.row_group_offset); - SetRowGroupVector(row_group_map, options.row_limit, options.row_group_offset, options.order_type, - options.column_type, ordered_row_groups); - // Push null-stats row groups in the correct position based on null ordering if (options.null_order == OrderByNullType::NULLS_FIRST) { - ordered_row_groups.insert(ordered_row_groups.begin(), remaining_row_groups.begin(), remaining_row_groups.end()); + AppendRowGroups(null_only_groups, options.leading_null_group_offset, ordered_row_groups); + AppendRowGroups(ambiguous_groups, 0, ordered_row_groups); + SetRowGroupVector(row_group_map, options.row_limit, options.row_group_offset, options.order_type, + options.column_type, ordered_row_groups); } else { - // NULLS_LAST: append null-stats row groups after sorted row groups - for (auto &remaining_row_group : remaining_row_groups) { - ordered_row_groups.push_back(remaining_row_group); - } + SetRowGroupVector(row_group_map, options.row_limit, options.row_group_offset, options.order_type, + options.column_type, ordered_row_groups); + AppendRowGroups(ambiguous_groups, 0, ordered_row_groups); + AppendRowGroups(null_only_groups, 0, ordered_row_groups); + } + + if (ordered_row_groups.empty()) { + return nullptr; } return ordered_row_groups[0].get(); diff --git a/src/duckdb/src/storage/table/struct_column_data.cpp b/src/duckdb/src/storage/table/struct_column_data.cpp index 0ab6ab90e..8f6b13162 100644 --- a/src/duckdb/src/storage/table/struct_column_data.cpp +++ b/src/duckdb/src/storage/table/struct_column_data.cpp @@ -144,7 +144,11 @@ static void ScanChild(ColumnScanState &state, Vector &result, const std::functio idx_t StructColumnData::Scan(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result, idx_t target_count) { - auto scan_count = validity->Scan(transaction, vector_index, state.child_states[0], result, target_count); + idx_t scan_count; + if (!state.storage_index.IsPushdownExtract()) { + // if we are scanning the entire struct we need to scan the validity + scan_count = validity->Scan(transaction, vector_index, state.child_states[0], result, target_count); + } auto struct_children = GetStructChildren(state); for (auto &child : struct_children) { auto &target_vector = GetFieldVectorForScan(result, child.vector_index); @@ -155,15 +159,19 @@ idx_t StructColumnData::Scan(TransactionData transaction, idx_t vector_index, Co continue; } ScanChild(state, target_vector, [&](Vector &child_result) { - return child.col.Scan(transaction, vector_index, child.state, child_result, target_count); + scan_count = child.col.Scan(transaction, vector_index, child.state, child_result, target_count); + return scan_count; }); } return scan_count; } idx_t StructColumnData::ScanCount(ColumnScanState &state, Vector &result, idx_t count, idx_t result_offset) { - auto scan_count = validity->ScanCount(state.child_states[0], result, count); - + idx_t scan_count; + if (!state.storage_index.IsPushdownExtract()) { + // if we are scanning the entire struct we need to scan the validity + scan_count = validity->ScanCount(state.child_states[0], result, count); + } auto struct_children = GetStructChildren(state); for (auto &child : struct_children) { auto &target_vector = GetFieldVectorForScan(result, child.vector_index); @@ -174,7 +182,8 @@ idx_t StructColumnData::ScanCount(ColumnScanState &state, Vector &result, idx_t continue; } ScanChild(state, target_vector, [&](Vector &child_result) { - return child.col.ScanCount(child.state, child_result, count, result_offset); + scan_count = child.col.ScanCount(child.state, child_result, count, result_offset); + return scan_count; }); } return scan_count; diff --git a/src/duckdb/src/storage/table/validity_column_data.cpp b/src/duckdb/src/storage/table/validity_column_data.cpp index 51101786e..9971c9b10 100644 --- a/src/duckdb/src/storage/table/validity_column_data.cpp +++ b/src/duckdb/src/storage/table/validity_column_data.cpp @@ -28,7 +28,8 @@ void ValidityColumnData::UpdateWithBase(TransactionData transaction, DataTable & CompressionType::COMPRESSION_EMPTY) { // The validity is actually covered by the data, so we read it to get the validity for UpdateInternal. ColumnScanState data_scan_state(nullptr); - auto fetch_count = base.Fetch(data_scan_state, row_ids[0], base_vector); + auto fetch_count = + base.Fetch(data_scan_state, row_ids[0] - UnsafeNumericCast(row_group_start), base_vector); base_vector.Flatten(fetch_count); } diff --git a/src/duckdb/third_party/httplib/httplib.hpp b/src/duckdb/third_party/httplib/httplib.hpp index 0fa028598..06e2d9406 100644 --- a/src/duckdb/third_party/httplib/httplib.hpp +++ b/src/duckdb/third_party/httplib/httplib.hpp @@ -203,6 +203,12 @@ "cpp-httplib doesn't support platforms where size_t is less than 64 bits." #endif +#ifdef __MVS__ +#define REGEX_SCOPE static +#else +#define REGEX_SCOPE thread_local +#endif + /* * Headers */ @@ -266,6 +272,7 @@ using socklen_t = int; #endif #ifdef __MVS__ #include +#include #ifndef NI_MAXHOST #define NI_MAXHOST 1025 #endif @@ -2946,7 +2953,7 @@ inline std::string encode_path(const std::string &s) { inline std::string file_extension(const std::string &path) { Match m; - thread_local auto re = Regex("\\.([a-zA-Z0-9]+)$"); + REGEX_SCOPE auto re = Regex("\\.([a-zA-Z0-9]+)$"); if (duckdb_re2::RegexSearch(path, m, re)) { return m[1].str(); } return std::string(); } @@ -4927,7 +4934,7 @@ inline ReadContentResult read_content_chunked(Stream &strm, T &x, if (!line_reader.getline()) { return ReadContentResult::Success; } // RFC 7230 Section 4.1.2 - Headers prohibited in trailers - thread_local case_ignore::unordered_set prohibited_trailers = { + static const case_ignore::unordered_set prohibited_trailers = { // Message framing "transfer-encoding", "content-length", @@ -5715,7 +5722,7 @@ class FormDataParser { file_.content_type = trim_copy(header.substr(str_len(header_content_type))); } else { - thread_local const Regex re_content_disposition( + REGEX_SCOPE const Regex re_content_disposition( R"~(^Content-Disposition:\s*form-data;\s*(.*)$)~", duckdb_re2::RegexOptions::CASE_INSENSITIVE); @@ -5738,7 +5745,7 @@ class FormDataParser { it = params.find("filename*"); if (it != params.end()) { // Only allow UTF-8 encoding... - thread_local const Regex re_rfc5987_encoding( + REGEX_SCOPE const Regex re_rfc5987_encoding( R"~(^UTF-8''(.+?)$)~", duckdb_re2::RegexOptions::CASE_INSENSITIVE); Match m2; @@ -6475,7 +6482,7 @@ inline bool parse_www_authenticate(const Response &res, bool is_proxy) { auto auth_key = is_proxy ? "Proxy-Authenticate" : "WWW-Authenticate"; if (res.has_header(auth_key)) { - thread_local auto re = + REGEX_SCOPE auto re = Regex(R"~((?:(?:,\s*)?(.+?)=(?:"(.*?)"|([^,]*))))~"); auto s = res.get_header_value(auth_key); auto pos = s.find(' '); @@ -6793,7 +6800,7 @@ inline std::string decode_query_component(const std::string &component, inline std::string append_query_params(const std::string &path, const Params ¶ms) { std::string path_with_query = path; - thread_local const Regex re("[^?]+\\?.*"); + REGEX_SCOPE const Regex re("[^?]+\\?.*"); auto delm = RegexMatch(path, re) ? '&' : '?'; path_with_query += delm + detail::params_to_query_str(params); return path_with_query; @@ -8814,9 +8821,9 @@ inline bool ClientImpl::read_response_line(Stream &strm, const Request &req, if (!line_reader.getline()) { return false; } #ifdef CPPHTTPLIB_ALLOW_LF_AS_LINE_TERMINATOR - thread_local const Regex re("(HTTP/1\\.[01]) (\\d{3})(?: (.*?))?\r?\n"); + REGEX_SCOPE const Regex re("(HTTP/1\\.[01]) (\\d{3})(?: (.*?))?\r?\n"); #else - thread_local const Regex re("(HTTP/1\\.[01]) (\\d{3})(?: (.*?))?\r\n"); + REGEX_SCOPE const Regex re("(HTTP/1\\.[01]) (\\d{3})(?: (.*?))?\r\n"); #endif Match m; @@ -9071,7 +9078,7 @@ inline bool ClientImpl::redirect(Request &req, Response &res, Error &error) { return true; } - thread_local const Regex re( + REGEX_SCOPE const Regex re( R"((?:(https?):)?(?://(?:\[([a-fA-F\d:]+)\]|([^:/?#]+))(?::(\d+))?)?([^?#]*)(\?[^#]*)?(?:#.*)?)"); Match m; @@ -12127,4 +12134,4 @@ inline SSL_CTX *Client::ssl_context() const { #undef poll #endif -#endif // CPPHTTPLIB_HTTPLIB_H \ No newline at end of file +#endif // CPPHTTPLIB_HTTPLIB_H diff --git a/src/duckdb/third_party/utf8proc/utf8proc_wrapper.cpp b/src/duckdb/third_party/utf8proc/utf8proc_wrapper.cpp index 7aba4df0f..be7991978 100644 --- a/src/duckdb/third_party/utf8proc/utf8proc_wrapper.cpp +++ b/src/duckdb/third_party/utf8proc/utf8proc_wrapper.cpp @@ -397,13 +397,12 @@ size_t Utf8Proc::RenderWidth(const char *s, size_t len, size_t pos) { size_t Utf8Proc::RenderWidth(const std::string &str) { size_t render_width = 0; - size_t pos = 0; - while (pos < str.size()) { - int sz; - auto codepoint = Utf8Proc::UTF8ToCodepoint(str.c_str() + pos, sz); - auto properties = duckdb::utf8proc_get_property(codepoint); - render_width += properties->charwidth; - pos += sz; + for (auto cluster : Utf8Proc::GraphemeClusters(str.c_str(), str.size())) { + // use the width of the first codepoint in the grapheme cluster + // combining marks, ZWJ, variation selectors, etc. have charwidth 0 + // and multi-codepoint clusters (e.g. ZWJ emoji sequences) should only + // count the base character's width, not the sum of all codepoints + render_width += Utf8Proc::RenderWidth(str.c_str(), str.size(), cluster.start); } return render_width; } diff --git a/src/duckdb/ub_src_optimizer_rule.cpp b/src/duckdb/ub_src_optimizer_rule.cpp index 2a2c56c3c..017e42221 100644 --- a/src/duckdb/ub_src_optimizer_rule.cpp +++ b/src/duckdb/ub_src_optimizer_rule.cpp @@ -32,6 +32,8 @@ #include "src/optimizer/rule/ordered_aggregate_optimizer.cpp" +#include "src/optimizer/rule/predicate_factoring.cpp" + #include "src/optimizer/rule/regex_optimizations.cpp" #include "src/optimizer/rule/timestamp_comparison.cpp"