diff --git a/AGENTS.md b/AGENTS.md index b40befa..e6bfd0c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -75,3 +75,5 @@ Read the matching file in `guides/` when the task needs more detail: - `guides/concurrency.md` - thread-safety contracts, callback dispatch, mutex ordering, and shutdown invariants. - `guides/build.md` - submodules, configure/build/test/bench flow. +- `guides/ci.md` - CI environment parity, toolchain compatibility, + sanitizer/timing test guidance, and CI-failure workflow. diff --git a/README.md b/README.md index d9dffe8..8bd852a 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,8 @@ See the macro examples below or browse the `examples/` folder for focused demons Recent focused examples include: +- `examples/example_logit_memory_logger.cpp` - in-memory snapshots plus shared `ILogReader` and `ILogSubscriber` macros. +- `examples/example_logit_mdbx_logger.cpp` - persistent MDBX storage plus the same shared read/callback macros and MDBX-specific session/payload APIs. - `examples/example_logit_otlp_http.cpp` - OTLP/HTTP export with batching, retries, optional compression, and contextual trace/span fields. - `examples/example_logit_prometheus_payload.cpp` - callback-based Prometheus payload emission with custom registry metrics. - `examples/example_logit_prometheus_server.cpp` - embedded `/metrics` endpoint with built-in and application metrics. @@ -229,6 +231,57 @@ at the `Logger` layer and do not take the per-backend execution mutex, but they still synchronize on the memory backend's own mutex while copying the current buffer. +### Common stored-log API + +Use `ILogReader` and `ILogSubscriber` when application code should work with +either `MemoryLogger` or `MdbxLogger`. The shared macro helpers return +`LogRecordSnapshot` records and avoid depending on backend-specific storage: + +```cpp +#include + +int main() { + // This can be a MemoryLogger index or an MdbxLogger index. + const int backend_index = 0; + + const int64_t now_ms = LOGIT_CURRENT_TIMESTAMP_MS(); + const auto recent = LOGIT_READ_RECENT_ASC(backend_index, 100, 0); + const auto window = LOGIT_READ_RANGE( + backend_index, + now_ms - 60LL * 60 * 1000, + now_ms + 1, + 0); + + std::vector live_updates; + const uint64_t callback_id = LOGIT_ADD_LOG_CALLBACK( + backend_index, + ([&live_updates](const logit::LogRecordSnapshot& record) { + live_updates.push_back(record); + })); + + LOGIT_INFO_TO(backend_index, "visible through read and callback APIs"); + LOGIT_WAIT(); + LOGIT_REMOVE_LOG_CALLBACK(backend_index, callback_id); + + (void)recent; + (void)window; + (void)live_updates; +} +``` + +`LOGIT_READ_RANGE`, `LOGIT_READ_RECENT_ASC`, and +`LOGIT_READ_RECENT_DESC` use `ILogReader`. `LOGIT_ADD_LOG_CALLBACK` and +`LOGIT_REMOVE_LOG_CALLBACK` use `ILogSubscriber`; callbacks receive a +`LogRecordSnapshot` after the backend has written the record. Snapshots own +their string fields, so they can be copied or stored by value. Callback +dispatch follows registration order. This is the preferred fallback-friendly +API between `MemoryLogger` and `MdbxLogger`. + +`LOGIT_GET_BUFFERED_STRINGS` and `LOGIT_GET_BUFFERED_ENTRIES` are convenience +helpers for the `MemoryLogger` snapshot buffer. They are useful for local +diagnostics panes, but code that should switch between in-memory and MDBX +storage should prefer the shared `LOGIT_READ_*` and callback macros above. + File-based backends also expose persisted-file access through `LOGIT_LIST_LOG_FILES(index)`, `LOGIT_READ_LOG_FILE(index, path)`, and `LOGIT_READ_LOG_FILES(index, paths)`. These helpers read only what has already diff --git a/docs/mainpage.dox b/docs/mainpage.dox index d1c0dd7..9eaade8 100644 --- a/docs/mainpage.dox +++ b/docs/mainpage.dox @@ -13,7 +13,7 @@ Key characteristics: - **Flexible formatting and routing.** Customize output patterns, mix console/file/system backends, or supply custom logger implementations. - **Async by default.** Each backend is served by the task executor with configurable queue sizes and overflow policies, plus helpers such as `LOGIT_WARN_ONCE` or `LOGIT_ERROR_THROTTLE` to keep repeated messages under control. -See the macro examples below or browse the `examples/` directory for focused demonstrations, including queue tuning and crash handling. +See the macro examples below or browse the `examples/` directory for focused demonstrations, including queue tuning and crash handling. `examples/example_logit_memory_logger.cpp` and `examples/example_logit_mdbx_logger.cpp` show the shared read/callback API for in-memory and MDBX-backed stored logs. \section macro_first_usage_sec Macro-first usage @@ -141,6 +141,57 @@ full object footprint of each `BufferedLogEntry`. Snapshot reads avoid the `Logger` execution mutex, but they still synchronize on the memory backend's own mutex while copying the current buffer. +\subsection macro_examples_shared_reader Common stored-log API + +Use `ILogReader` and `ILogSubscriber` when application code should work with +either `MemoryLogger` or `MdbxLogger`. The shared macro helpers return +`LogRecordSnapshot` records and avoid depending on backend-specific storage: + +\code{.cpp} +#include + +int main() { + // This can be a MemoryLogger index or an MdbxLogger index. + const int backend_index = 0; + + const int64_t now_ms = LOGIT_CURRENT_TIMESTAMP_MS(); + const auto recent = LOGIT_READ_RECENT_ASC(backend_index, 100, 0); + const auto window = LOGIT_READ_RANGE( + backend_index, + now_ms - 60LL * 60 * 1000, + now_ms + 1, + 0); + + std::vector live_updates; + const uint64_t callback_id = LOGIT_ADD_LOG_CALLBACK( + backend_index, + ([&live_updates](const logit::LogRecordSnapshot& record) { + live_updates.push_back(record); + })); + + LOGIT_INFO_TO(backend_index, "visible through read and callback APIs"); + LOGIT_WAIT(); + LOGIT_REMOVE_LOG_CALLBACK(backend_index, callback_id); + + (void)recent; + (void)window; + (void)live_updates; +} +\endcode + +`LOGIT_READ_RANGE`, `LOGIT_READ_RECENT_ASC`, and +`LOGIT_READ_RECENT_DESC` use `ILogReader`. `LOGIT_ADD_LOG_CALLBACK` and +`LOGIT_REMOVE_LOG_CALLBACK` use `ILogSubscriber`; callbacks receive a +`LogRecordSnapshot` after the backend has written the record. Snapshots own +their string fields, so they can be copied or stored by value. Callback +dispatch follows registration order. This is the preferred fallback-friendly +API between `MemoryLogger` and `MdbxLogger`. + +`LOGIT_GET_BUFFERED_STRINGS` and `LOGIT_GET_BUFFERED_ENTRIES` are convenience +helpers for the `MemoryLogger` snapshot buffer. They are useful for local +diagnostics panes, but code that should switch between in-memory and MDBX +storage should prefer the shared `LOGIT_READ_*` and callback macros above. + File-based backends also expose persisted-file access through `LOGIT_LIST_LOG_FILES(index)`, `LOGIT_READ_LOG_FILE(index, path)`, and `LOGIT_READ_LOG_FILES(index, paths)`. These helpers return only what has diff --git a/examples/example_logit_mdbx_logger.cpp b/examples/example_logit_mdbx_logger.cpp index d8c36d2..fdd8549 100644 --- a/examples/example_logit_mdbx_logger.cpp +++ b/examples/example_logit_mdbx_logger.cpp @@ -54,9 +54,12 @@ int main() { std::cerr << "[MdbxLogger error callback] " << msg << std::endl; }; - // Add to the registry in single_mode so it does not duplicate console output. - // After this call the logger index depends on what was added before it. - LOGIT_ADD_LOGGER_SINGLE_MODE( + // Add a console logger for live observation (optional). + LOGIT_ADD_CONSOLE_DEFAULT(); + + // Add to the registry as a regular backend so standard LOGIT_* macros + // write both to MDBX and to the console logger above. + LOGIT_ADD_LOGGER( logit::MdbxLogger, (mdbx_config), logit::SimpleLogFormatter, @@ -65,9 +68,6 @@ int main() { // The MDBX logger is now the last added backend; its index is: const int mdbx_index = static_cast(logit::Logger::get_instance().logger_count()) - 1; - // Also add a console logger for live observation (optional). - LOGIT_ADD_CONSOLE_DEFAULT(); - // ------------------------------------------------------------------ // 2. Log messages via standard macros // ------------------------------------------------------------------ @@ -96,7 +96,7 @@ int main() { LOGIT_WAIT(); // ------------------------------------------------------------------ - // 3. Read the MDBX logger directly via typed macro helpers + // 3. Read through the common ILogReader API, then use MDBX-only extras // ------------------------------------------------------------------ LOGIT_WITH_LOGGER_AS(mdbx_index, logit::MdbxLogger, mdbx) { // Session metadata @@ -109,9 +109,14 @@ int main() { std::cout << " schema_ver: " << session_opt->schema_version << std::endl; } + // The LOGIT_READ_* macros work with both MemoryLogger and MdbxLogger. // All records in a wide time window (last 24 hours). const int64_t now_ms = LOGIT_CURRENT_TIMESTAMP_MS(); - auto all_records = mdbx->read_range(now_ms - 24 * 60 * 60 * 1000, now_ms + 1); + auto all_records = LOGIT_READ_RANGE( + mdbx_index, + now_ms - 24LL * 60 * 60 * 1000, + now_ms + 1, + 0); std::cout << "\n--- All records (" << all_records.size() << ") ---" << std::endl; for (const auto& r : all_records) { @@ -163,9 +168,11 @@ int main() { auto today_ms = std::chrono::milliseconds(static_cast(midnight_t) * 1000); auto tomorrow_ms = today_ms + std::chrono::hours(24); - auto day_records = mdbx->read_range( + auto day_records = LOGIT_READ_RANGE( + mdbx_index, today_ms.count(), - tomorrow_ms.count()); + tomorrow_ms.count(), + 0); std::cout << "\n--- Records for today (" << day_records.size() << ") ---" << std::endl; for (const auto& r : day_records) { @@ -176,20 +183,22 @@ int main() { std::cerr << "Date query example skipped: " << e.what() << std::endl; } - // read_recent: last 100 records in ascending order - auto recent = mdbx->read_recent(100, 0, logit::LogReadOrder::Ascending); - std::cout << "\n--- read_recent(100) ascending (" << recent.size() << ") ---" << std::endl; + // read_recent: last 100 records in ascending order, through ILogReader. + auto recent = LOGIT_READ_RECENT_ASC(mdbx_index, 100, 0); + std::cout << "\n--- LOGIT_READ_RECENT_ASC(100) (" << recent.size() << ") ---" << std::endl; for (const auto& r : recent) { std::cout << " [" << logit::to_string(r.level) << "] " << r.timestamp_ms << " " << r.message << std::endl; } - // Live subscription: snapshot + real-time updates - std::vector live_updates; - uint64_t cb_id = mdbx->add_log_callback( - [&live_updates](const logit::LogRecordView& v) { + // Live subscription: the LOGIT_* callback macros also work with both + // MemoryLogger and MdbxLogger. + std::vector live_updates; + const uint64_t cb_id = LOGIT_ADD_LOG_CALLBACK( + mdbx_index, + ([&live_updates](const logit::LogRecordSnapshot& v) { live_updates.push_back(v); - }); + })); LOGIT_INFO("Live event 1 via callback"); LOGIT_INFO("Live event 2 via callback"); @@ -202,7 +211,7 @@ int main() { << r.message << std::endl; } - if (mdbx->remove_log_callback(cb_id)) { + if (LOGIT_REMOVE_LOG_CALLBACK(mdbx_index, cb_id)) { std::cout << "Callback removed" << std::endl; } diff --git a/examples/example_logit_memory_logger.cpp b/examples/example_logit_memory_logger.cpp index 009b566..10b8962 100644 --- a/examples/example_logit_memory_logger.cpp +++ b/examples/example_logit_memory_logger.cpp @@ -1,6 +1,12 @@ +/// \file example_logit_memory_logger.cpp +/// \brief Demonstrates in-memory snapshots, range reads, and live subscriptions. + // #define LOGIT_BASE_PATH "E:\\_repoz\\log-it-cpp" <- set via CMake +#include #include +#include +#include #include int main() { @@ -8,17 +14,19 @@ int main() { LOGIT_ADD_CONSOLE_DEFAULT(); LOGIT_ADD_MEMORY_LOGGER_SINGLE_MODE(100, 64 * 1024, 60 * 60 * 1000); + const int memory_index = static_cast(logit::Logger::get_instance().logger_count()) - 1; LOGIT_INFO("This goes to the regular backends"); - LOGIT_INFO_TO(1, "Remote control can fetch this later"); - LOGIT_WARN_TO(1, "Latest warning for the control plane"); - LOGIT_SECTION_TO(1, "Network Settings"); - LOGIT_RAW_TO(1, "Protocol: https"); - LOGIT_RAW_TO(1, "Transport Mode: standard"); - LOGIT_RAW_TO(1, "DNS Server: 1.1.1.1"); + LOGIT_INFO_TO(memory_index, "Remote control can fetch this later"); + LOGIT_WARN_TO(memory_index, "Latest warning for the control plane"); + LOGIT_SECTION_TO(memory_index, "Network Settings"); + LOGIT_RAW_TO(memory_index, "Protocol: https"); + LOGIT_RAW_TO(memory_index, "Transport Mode: standard"); + LOGIT_RAW_TO(memory_index, "DNS Server: 1.1.1.1"); + LOGIT_WAIT(); - const auto recent_lines = LOGIT_GET_BUFFERED_STRINGS(1); - const auto recent_entries = LOGIT_GET_BUFFERED_ENTRIES(1); + const auto recent_lines = LOGIT_GET_BUFFERED_STRINGS(memory_index); + const auto recent_entries = LOGIT_GET_BUFFERED_ENTRIES(memory_index); std::cout << "Buffered strings:" << std::endl; for (const auto& line : recent_lines) { @@ -31,5 +39,60 @@ int main() { << entry.timestamp_ms << " " << entry.message << std::endl; } + const auto recent_records = LOGIT_READ_RECENT_ASC(memory_index, 3, 0); + std::cout << "Recent records via ILogReader:" << std::endl; + for (const auto& record : recent_records) { + std::cout << " [" << logit::to_string(record.level) << "] " + << record.timestamp_ms << " " << record.message << std::endl; + } + + const int64_t now_ms = LOGIT_CURRENT_TIMESTAMP_MS(); + const auto range_records = LOGIT_READ_RANGE( + memory_index, + now_ms - 60LL * 60 * 1000, + now_ms + 1, + 0); + std::cout << "Range records for the last hour: " + << range_records.size() << std::endl; + + std::vector live_updates; + std::mutex live_mutex; + const uint64_t callback_id = LOGIT_ADD_LOG_CALLBACK( + memory_index, + ([&live_updates, &live_mutex](const logit::LogRecordSnapshot& view) { + std::lock_guard lock(live_mutex); + live_updates.push_back(view); + })); + + LOGIT_INFO_TO(memory_index, "Live callback event 1"); + LOGIT_ERROR_TO(memory_index, "Live callback event 2"); + LOGIT_WAIT(); + + std::size_t live_count = 0; + { + std::lock_guard lock(live_mutex); + live_count = live_updates.size(); + std::cout << "Live updates received:" << std::endl; + for (const auto& record : live_updates) { + std::cout << " [" << logit::to_string(record.level) << "] " + << record.message << std::endl; + } + } + + if (LOGIT_REMOVE_LOG_CALLBACK(memory_index, callback_id)) { + std::cout << "Callback removed" << std::endl; + } + + LOGIT_WARN_TO(memory_index, "Stored after callback removal"); + LOGIT_WAIT(); + + { + std::lock_guard lock(live_mutex); + std::cout << "Live updates after unsubscribe: " + << live_updates.size() << " (was " << live_count << ")" + << std::endl; + } + + LOGIT_SHUTDOWN(); return 0; } diff --git a/guides/README.md b/guides/README.md index 65ce81a..11f5bf4 100644 --- a/guides/README.md +++ b/guides/README.md @@ -21,6 +21,7 @@ These files are shared AI-agent instruction files for `log-it-cpp`. | `header-impl-RU.md` | Russian localization of the header ownership playbook. | | `logging-macros.md` | Macro-first logging guidance for agents and maintainers. | | `build.md` | Build, test, example, and benchmark flows. | +| `ci.md` | CI environment parity, toolchain compatibility, sanitizer/timing test guidance, and CI-failure workflow. | | `commits.md` | Commit message conventions and grouping rules. | | `git-workflow.md` | Branch policy, naming conventions, and required steps before starting work. | diff --git a/guides/ci.md b/guides/ci.md new file mode 100644 index 0000000..0852de3 --- /dev/null +++ b/guides/ci.md @@ -0,0 +1,68 @@ +# CI Environment Parity + +Use this file when a task changes CI workflows, CMake/build configuration, +tests, examples, public headers, dependency setup, packaging, or anything that +could compile differently across the supported matrix. + +## Version Compatibility + +- Check `cmake_minimum_required` before adding CMake syntax. This project + currently declares CMake 3.18, so avoid commands, options, or policy-dependent + behavior that require newer CMake unless the minimum is intentionally raised. +- Keep the C++ standard matrix in mind. The default build supports C++11, while + MDBX, OTLP, and Prometheus server paths may require C++17. Do not use C++17 + language/library features in C++11 paths. +- When adding dependency declarations, prefer syntax supported by the declared + minimum tool versions. If newer tool behavior is needed, update the minimum + version and document why. + +## Compiler Matrix + +- Treat GCC, Clang, and MSVC as distinct compilers, not interchangeable local + substitutes. A test that passes on one compiler can still fail on another. +- Write portable C++ for public headers and tests. Be explicit in lambda + captures when there is no default capture mode; MSVC diagnoses cases that + some local GCC builds may miss. +- Include the headers that provide the standard library types you use. Avoid + relying on transitive includes that may differ between standard libraries. +- Assume CI may promote warnings or tool diagnostics that are quiet locally. + Keep casts, conversions, and unused values minimal and intentional. + +## Tests Under Instrumentation + +- Prefer deterministic synchronization over sleeps, wall-clock timing, or + scheduler assumptions. If a timing-sensitive test is unavoidable, keep the + tolerance generous and document why the timing is part of the contract. +- Sanitizers and heavy instrumentation slow execution and can change scheduling. + Timing-sensitive or stress tests should be labelled or filtered in CMake/CI + when they are not meaningful under those modes. +- Put CI behavior in executable configuration, not only in documentation. If a + test must not run under a sanitizer or platform, express that through CMake + labels, test properties, or workflow filters. + +## Clean Environment Assumptions + +- Do not assume local utilities are available in CI images. If a workflow, + packaging step, or script needs a tool such as `file`, `git`, `curl`, or an + archiver, install or check for it explicitly. +- For optional dependencies, verify both enabled and disabled paths when the + changed code has compile-time branches. +- Keep generated build directories, downloaded artifacts, and local cache files + out of commits unless the task explicitly asks to change checked-in fixtures. + +## CI Failure Workflow + +- Inspect real job logs before changing code. Start with `gh pr checks`, then + open the failing GitHub Actions run or job log. +- Identify whether the failure is from compile errors, tests, environment setup, + packaging, or an external service. Keep the fix scoped to the observed cause. +- Reproduce locally when a matching toolchain is available. If it is not, make a + conservative compatibility fix and rely on the rerun CI signal. +- After pushing a CI fix, recheck the relevant jobs. Record any jobs still + pending or failing separately from the fixed failure. + +## Cross-References + +- `guides/build.md` - configure/build/test commands and common options. +- `guides/concurrency.md` - callback dispatch, mutex ordering, and async tests. +- `guides/cpp_style.md` - small C++ style rules that reduce compiler drift. diff --git a/include/logit_cpp/logit/log_macros.hpp b/include/logit_cpp/logit/log_macros.hpp index 94fe8d1..b42a47b 100644 --- a/include/logit_cpp/logit/log_macros.hpp +++ b/include/logit_cpp/logit/log_macros.hpp @@ -2999,7 +2999,7 @@ static_assert(LOGIT_LEVEL_FATAL == static_cast(logit::LogLevel::LOG_LVL_FAT auto* _reader = LOGIT_GET_LOG_READER(_idx); \ return _reader \ ? _reader->read_range(_from, _to, _limit) \ - : std::vector<::logit::LogRecordView>{}; \ + : std::vector<::logit::LogRecordSnapshot>{}; \ }((logger_index), (from_ms), (to_ms), (limit))) /// \brief Reads recent records from a backend that supports ILogReader. @@ -3013,7 +3013,7 @@ static_assert(LOGIT_LEVEL_FATAL == static_cast(logit::LogLevel::LOG_LVL_FAT auto* _reader = LOGIT_GET_LOG_READER(_idx); \ return _reader \ ? _reader->read_recent(_limit, _period_ms, _order) \ - : std::vector<::logit::LogRecordView>{}; \ + : std::vector<::logit::LogRecordSnapshot>{}; \ }((logger_index), (limit), (period_ms), (order))) /// \brief Reads recent records in ascending order (oldest first). @@ -3046,7 +3046,7 @@ static_assert(LOGIT_LEVEL_FATAL == static_cast(logit::LogLevel::LOG_LVL_FAT /// \brief Registers a callback to receive newly written log records. /// \param logger_index Index of logger. -/// \param callback Function invoked with LogRecordView after each commit. +/// \param callback Function invoked with an owning LogRecordSnapshot after each commit. /// \return Callback id (0 if the backend does not support subscriptions). #define LOGIT_ADD_LOG_CALLBACK(logger_index, callback) \ ([](int _idx, ::logit::ILogSubscriber::Callback _cb) -> uint64_t { \ diff --git a/include/logit_cpp/logit/loggers/ILogReader.hpp b/include/logit_cpp/logit/loggers/ILogReader.hpp index 959b84e..e046445 100644 --- a/include/logit_cpp/logit/loggers/ILogReader.hpp +++ b/include/logit_cpp/logit/loggers/ILogReader.hpp @@ -16,9 +16,12 @@ namespace logit { - /// \struct LogRecordView - /// \brief Common read-only view of a stored log record. - struct LogRecordView { + /// \struct LogRecordSnapshot + /// \brief Common owning snapshot of a stored log record. + /// + /// String fields are copied from the backend, so callers may store the + /// snapshot by value without depending on backend locks or record lifetime. + struct LogRecordSnapshot { uint64_t session_id = 0; ///< Owning session id, or 0 if unused. int64_t timestamp_ms = 0; ///< Log timestamp in milliseconds. uint32_t sequence = 0; ///< Per-timestamp sequence, or 0 if unused. @@ -63,8 +66,8 @@ namespace logit { /// \brief Optional interface for backends that expose stored records. /// /// Implementations should provide read_range for time-window queries - /// and read_recent for "tail -n" style access. Both return copies - /// so the caller does not depend on backend locking details. + /// and read_recent for "tail -n" style access. Both return owning + /// snapshots so the caller does not depend on backend locking details. class ILogReader { public: virtual ~ILogReader() = default; @@ -74,18 +77,18 @@ namespace logit { /// \param to_ms Exclusive end timestamp. /// \param limit Maximum number of records (0 = unlimited). /// \return Matching records in backend-defined order (usually ascending). - virtual std::vector read_range( + virtual std::vector read_range( int64_t from_ms, int64_t to_ms, std::size_t limit = 0) const = 0; #if __cplusplus >= 201703L /// \brief Reads records and preserves backend-specific read errors. - virtual LogReadResult> read_range_result( + virtual LogReadResult> read_range_result( int64_t from_ms, int64_t to_ms, std::size_t limit = 0) const { - LogReadResult> result; + LogReadResult> result; result.value = read_range(from_ms, to_ms, limit); return result; } @@ -96,18 +99,18 @@ namespace logit { /// \param period_ms Time window in milliseconds from now backward (0 = unlimited). /// \param order Ascending (oldest first) or Descending (newest first). /// \return Matching records in the requested order. - virtual std::vector read_recent( + virtual std::vector read_recent( std::size_t limit, int64_t period_ms = 0, LogReadOrder order = LogReadOrder::Ascending) const = 0; #if __cplusplus >= 201703L /// \brief Reads recent records and preserves backend-specific read errors. - virtual LogReadResult> read_recent_result( + virtual LogReadResult> read_recent_result( std::size_t limit, int64_t period_ms = 0, LogReadOrder order = LogReadOrder::Ascending) const { - LogReadResult> result; + LogReadResult> result; result.value = read_recent(limit, period_ms, order); return result; } diff --git a/include/logit_cpp/logit/loggers/ILogSubscriber.hpp b/include/logit_cpp/logit/loggers/ILogSubscriber.hpp index 80a7ebc..41774aa 100644 --- a/include/logit_cpp/logit/loggers/ILogSubscriber.hpp +++ b/include/logit_cpp/logit/loggers/ILogSubscriber.hpp @@ -14,21 +14,25 @@ namespace logit { /// \class ILogSubscriber /// \brief Optional interface for backends that can notify callers when a record is successfully written. /// - /// Callbacks receive a \ref LogRecordView containing the persisted preview and metadata. - /// They are guaranteed to be invoked only after the record has been committed to storage. + /// Callbacks receive a \ref LogRecordSnapshot containing the persisted preview and metadata. + /// String fields inside the snapshot are owned copies and may be stored by value. + /// Callbacks are invoked in registration order after the record has been committed to storage. /// Implementations must not hold internal locks while invoking callbacks. class ILogSubscriber { public: - using Callback = std::function; + using Callback = std::function; virtual ~ILogSubscriber() = default; /// \brief Registers a callback to be invoked after each successfully written record. - /// \param callback Function called with the written LogRecordView. + /// \param callback Function called with the written LogRecordSnapshot. /// \return Stable callback id that can be passed to remove_log_callback. virtual uint64_t add_log_callback(Callback callback) = 0; /// \brief Unregisters a previously added callback. + /// + /// Removal prevents future dispatch snapshots from including the callback, but cannot + /// cancel an invocation that has already been copied for dispatch. /// \param callback_id Id returned by add_log_callback. /// \return True if the callback existed and was removed. virtual bool remove_log_callback(uint64_t callback_id) = 0; diff --git a/include/logit_cpp/logit/loggers/MdbxLogger.hpp b/include/logit_cpp/logit/loggers/MdbxLogger.hpp index 0e83e4e..355f735 100644 --- a/include/logit_cpp/logit/loggers/MdbxLogger.hpp +++ b/include/logit_cpp/logit/loggers/MdbxLogger.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -208,20 +209,20 @@ namespace logit { } /// \brief Reads records in `[from_ms, to_ms)` ordered by timestamp and sequence. - std::vector read_range( + std::vector read_range( int64_t from_ms, int64_t to_ms, std::size_t limit = 0) const override { auto result = read_range_result(from_ms, to_ms, limit); - return result.value ? *result.value : std::vector(); + return result.value ? *result.value : std::vector(); } /// \brief Reads records and preserves storage/decode errors. - LogReadResult> read_range_result( + LogReadResult> read_range_result( int64_t from_ms, int64_t to_ms, std::size_t limit = 0) const override { - LogReadResult> result; + LogReadResult> result; result.value.emplace(); if (to_ms <= from_ms) { return result; @@ -236,7 +237,7 @@ namespace logit { std::lock_guard db_lock(m_db_mutex); m_records->for_each_range(from_key, to_key, [&result, limit](const std::string&, const Record& record) -> bool { - result.value->push_back(to_view(record)); + result.value->push_back(to_log_record_snapshot(record)); return limit == 0 || result.value->size() < limit; }); } catch (const detail::MdbxReadException& e) { @@ -265,16 +266,16 @@ namespace logit { /// \param period_ms Time window in milliseconds from now backward (0 = unlimited). /// \param order Ascending or descending result order. /// \return Matching records in the requested order. - std::vector read_recent( + std::vector read_recent( std::size_t limit, int64_t period_ms = 0, LogReadOrder order = LogReadOrder::Ascending) const override { auto result = read_recent_result(limit, period_ms, order); - return result.value ? *result.value : std::vector(); + return result.value ? *result.value : std::vector(); } /// \brief Reads the most recent records and preserves storage/decode errors. - LogReadResult> read_recent_result( + LogReadResult> read_recent_result( std::size_t limit, int64_t period_ms = 0, LogReadOrder order = LogReadOrder::Ascending) const override { @@ -607,7 +608,7 @@ namespace logit { std::atomic m_shutdown = ATOMIC_VAR_INIT(false); mutable std::mutex m_callbacks_mutex; - std::unordered_map m_callbacks; + std::map m_callbacks; std::atomic m_next_callback_id{1}; template @@ -618,7 +619,7 @@ namespace logit { return result; } - void notify_callbacks(const std::vector& views) const { + void notify_callbacks(const std::vector& snapshots) const { std::vector callbacks_copy; { std::lock_guard lock(m_callbacks_mutex); @@ -627,10 +628,10 @@ namespace logit { callbacks_copy.push_back(kv.second); } } - for (const auto& view : views) { + for (const auto& snapshot : snapshots) { for (const auto& cb : callbacks_copy) { try { - cb(view); + cb(snapshot); } catch (const std::exception& e) { if (m_config.on_error) { m_config.on_error(std::string("MdbxLogger callback error: ") + e.what()); @@ -644,18 +645,18 @@ namespace logit { } } - static LogRecordView to_view(const Record& r) { - LogRecordView v; - v.session_id = r.session_id; - v.timestamp_ms = r.timestamp_ms; - v.sequence = r.sequence; - v.level = r.level; - v.message = r.message; - v.payload_id = r.payload_id; - v.file = r.file; - v.function = r.function; - v.line = r.line; - return v; + static LogRecordSnapshot to_log_record_snapshot(const Record& r) { + LogRecordSnapshot snapshot; + snapshot.session_id = r.session_id; + snapshot.timestamp_ms = r.timestamp_ms; + snapshot.sequence = r.sequence; + snapshot.level = r.level; + snapshot.message = r.message; + snapshot.payload_id = r.payload_id; + snapshot.file = r.file; + snapshot.function = r.function; + snapshot.line = r.line; + return snapshot; } static SessionView to_view(const Session& s) { @@ -945,13 +946,13 @@ namespace logit { return; } - std::vector written_views; + std::vector written_snapshots; try { std::lock_guard db_lock(m_db_mutex); auto txn = m_connection->transaction(mdbxc::TransactionMode::WRITABLE); for (size_t i = 0; i < batch.size(); ++i) { Record record = write_item_locked(batch[i], txn); - written_views.push_back(to_view(record)); + written_snapshots.push_back(to_log_record_snapshot(record)); } txn.commit(); } catch (const std::exception& e) { @@ -968,7 +969,7 @@ namespace logit { return; } - notify_callbacks(written_views); + notify_callbacks(written_snapshots); } Record write_item_locked(const MdbxLogItem& item, mdbxc::Transaction& txn) { diff --git a/include/logit_cpp/logit/loggers/MemoryLogger.hpp b/include/logit_cpp/logit/loggers/MemoryLogger.hpp index 7cb4921..42e924b 100644 --- a/include/logit_cpp/logit/loggers/MemoryLogger.hpp +++ b/include/logit_cpp/logit/loggers/MemoryLogger.hpp @@ -7,6 +7,9 @@ #include "ILogger.hpp" #include "ILogReader.hpp" +#include "ILogSubscriber.hpp" + +#include namespace logit { @@ -16,7 +19,7 @@ namespace logit { /// \details Snapshots are returned oldest-to-newest. Read operations avoid /// `Logger`-level execution serialization, but still synchronize on this /// backend's own mutex while copying the current buffer. - class MemoryLogger : public ILogger, public ILogReader { + class MemoryLogger : public ILogger, public ILogReader, public ILogSubscriber { public: /// \struct Config /// \brief Retention limits for the in-memory buffer. @@ -58,15 +61,23 @@ namespace logit { entry.function = record.function; entry.message = message; - std::lock_guard lock(m_mutex); - m_evict_expired_locked(record.timestamp_ms); + LogRecordSnapshot written_snapshot; + { + std::lock_guard lock(m_mutex); + m_evict_expired_locked(record.timestamp_ms); - m_total_bytes += m_entry_bytes(entry); - m_entries.push_back(std::move(entry)); - m_last_log_ts.store(record.timestamp_ms, std::memory_order_relaxed); - m_last_log_mono_ts.store(LOGIT_MONOTONIC_MS(), std::memory_order_relaxed); + // Capture an owning snapshot before moving the entry; callbacks + // are dispatched only after the buffer update finishes. + written_snapshot = to_snapshot(entry); + m_total_bytes += m_entry_bytes(entry); + m_entries.push_back(std::move(entry)); + m_last_log_ts.store(record.timestamp_ms, std::memory_order_relaxed); + m_last_log_mono_ts.store(LOGIT_MONOTONIC_MS(), std::memory_order_relaxed); + + m_enforce_limits_locked(record.timestamp_ms); + } - m_enforce_limits_locked(record.timestamp_ms); + notify_callbacks(written_snapshot); } /// \brief Retrieve legacy string-based metadata. @@ -143,6 +154,18 @@ namespace logit { /// \brief Memory logger is synchronous, so no flush step is needed. void wait() override {} + uint64_t add_log_callback(Callback callback) override { + std::lock_guard lock(m_callbacks_mutex); + const uint64_t id = m_next_callback_id.fetch_add(1, std::memory_order_relaxed); + m_callbacks.emplace(id, std::move(callback)); + return id; + } + + bool remove_log_callback(uint64_t callback_id) override { + std::lock_guard lock(m_callbacks_mutex); + return m_callbacks.erase(callback_id) > 0; + } + /// \brief Clears buffered entries. LogClearResult clear_logs(const LogClearOptions& options = LogClearOptions()) override { (void)options; @@ -160,15 +183,15 @@ namespace logit { } /// \brief Reads records in `[from_ms, to_ms)` ordered by timestamp. - std::vector read_range( + std::vector read_range( int64_t from_ms, int64_t to_ms, std::size_t limit = 0) const override { #if __cplusplus >= 201703L auto result = read_range_result(from_ms, to_ms, limit); - return result.value ? *result.value : std::vector(); + return result.value ? *result.value : std::vector(); #else - std::vector out; + std::vector out; if (to_ms <= from_ms) { return out; } @@ -178,7 +201,7 @@ namespace logit { for (const auto& entry : m_entries) { if (entry.timestamp_ms >= from_ms && entry.timestamp_ms < to_ms) { - out.push_back(to_view(entry)); + out.push_back(to_snapshot(entry)); if (limit > 0 && out.size() >= limit) { break; } @@ -190,11 +213,11 @@ namespace logit { #if __cplusplus >= 201703L /// \brief Reads records and reports MemoryLogger read status. - LogReadResult> read_range_result( + LogReadResult> read_range_result( int64_t from_ms, int64_t to_ms, std::size_t limit = 0) const override { - LogReadResult> result; + LogReadResult> result; result.value.emplace(); if (to_ms <= from_ms) { return result; @@ -205,7 +228,7 @@ namespace logit { for (const auto& entry : m_entries) { if (entry.timestamp_ms >= from_ms && entry.timestamp_ms < to_ms) { - result.value->push_back(to_view(entry)); + result.value->push_back(to_snapshot(entry)); if (limit > 0 && result.value->size() >= limit) { break; } @@ -216,15 +239,15 @@ namespace logit { #endif /// \brief Reads the most recent records. - std::vector read_recent( + std::vector read_recent( std::size_t limit, int64_t period_ms = 0, LogReadOrder order = LogReadOrder::Ascending) const override { #if __cplusplus >= 201703L auto result = read_recent_result(limit, period_ms, order); - return result.value ? *result.value : std::vector(); + return result.value ? *result.value : std::vector(); #else - std::vector out; + std::vector out; std::lock_guard lock(m_mutex); m_evict_expired_locked(LOGIT_CURRENT_TIMESTAMP_MS()); @@ -233,7 +256,7 @@ namespace logit { for (auto it = m_entries.rbegin(); it != m_entries.rend(); ++it) { if (period_ms <= 0 || it->timestamp_ms >= from_ms) { - out.push_back(to_view(*it)); + out.push_back(to_snapshot(*it)); if (limit > 0 && out.size() >= limit) { break; } @@ -249,11 +272,11 @@ namespace logit { #if __cplusplus >= 201703L /// \brief Reads recent records and reports MemoryLogger read status. - LogReadResult> read_recent_result( + LogReadResult> read_recent_result( std::size_t limit, int64_t period_ms = 0, LogReadOrder order = LogReadOrder::Ascending) const override { - LogReadResult> result; + LogReadResult> result; result.value.emplace(); std::lock_guard lock(m_mutex); m_evict_expired_locked(LOGIT_CURRENT_TIMESTAMP_MS()); @@ -263,7 +286,7 @@ namespace logit { for (auto it = m_entries.rbegin(); it != m_entries.rend(); ++it) { if (period_ms <= 0 || it->timestamp_ms >= from_ms) { - result.value->push_back(to_view(*it)); + result.value->push_back(to_snapshot(*it)); if (limit > 0 && result.value->size() >= limit) { break; } @@ -278,16 +301,34 @@ namespace logit { #endif private: - static LogRecordView to_view(const BufferedLogEntry& e) { - LogRecordView v; - v.level = e.level; - v.timestamp_ms = e.timestamp_ms; - v.file = e.file; - v.line = e.line; - v.function = e.function; - v.message = e.message; - return v; + static LogRecordSnapshot to_snapshot(const BufferedLogEntry& e) { + LogRecordSnapshot snapshot; + snapshot.level = e.level; + snapshot.timestamp_ms = e.timestamp_ms; + snapshot.file = e.file; + snapshot.line = e.line; + snapshot.function = e.function; + snapshot.message = e.message; + return snapshot; + } + + void notify_callbacks(const LogRecordSnapshot& snapshot) const { + std::vector callbacks_copy; + { + std::lock_guard lock(m_callbacks_mutex); + callbacks_copy.reserve(m_callbacks.size()); + for (const auto& kv : m_callbacks) { + callbacks_copy.push_back(kv.second); + } + } + for (const auto& cb : callbacks_copy) { + try { + cb(snapshot); + } catch (...) { + } + } } + // Count only the retained formatted payload, not the full object footprint. static std::size_t m_entry_bytes(const BufferedLogEntry& entry) { return entry.message.size(); @@ -349,6 +390,9 @@ namespace logit { std::atomic m_last_log_ts = ATOMIC_VAR_INIT(0); std::atomic m_last_log_mono_ts = ATOMIC_VAR_INIT(0); std::atomic m_log_level = ATOMIC_VAR_INIT(static_cast(LogLevel::LOG_LVL_TRACE)); + mutable std::mutex m_callbacks_mutex; + std::map m_callbacks; + std::atomic m_next_callback_id{1}; }; } // namespace logit diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index fe7b362..d67a706 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -48,6 +48,7 @@ else() logger_snapshot_read_path_test.cpp logger_clear_api_test.cpp memory_logger_backend_test.cpp + memory_logger_callback_test.cpp memory_logger_concurrency_test.cpp memory_logger_integration_test.cpp mdbx_logger_test.cpp diff --git a/tests/mdbx_logger_test.cpp b/tests/mdbx_logger_test.cpp index 85072b6..b727cbe 100644 --- a/tests/mdbx_logger_test.cpp +++ b/tests/mdbx_logger_test.cpp @@ -643,9 +643,9 @@ void test_callback_sync() { config.async = false; logit::MdbxLogger logger(config); - std::vector received; + std::vector received; const uint64_t cb_id = logger.add_log_callback( - [&received](const logit::LogRecordView& v) { + [&received](const logit::LogRecordSnapshot& v) { received.push_back(v); }); assert(cb_id != 0); @@ -677,9 +677,9 @@ void test_callback_async() { config.max_batch_size = 64; logit::MdbxLogger logger(config); - std::vector received; + std::vector received; const uint64_t cb_id = logger.add_log_callback( - [&received](const logit::LogRecordView& v) { + [&received](const logit::LogRecordSnapshot& v) { received.push_back(v); }); assert(cb_id != 0); @@ -699,6 +699,38 @@ void test_callback_async() { cleanup_db(path); } +void test_callback_order() { + const std::string path = make_db_path("cb_order"); + cleanup_db(path); + + { + logit::MdbxLogger::Config config; + config.path = path; + config.async = false; + + logit::MdbxLogger logger(config); + std::vector order; + + logger.add_log_callback([&order](const logit::LogRecordSnapshot&) { + order.push_back(1); + }); + logger.add_log_callback([&order](const logit::LogRecordSnapshot&) { + order.push_back(2); + }); + logger.add_log_callback([&order](const logit::LogRecordSnapshot&) { + order.push_back(3); + }); + + logger.log(make_record(logit::LogLevel::LOG_LVL_INFO, 9500, 95), "ordered"); + const std::vector expected{1, 2, 3}; + assert(order == expected); + + logger.shutdown(); + } + + cleanup_db(path); +} + void test_callback_exception_safe() { const std::string path = make_db_path("cb_ex"); cleanup_db(path); @@ -716,11 +748,11 @@ void test_callback_exception_safe() { bool second_called = false; logger.add_log_callback( - [](const logit::LogRecordView&) { + [](const logit::LogRecordSnapshot&) { throw std::runtime_error("boom"); }); logger.add_log_callback( - [&second_called](const logit::LogRecordView&) { + [&second_called](const logit::LogRecordSnapshot&) { second_called = true; }); @@ -754,6 +786,7 @@ int main() { test_read_recent(); test_callback_sync(); test_callback_async(); + test_callback_order(); test_callback_exception_safe(); std::cout << "PASS: mdbx_logger_test" << std::endl; return 0; diff --git a/tests/memory_logger_callback_test.cpp b/tests/memory_logger_callback_test.cpp new file mode 100644 index 0000000..f5c6376 --- /dev/null +++ b/tests/memory_logger_callback_test.cpp @@ -0,0 +1,182 @@ +#include +#include +#include +#include +#include + +#include + +namespace { +logit::LogRecord make_record(int64_t ts, int line, const char* function = "writer") { + return logit::LogRecord( + logit::LogLevel::LOG_LVL_INFO, + ts, + "memory_logger_callback_test.cpp", + line, + function, + "", + "", + -1, + false); +} + +bool test_add_remove_and_snapshot() { + logit::MemoryLogger logger(logit::MemoryLogger::Config{0, 0, 0}); + std::vector received; + + const uint64_t callback_id = logger.add_log_callback( + [&received](const logit::LogRecordSnapshot& view) { + received.push_back(view); + }); + + logger.log(make_record(100, 10, "first"), "alpha"); + + if (received.size() != 1 || + received[0].timestamp_ms != 100 || + received[0].level != logit::LogLevel::LOG_LVL_INFO || + received[0].file != "memory_logger_callback_test.cpp" || + received[0].line != 10 || + received[0].function != "first" || + received[0].message != "alpha") { + return false; + } + + if (!logger.remove_log_callback(callback_id)) { + return false; + } + if (logger.remove_log_callback(callback_id)) { + return false; + } + + logger.log(make_record(101, 11, "after_remove"), "beta"); + return received.size() == 1; +} + +bool test_callback_order() { + logit::MemoryLogger logger(logit::MemoryLogger::Config{0, 0, 0}); + std::vector order; + + logger.add_log_callback([&order](const logit::LogRecordSnapshot&) { order.push_back(1); }); + logger.add_log_callback([&order](const logit::LogRecordSnapshot&) { order.push_back(2); }); + logger.add_log_callback([&order](const logit::LogRecordSnapshot&) { order.push_back(3); }); + + logger.log(make_record(200, 20, "ordered"), "ordered"); + + const std::vector expected{1, 2, 3}; + return order == expected; +} + +bool test_callbacks_can_reenter_logger() { + logit::MemoryLogger logger(logit::MemoryLogger::Config{0, 0, 0}); + std::atomic calls(0); + std::atomic failed(false); + uint64_t callback_id = 0; + + callback_id = logger.add_log_callback( + [&logger, &calls, &failed, &callback_id](const logit::LogRecordSnapshot& view) { + const auto entries = logger.get_buffered_entries(); + if (entries.empty() || entries.back().message != view.message) { + failed = true; + } + if (!logger.remove_log_callback(callback_id)) { + failed = true; + } + calls.fetch_add(1, std::memory_order_relaxed); + }); + + logger.log(make_record(300, 30, "reenter"), "self-remove"); + logger.log(make_record(301, 31, "reenter_after_remove"), "after"); + + return !failed.load() && calls.load() == 1; +} + +bool test_thread_safety() { + logit::MemoryLogger logger(logit::MemoryLogger::Config{0, 0, 0}); + const int writer_count = 4; + const int logs_per_writer = 250; + const int expected_logs = writer_count * logs_per_writer; + + std::atomic sequence(1); + std::atomic primary_calls(0); + std::atomic transient_calls(0); + std::atomic start(false); + std::atomic failed(false); + + const uint64_t primary_id = logger.add_log_callback( + [&primary_calls, &failed](const logit::LogRecordSnapshot& view) { + if (view.message.empty()) { + failed = true; + } + primary_calls.fetch_add(1, std::memory_order_relaxed); + }); + + std::vector writers; + for (int t = 0; t < writer_count; ++t) { + writers.push_back(std::thread([&logger, &sequence, &start, logs_per_writer, t]() { + while (!start.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + for (int i = 0; i < logs_per_writer; ++i) { + const int64_t ts = sequence.fetch_add(1, std::memory_order_relaxed); + logger.log(make_record(ts, t * logs_per_writer + i), std::to_string(ts)); + } + })); + } + + std::thread churner([&logger, &transient_calls, &start, &failed]() { + while (!start.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + for (int i = 0; i < 500; ++i) { + const uint64_t id = logger.add_log_callback( + [&transient_calls](const logit::LogRecordSnapshot&) { + transient_calls.fetch_add(1, std::memory_order_relaxed); + }); + if (!logger.remove_log_callback(id)) { + failed = true; + } + } + }); + + start.store(true, std::memory_order_release); + for (auto& writer : writers) { + writer.join(); + } + churner.join(); + + if (failed.load()) { + return false; + } + if (primary_calls.load() != expected_logs) { + return false; + } + + const auto entries = logger.get_buffered_entries(); + if (entries.size() != static_cast(expected_logs)) { + return false; + } + + if (!logger.remove_log_callback(primary_id)) { + return false; + } + const int calls_after_remove = primary_calls.load(); + logger.log(make_record(sequence.fetch_add(1, std::memory_order_relaxed), 9999), "after-remove"); + return primary_calls.load() == calls_after_remove; +} +} // namespace + +int main() { + if (!test_add_remove_and_snapshot()) { + return 1; + } + if (!test_callback_order()) { + return 1; + } + if (!test_callbacks_can_reenter_logger()) { + return 1; + } + if (!test_thread_safety()) { + return 1; + } + return 0; +}