Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
bf7bbc2
LOGBROKER-10206 Fix after review (#34064)
kuzin57 Mar 10, 2026
88c8805
[NBS-6956] Add nbs partition in-mem (#33486)
vazhem Mar 10, 2026
6871c15
PQv1: passthrough the '_advanced_monitoring' attribute (#34128)
ubyte Mar 10, 2026
89f1b24
LOGBROKER-10206 Add include (#34147)
kuzin57 Mar 10, 2026
677081e
Consumers partition metrics: add unittest for the '_advanced_monitori…
ubyte Mar 10, 2026
37d15e9
remove unused TThrRefBase base class from the public interface ISimpl…
ubyte Mar 10, 2026
b5b065e
LOGBROKER-10206 Add metrics & some fixes (#34200)
kuzin57 Mar 10, 2026
83de7c7
NBS2: add "get load actor adapter actor id" request to dstool; return…
BarkovBG Mar 10, 2026
ac437d4
Fixed sanitizer error in ydb/core/persqueue/pqtablet/partition/mlp/ut…
nshestakov Mar 10, 2026
dcd3acb
LOGBROKER-7430 fixed partition reading hanging in topic sdk (#34362)
GrigoriyPA Mar 10, 2026
7ebd8d8
Fixed SDK oss build (#34423)
Gazizonoki Mar 10, 2026
57f8f85
LOGBROKER-7430 added unit test on reading with restarts (#34487)
GrigoriyPA Mar 10, 2026
10bd0aa
[C++ SDK] Fixed tsan fail in kqp/ut/effects (#34319)
Gazizonoki Mar 10, 2026
5df3d0b
[C++ SDK] Created CHANGELOG.md (#34599)
Gazizonoki Mar 10, 2026
e4310f8
[C++ SDK] Added missing include in topics (#34591)
Gazizonoki Mar 10, 2026
2dcf985
Support ALTER TABLE COMPACT in kqp and rpc_alter_table (#34516)
lex007in Mar 10, 2026
c1f1dce
Allow to specify impl table proto settings for fulltext_relevance ind…
vitalif Mar 10, 2026
15acb5f
Columnshard bloom skip index support (#34458)
xyliganSereja Mar 10, 2026
249d24c
Specify user SID in change (CDC) records (#33262)
kseleznyov Mar 10, 2026
e6f9deb
Support include_index_data in ExportFs (#34641)
stanislav-shchetinin Mar 10, 2026
cb8a06f
[NBS-6905] Make nbs2 volume and partition tablets (#34315)
vazhem Mar 10, 2026
70ff80a
Support cancel/forget/list forced compaction methods in RPC (#34676)
lex007in Mar 10, 2026
1d48ceb
Support index_population_mode in ImportFs (#34832)
stanislav-shchetinin Mar 10, 2026
4824cbe
[NBS2]: introduce batch sync and erase (#34613)
BarkovBG Mar 10, 2026
734a794
Remove Layout (FLAT/FLAT_RELEVANCE) from fulltext settings (#34802)
vitalif Mar 10, 2026
a546a41
YQ-5091 rename PQ partitions balancer and fix registration (#34667)
GrigoriyPA Mar 10, 2026
c8087bd
Revert "YQ-5091 rename PQ partitions balancer and fix registration" (…
maximyurchuk Mar 10, 2026
4adf0fb
Add a note about removed full text Layout field (#34998)
vitalif Mar 10, 2026
a55d649
Revert "Revert "YQ-5091 rename PQ partitions balancer and fix registr…
GrigoriyPA Mar 10, 2026
50f73e2
Fixed TSAN error in tests (#35091)
nshestakov Mar 10, 2026
ef64fce
[C++ SDK] Added grpc load balancing policy option (#35137)
Gazizonoki Mar 10, 2026
16e9452
Added changelog entry for grpc load balancing policy (#35150)
Gazizonoki Mar 10, 2026
27531c0
[C++ SDK] Moved private executors to adapters (#35197)
Gazizonoki Mar 10, 2026
f2bac30
LOGBROKER-10206 New interface (#34369)
kuzin57 Mar 10, 2026
26c5dae
SchemeShard: add metrics and status handling for forced compaction (#…
lex007in Mar 10, 2026
ab49b65
LOGBROKER-10314 Fix race in read session (#35470)
kuzin57 Mar 10, 2026
60c9294
Fix: TTableClient destructor can hang indefinitely on Drain().Wait() …
Copilot Mar 10, 2026
d10728d
Update import generation: 36
github-actions[bot] Mar 10, 2026
c7a43e3
Update CMake files
pnv1 Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
36
37
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3b235ed1f2fc3977cfc6f99a74123c0097ef9795
abde8e11540c2ccb2c8b2d9d15b2bed3d3fbaf7e
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
* EXPERIMENTAL! Added `IProducer` interface to the SDK. This interface is used to write messages to a topic.
Each message can be associated with a partitioning key, which is used to determine the partition to which the message will be written.

* Added gRPC load balancing policy option for `TDriver`. Default policy: `round_robin`.

* Removed the `layout` field from `FulltextIndexSettings` and replaced it with separate index types in `TableIndexDescription`.
`layout` was a preliminary version of API, actual YDB release 26-1 uses separate index types, so please note that creating
full text indexes via gRPC won't work with previous versions of SDK.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ add_subdirectory(secondary_index_builtin)
add_subdirectory(time)
add_subdirectory(topic_reader)
add_subdirectory(topic_writer/transaction)
add_subdirectory(topic_writer/producer/basic_write)
add_subdirectory(ttl)
add_subdirectory(vector_index)
add_subdirectory(vector_index_builtin)
11 changes: 2 additions & 9 deletions examples/executor/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,13 @@

#include <library/cpp/getopt/last_getopt.h>

#include <util/thread/pool.h>

#include <thread>


void ExecutorExample(const std::string& endpoint, const std::string& database) {
auto driverConfig = NYdb::CreateFromEnvironment(endpoint + "/?database=" + database)
.SetExecutor(NYdb::CreateThreadPoolExecutorAdapter(
std::make_shared<TThreadPool>(TThreadPool::TParams()
.SetBlocking(true)
.SetCatching(false)
.SetForkAware(false)),
std::thread::hardware_concurrency())
);
.SetExecutor(NYdb::CreateThreadPoolExecutor(std::thread::hardware_concurrency(), 0)
);

NYdb::TDriver driver(driverConfig);
NYdb::NQuery::TQueryClient client(driver);
Expand Down
36 changes: 36 additions & 0 deletions examples/topic_writer/producer/basic_write/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
add_executable(topic_writer_producer_buffer_overloaded)

target_link_libraries(topic_writer_producer_buffer_overloaded
PUBLIC
yutil
YDB-CPP-SDK::Topic
YDB-CPP-SDK::Query
)

target_sources(topic_writer_producer_buffer_overloaded
PRIVATE
main.cpp
)

vcs_info(topic_writer_producer_buffer_overloaded)

if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64")
target_link_libraries(topic_writer_producer_buffer_overloaded PUBLIC
cpuid_check
)
endif()

if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
target_link_options(topic_writer_producer_buffer_overloaded PRIVATE
-ldl
-lrt
-Wl,--no-as-needed
-lpthread
)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
target_link_options(topic_writer_producer_buffer_overloaded PRIVATE
-Wl,-platform_version,macos,11.0,11.0
-framework
CoreFoundation
)
endif()
123 changes: 123 additions & 0 deletions examples/topic_writer/producer/basic_write/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include <thread>
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/query/client.h>

#include <util/generic/serialized_enum.h>

std::shared_ptr<NYdb::NTopic::IProducer> CreateProducer(const std::string& topic, NYdb::NTopic::TTopicClient& topicClient) {
NYdb::NTopic::TProducerSettings producerSettings;
producerSettings.Path(topic);
producerSettings.Codec(NYdb::NTopic::ECodec::ZSTD);
producerSettings.ProducerIdPrefix("producer_basic");
producerSettings.PartitionChooserStrategy(NYdb::NTopic::TProducerSettings::EPartitionChooserStrategy::Bound);
producerSettings.SubSessionIdleTimeout(TDuration::Seconds(30));
producerSettings.MaxBlock(TDuration::Seconds(30));
producerSettings.MaxMemoryUsage(100_MB);
return topicClient.CreateProducer(producerSettings);
}

std::string GetResultStatus(const NYdb::NTopic::TWriteResult& result) {
switch (result.Status) {
case NYdb::NTopic::EWriteStatus::Queued:
return "Queued";
case NYdb::NTopic::EWriteStatus::Error:
return "Error";
case NYdb::NTopic::EWriteStatus::Timeout:
return "Timeout";
}
}

std::string GetResultStatus(const NYdb::NTopic::TFlushResult& result) {
switch (result.Status) {
case NYdb::NTopic::EFlushStatus::Success:
return "Success";
case NYdb::NTopic::EFlushStatus::ProducerClosed:
return "ProducerClosed";
}
}

std::string GetErrorMessage(const NYdb::NTopic::TFlushResult& result) {
std::string errorMessage = "error occurred while flushing messages";
errorMessage += ", flush status: " + GetResultStatus(result);
errorMessage += ", last written sequence number: " + ToString(result.LastWrittenSeqNo);
if (result.ClosedDescription) {
errorMessage += ", producer is closed: ";
errorMessage += result.ClosedDescription.value().DebugString();
}
return errorMessage;
}

std::string GetErrorMessage(const NYdb::NTopic::TWriteResult& result) {
std::string errorMessage = "error occurred while writing message";
errorMessage += ", write status: " + GetResultStatus(result);
if (result.ErrorMessage) {
errorMessage += ", reason: ";
errorMessage += result.ErrorMessage.value();
}
if (result.ClosedDescription) {
errorMessage += ", producer is closed: ";
errorMessage += result.ClosedDescription.value().DebugString();
}
return errorMessage;
}

void WriteWithHandlingResult(std::shared_ptr<NYdb::NTopic::IProducer> producer, NYdb::NTopic::TWriteMessage&& writeMessage) {
static constexpr size_t MAX_RETRIES = 10;

for (size_t retries = 0; retries < MAX_RETRIES; retries++) {
auto writeResult = producer->Write(std::move(writeMessage));
if (writeResult.IsSuccess()) {
// if write was successful, we can continue writing messages
continue;
}

if (writeResult.IsError()) {
// this means that some non retryable error occurred, for example, producer was closed due to user error
// in this case we need to stop retrying and see the close description (to simplify the example, we just print it to standard error)
std::cerr << GetErrorMessage(writeResult) << std::endl;
return;
}

if (writeResult.IsTimeout()) {
// when timeout occurs this means that producer's buffer is overloaded by memory (see MaxMemoryUsage setting)
// so we need to wait for some time and try to write again later
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
}

auto flushResult = producer->Flush().GetValueSync();
if (flushResult.IsSuccess()) {
// if flush was successful, we can return, because all messages were written to the server
return;
}

if (flushResult.IsClosed()) {
// if flush was not successful, this means that producer was closed due to non retryable error
// in this case we should see the close description (to simplify the example, we just print it to standard error)
std::cerr << GetErrorMessage(flushResult) << std::endl;
}
}

int main() {
const std::string ENDPOINT = "HOST:PORT";
const std::string DATABASE = "DATABASE";
const std::string TOPIC = "PATH/TO/TOPIC";

NYdb::TDriverConfig config;
config.SetEndpoint(ENDPOINT);
config.SetDatabase(DATABASE);
NYdb::TDriver driver(config);

NYdb::NTopic::TTopicClient topicClient(driver);

auto producer = CreateProducer(TOPIC, topicClient);
auto messageData = std::string(1_KB, 'a');

for (int i = 0; i < 10; i++) {
NYdb::NTopic::TWriteMessage writeMessage(messageData);
writeMessage.Key("key" + ToString(i));
WriteWithHandlingResult(producer, std::move(writeMessage));
}
return 0;
}
6 changes: 6 additions & 0 deletions include/ydb-cpp-sdk/client/driver/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class TDriverConfig {
//! Params is a optionally field to set policy settings
//! default: EBalancingPolicy::UsePreferableLocation
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = "");

//! Set grpc level keep alive. If keepalive ping was delayed more than given timeout
//! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error
//! Note: this timeout should not be too small to prevent fail due to
Expand All @@ -131,6 +132,11 @@ class TDriverConfig {
TDriverConfig& SetGRpcKeepAliveTimeout(TDuration timeout);
TDriverConfig& SetGRpcKeepAlivePermitWithoutCalls(bool permitWithoutCalls);

//! Set grpc load balancing policy
//! policy - name of the load balancing policy, see grpc documentation for available policies
//! default: "round_robin"
TDriverConfig& SetGRpcLoadBalancingPolicy(const std::string& policy);

//! Set inactive socket timeout.
//! Used to close connections, that were inactive for given time.
//! Closes unused connections every 1/10 of timeout, so deletion time is approximate.
Expand Down
1 change: 1 addition & 0 deletions include/ydb-cpp-sdk/client/import/import.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ struct TImportFromFsSettings : public TOperationRequestSettings<TImportFromFsSet
FLUENT_SETTING_OPTIONAL(uint32_t, NumberOfRetries);
FLUENT_SETTING_OPTIONAL(bool, NoACL);
FLUENT_SETTING_OPTIONAL(bool, SkipChecksumValidation);
FLUENT_SETTING_DEFAULT(EIndexPopulationMode, IndexPopulationMode, EIndexPopulationMode::Build);
FLUENT_SETTING_VECTOR(std::string, ExcludeRegexp);
};

Expand Down
25 changes: 16 additions & 9 deletions include/ydb-cpp-sdk/client/table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ class TReadReplicasSettings {
};

struct TGlobalIndexSettings {
static constexpr const int VectorKMeansTreeLevelTablePosition = 0;
static constexpr const int VectorKMeansTreePostingTablePosition = 1;
static constexpr const int VectorKMeansTreePrefixTablePosition = 2;
static constexpr const int FulltextRelevanceDictTablePosition = 0;
static constexpr const int FulltextRelevanceDocsTablePosition = 1;
static constexpr const int FulltextRelevanceStatsTablePosition = 2;
static constexpr const int FulltextRelevancePostingTablePosition = 3;

using TUniformOrExplicitPartitions = std::variant<std::monostate, uint64_t, TExplicitPartitions>;

TPartitioningSettings PartitioningSettings;
Expand Down Expand Up @@ -305,12 +313,6 @@ struct TKMeansTreeSettings {

struct TFulltextIndexSettings {
public:
enum class ELayout {
Unspecified = 0,
Flat,
FlatRelevance,
};

enum class ETokenizer {
Unspecified = 0,
Whitespace,
Expand All @@ -337,7 +339,6 @@ struct TFulltextIndexSettings {
std::optional<TAnalyzers> Analyzers;
};

std::optional<ELayout> Layout;
std::vector<TColumnAnalyzers> Columns;

static TFulltextIndexSettings FromProto(const Ydb::Table::FulltextIndexSettings& proto);
Expand Down Expand Up @@ -458,6 +459,8 @@ class TChangefeedDescription {
TChangefeedDescription& WithRetentionPeriod(const TDuration& value);
// Initial scan will output the current state of the table first
TChangefeedDescription& WithInitialScan();
// Enable UserSIDs
TChangefeedDescription& WithUserSIDs();
// Attributes
TChangefeedDescription& AddAttribute(const std::string& key, const std::string& value);
TChangefeedDescription& SetAttributes(const std::unordered_map<std::string, std::string>& attrs);
Expand All @@ -473,6 +476,7 @@ class TChangefeedDescription {
bool GetSchemaChanges() const;
const std::optional<TDuration>& GetResolvedTimestamps() const;
bool GetInitialScan() const;
bool GetUserSIDs() const;
const std::unordered_map<std::string, std::string>& GetAttributes() const;
const std::string& GetAwsRegion() const;
const std::optional<TInitialScanProgress>& GetInitialScanProgress() const;
Expand Down Expand Up @@ -502,6 +506,7 @@ class TChangefeedDescription {
std::optional<TDuration> ResolvedTimestamps_;
std::optional<TDuration> RetentionPeriod_;
bool InitialScan_ = false;
bool UserSIDs_ = false;
std::unordered_map<std::string, std::string> Attributes_;
std::string AwsRegion_;
std::optional<TInitialScanProgress> InitialScanProgress_;
Expand Down Expand Up @@ -808,8 +813,8 @@ class TTableDescription {
void AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const TKMeansTreeSettings& indexSettings);
void AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns, const TKMeansTreeSettings& indexSettings);
// fulltext
void AddFulltextIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const TFulltextIndexSettings& indexSettings);
void AddFulltextIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns, const TFulltextIndexSettings& indexSettings);
void AddFulltextIndex(const std::string& indexName, EIndexType type, const std::vector<std::string>& indexColumns, const TFulltextIndexSettings& indexSettings);
void AddFulltextIndex(const std::string& indexName, EIndexType type, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns, const TFulltextIndexSettings& indexSettings);

// default
void AddSecondaryIndex(const std::string& indexName, const std::vector<std::string>& indexColumns);
Expand Down Expand Up @@ -1054,6 +1059,8 @@ class TTableBuilder {
// fulltext
TTableBuilder& AddFulltextIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const TFulltextIndexSettings& indexSettings);
TTableBuilder& AddFulltextIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns, const TFulltextIndexSettings& indexSettings);
TTableBuilder& AddFulltextRelevanceIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const TFulltextIndexSettings& indexSettings);
TTableBuilder& AddFulltextRelevanceIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns, const TFulltextIndexSettings& indexSettings);

// default
TTableBuilder& AddSecondaryIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns);
Expand Down
6 changes: 6 additions & 0 deletions include/ydb-cpp-sdk/client/topic/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ generate_enum_serilization(client-ydb_topic-include
include/ydb-cpp-sdk/client/topic/write_events.h
)

generate_enum_serilization(client-ydb_topic-include
${YDB_SDK_SOURCE_DIR}/include/ydb-cpp-sdk/client/topic/producer.h
INCLUDE_HEADERS
include/ydb-cpp-sdk/client/topic/producer.h
)

_ydb_sdk_install_targets(TARGETS client-ydb_topic-include)
12 changes: 8 additions & 4 deletions include/ydb-cpp-sdk/client/topic/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "control_plane.h"
#include "read_session.h"
#include "write_session.h"
#include "producer.h"

namespace NYdb::inline V3::NTopic {

Expand Down Expand Up @@ -49,11 +50,14 @@ class TTopicClient {
//! Create write session.
std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings);

//! Create simple blocking keyed write session. Experimental feature. DO NOT USE IN PRODUCTION.
std::shared_ptr<ISimpleBlockingKeyedWriteSession> CreateSimpleBlockingKeyedWriteSession(const TKeyedWriteSessionSettings& settings);
//! Create producer. Experimental feature. DO NOT USE IN PRODUCTION.
std::shared_ptr<IProducer> CreateProducer(const TProducerSettings& settings);

//! Create keyed write session. Experimental feature. DO NOT USE IN PRODUCTION.
std::shared_ptr<IKeyedWriteSession> CreateKeyedWriteSession(const TKeyedWriteSessionSettings& settings);
//! Create typed producer.
template<typename T>
std::shared_ptr<TTypedProducer<T>> CreateTypedProducer(const TProducerSettings& settings) {
return std::make_shared<TTypedProducer<T>>(CreateProducer(settings));
}

//! Create write session.
std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings);
Expand Down
Loading
Loading