diff --git a/src/jrd/dpm.epp b/src/jrd/dpm.epp index 277c1767bfe..3feddb21b9a 100644 --- a/src/jrd/dpm.epp +++ b/src/jrd/dpm.epp @@ -1509,7 +1509,7 @@ SINT64 DPM_gen_id(thread_db* tdbb, SLONG generator, bool initialize, SINT64 val) if (transaction) transaction->tra_flags |= TRA_write; - REPL_gen_id(tdbb, generator, value); + REPL_gen_id(tdbb, generator, value, transaction); return value; } diff --git a/src/jrd/replication/Publisher.cpp b/src/jrd/replication/Publisher.cpp index 098624802c4..c8c4caf1696 100644 --- a/src/jrd/replication/Publisher.cpp +++ b/src/jrd/replication/Publisher.cpp @@ -623,7 +623,7 @@ void REPL_erase(thread_db* tdbb, const record_param* rpb, jrd_tra* transaction) checkStatus(tdbb, status, transaction); } -void REPL_gen_id(thread_db* tdbb, SLONG genId, SINT64 value) +void REPL_gen_id(thread_db* tdbb, SLONG genId, SINT64 value, jrd_tra* transaction) { if (tdbb->tdbb_flags & (TDBB_dont_post_dfw | TDBB_repl_in_progress)) return; @@ -642,6 +642,13 @@ void REPL_gen_id(thread_db* tdbb, SLONG genId, SINT64 value) if (!replicator) return; + FbLocalStatus status; + + // Create IReplicatedTransaction object for current transaction + // without any operations before changing generator + if (transaction && !transaction->tra_replicator) + getReplicator(tdbb, status, transaction); + const auto database = tdbb->getDatabase(); QualifiedName genName; @@ -655,7 +662,6 @@ void REPL_gen_id(thread_db* tdbb, SLONG genId, SINT64 value) AutoSetRestoreFlag noRecursion(&tdbb->tdbb_flags, TDBB_repl_in_progress, true); - FbLocalStatus status; replicator->setSequence2(&status, genName.schema.c_str(), genName.object.c_str(), value); checkStatus(tdbb, status); diff --git a/src/jrd/replication/Publisher.h b/src/jrd/replication/Publisher.h index 7fee6ceee1c..c69d25215b9 100644 --- a/src/jrd/replication/Publisher.h +++ b/src/jrd/replication/Publisher.h @@ -44,7 +44,7 @@ void REPL_store(Jrd::thread_db* tdbb, const Jrd::record_param* rpb, void REPL_modify(Jrd::thread_db* tdbb, const Jrd::record_param* orgRpb, const Jrd::record_param* newRpb, Jrd::jrd_tra* transaction); void REPL_erase(Jrd::thread_db* tdbb, const Jrd::record_param* rpb, Jrd::jrd_tra* transaction); -void REPL_gen_id(Jrd::thread_db* tdbb, SLONG genId, SINT64 value); +void REPL_gen_id(Jrd::thread_db* tdbb, SLONG genId, SINT64 value, Jrd::jrd_tra* transaction); void REPL_exec_sql(Jrd::thread_db* tdbb, Jrd::jrd_tra* transaction, const Firebird::string& sql, const Firebird::ObjectsArray& schemaSearchPath); void REPL_journal_switch(Jrd::thread_db* tdbb); diff --git a/src/jrd/replication/Replicator.cpp b/src/jrd/replication/Replicator.cpp index a2e9f4b68db..e0ee95027e5 100644 --- a/src/jrd/replication/Replicator.cpp +++ b/src/jrd/replication/Replicator.cpp @@ -207,18 +207,7 @@ void Replicator::commitTransaction(CheckStatusWrapper* status, Transaction* tran const auto dataLength = txnData.buffer->getCount() - sizeof(Block); fb_assert(txnData.flushes || dataLength > sizeof(UCHAR)); - for (const auto& generator : m_generators) - { - fb_assert(generator.name.object.hasData() && generator.name.schema.hasData()); - - const auto [schemaAtom, objectAtom] = txnData.defineQualifiedAtom(generator.name); - - txnData.putTag(opSetSequence); - txnData.putInt32(schemaAtom); - txnData.putInt32(objectAtom); - txnData.putInt64(generator.value); - } - + txnData.putGenerators(m_generators); m_generators.clear(); txnData.putTag(opCommitTransaction); @@ -236,9 +225,13 @@ void Replicator::rollbackTransaction(CheckStatusWrapper* status, Transaction* tr { auto& txnData = transaction->getData(); - if (txnData.flushes) + if (txnData.flushes || m_generators.hasData()) { - txnData.putTag(opRollbackTransaction); + txnData.putGenerators(m_generators); + m_generators.clear(); + + if (txnData.flushes) + txnData.putTag(opRollbackTransaction); flush(txnData, FLUSH_SYNC, BLOCK_END_TRANS); } } diff --git a/src/jrd/replication/Replicator.h b/src/jrd/replication/Replicator.h index 38e0f0d01c0..6bbce609af7 100644 --- a/src/jrd/replication/Replicator.h +++ b/src/jrd/replication/Replicator.h @@ -42,6 +42,14 @@ namespace Replication typedef Firebird::ObjectsArray NameCache; typedef Firebird::HalfStaticArray SavepointStack; + struct GeneratorValue + { + Jrd::QualifiedName name; + SINT64 value = 0; + }; + + typedef Firebird::Array GeneratorCache; + struct BatchBlock { Block header{}; @@ -135,6 +143,21 @@ namespace Replication { buffer->add(data, length); } + + void putGenerators(const GeneratorCache& generators) + { + for (const auto& generator : generators) + { + fb_assert(generator.name.object.hasData() && generator.name.schema.hasData()); + + const auto [schemaAtom, objectAtom] = defineQualifiedAtom(generator.name); + + putTag(opSetSequence); + putInt32(schemaAtom); + putInt32(objectAtom); + putInt64(generator.value); + } + } }; class Transaction final : @@ -255,14 +278,6 @@ namespace Replication BatchBlock m_data; }; - struct GeneratorValue - { - Jrd::QualifiedName name; - SINT64 value = 0; - }; - - typedef Firebird::Array GeneratorCache; - enum FlushReason { FLUSH_OVERFLOW,