diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index ac49f269d9..ae933e63ad 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -30,6 +30,7 @@
 #include "bthread/timer_thread.h"
 #include "bthread/list_of_abafree_id.h"
 #include "bthread/bthread.h"
+#include "bthread/worker_idle.h"
 
 namespace bthread {
 extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
@@ -597,6 +598,17 @@ int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
     return 0;
 }
 
+int bthread_register_worker_idle_function(int (*init_fn)(void),
+                                          bool (*idle_fn)(void),
+                                          uint64_t timeout_us,
+                                          int* handle) {
+    return bthread::register_worker_idle_function(init_fn, idle_fn, timeout_us, handle);
+}
+
+int bthread_unregister_worker_idle_function(int handle) {
+    return bthread::unregister_worker_idle_function(handle);
+}
+
 int bthread_set_create_span_func(void* (*func)()) {
     if (func == NULL) {
         return EINVAL;
diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h
index bbc9a7c3fd..0f31b843c7 100644
--- a/src/bthread/parking_lot.h
+++ b/src/bthread/parking_lot.h
@@ -64,7 +64,7 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
 
     // Wait for tasks.
     // If the `expected_state' does not match, wait() may finish directly.
-    void wait(const State& expected_state) {
+    void wait(const State& expected_state, const timespec* timeout = NULL) {
         if (get_state().val != expected_state.val) {
             // Fast path, no need to futex_wait.
             return;
@@ -72,7 +72,7 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
         if (_no_signal_when_no_waiter) {
             _waiter_num.fetch_add(1, butil::memory_order_relaxed);
         }
-        futex_wait_private(&_pending_signal, expected_state.val, NULL);
+        futex_wait_private(&_pending_signal, expected_state.val, timeout);
         if (_no_signal_when_no_waiter) {
             _waiter_num.fetch_sub(1, butil::memory_order_relaxed);
         }
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 877a5d406e..7ffe5a89d9 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -37,6 +37,7 @@
 #include "bthread/task_group.h"
 #include "bthread/timer_thread.h"
 #include "bthread/bthread.h"
+#include "bthread/worker_idle.h"
 
 #ifdef __x86_64__
 #include
@@ -167,7 +168,10 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (_last_pl_state.stopped()) {
             return false;
         }
-        _pl->wait(_last_pl_state);
+        run_worker_idle_functions();
+        const timespec timeout = get_worker_idle_timeout();
+        const bool empty_time = (timeout.tv_sec == 0 && timeout.tv_nsec == 0);
+        _pl->wait(_last_pl_state, empty_time ? NULL : &timeout);
         if (steal_task(tid)) {
             return true;
         }
@@ -176,10 +180,13 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (st.stopped()) {
             return false;
         }
+        run_worker_idle_functions();
         if (steal_task(tid)) {
             return true;
         }
-        _pl->wait(st);
+        const timespec timeout = get_worker_idle_timeout();
+        const bool empty_time = (timeout.tv_sec == 0 && timeout.tv_nsec == 0);
+        _pl->wait(st, empty_time ? NULL : &timeout);
 #endif
     } while (true);
 }
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index 4580202f87..bea3077f77 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -92,6 +92,46 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
 
 // Add a startup function with tag
 extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
 
+// Registers a per-worker init function and an idle function.
+//
+// The init function is called at most once per worker thread, before the first
+// invocation of idle_fn in that worker.
+//
+// The idle function is called when a worker has no task to run.
+// The return value of idle_fn is currently ignored by the scheduler.
+// If no idle function is registered, the worker waits indefinitely. Otherwise
+// the worker waits for at most the minimal timeout among registered functions
+// before trying again.
+//
+// This function is thread-safe.
+//
+// Args:
+//   init_fn: Optional. Called once per worker thread. Return 0 on success. A
+//     non-zero return value disables idle_fn for that worker thread.
+//   idle_fn: Required. Must not be NULL. Should return true if any work was done.
+//   timeout_us: Required. Must be > 0. Maximum waiting time when the worker is idle.
+//   handle: Optional output. On success, set to a positive handle for later
+//     unregistration.
+//
+// Returns:
+//   0 on success, error code otherwise.
+extern int bthread_register_worker_idle_function(int (*init_fn)(void),
+                                                 bool (*idle_fn)(void),
+                                                 uint64_t timeout_us,
+                                                 int* handle);
+
+// Unregisters an idle function by the handle returned by
+// bthread_register_worker_idle_function().
+//
+// This function is thread-safe.
+//
+// Args:
+//   handle: Handle returned by bthread_register_worker_idle_function().
+//
+// Returns:
+//   0 on success, error code otherwise.
+extern int bthread_unregister_worker_idle_function(int handle);
+
 // Add a create span function
 extern int bthread_set_create_span_func(void* (*func)());
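For reference, a minimal usage sketch of the new public API. This is an illustration only, not part of the patch; MyPollWorkerInit, MyPollOnce, g_worker_poll_fd, the 50us timeout and the enable/disable wrappers are hypothetical names.

#include <bthread/unstable.h>

// Hypothetical per-worker resource set up by the init callback.
static __thread int g_worker_poll_fd = -1;

// Called at most once in each worker thread, before MyPollOnce runs there.
static int MyPollWorkerInit(void) {
    g_worker_poll_fd = 0;   // e.g. open a per-worker completion queue here
    return 0;               // a non-zero return would disable MyPollOnce in this worker
}

// Called whenever a worker has no bthread to run.
static bool MyPollOnce(void) {
    // Poll g_worker_poll_fd here; return true if any work was processed.
    return false;
}

static int g_idle_handle = 0;

int enable_idle_polling(void) {
    // While this is registered, idle workers wake up at least every 50us to poll.
    return bthread_register_worker_idle_function(
        MyPollWorkerInit, MyPollOnce, 50 /*timeout_us*/, &g_idle_handle);
}

void disable_idle_polling(void) {
    bthread_unregister_worker_idle_function(g_idle_handle);
    g_idle_handle = 0;
}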
+ +#include "bthread/worker_idle.h" + +#include + +#include +#include +#include + +#include "butil/atomicops.h" +#include "butil/containers/doubly_buffered_data.h" +#include "butil/time.h" +#include "butil/thread_local.h" + +namespace bthread { +namespace { + +enum InitState : uint8_t { + INIT_STATE_NOT_RUN = 0, + INIT_STATE_OK = 1, + INIT_STATE_FAILED = 2, +}; + +struct WorkerIdleEntry { + int id; + int (*init_fn)(void); + bool (*idle_fn)(void); + uint64_t timeout_us; +}; + +typedef std::vector WorkerIdleEntryList; + +static butil::DoublyBufferedData g_entries; +static butil::atomic g_next_id(1); + +struct WorkerIdleTLS { + std::vector init_states; +}; + +BAIDU_THREAD_LOCAL WorkerIdleTLS* tls_worker_idle = NULL; + +static WorkerIdleTLS* get_or_create_tls() { + if (tls_worker_idle) { + return tls_worker_idle; + } + tls_worker_idle = new (std::nothrow) WorkerIdleTLS; + return tls_worker_idle; +} + +} // namespace + +int register_worker_idle_function(int (*init_fn)(void), + bool (*idle_fn)(void), + uint64_t timeout_us, + int* handle) { + if (idle_fn == NULL) { + return EINVAL; + } + if (timeout_us == 0) { + return EINVAL; + } + const int id = g_next_id.fetch_add(1, butil::memory_order_relaxed); + WorkerIdleEntry e; + e.id = id; + e.init_fn = init_fn; + e.idle_fn = idle_fn; + e.timeout_us = timeout_us; + g_entries.Modify([&](WorkerIdleEntryList& bg) { + bg.push_back(e); + return static_cast(1); + }); + if (handle) { + *handle = id; + } + return 0; +} + +int unregister_worker_idle_function(int handle) { + if (handle <= 0) { + return EINVAL; + } + size_t removed = g_entries.Modify([&](WorkerIdleEntryList& bg) { + const size_t old_size = bg.size(); + bg.erase(std::remove_if(bg.begin(), bg.end(), + [&](const WorkerIdleEntry& e) { + return e.id == handle; + }), + bg.end()); + return old_size - bg.size(); + }); + return removed ? 0 : EINVAL; +} + +bool has_worker_idle_functions() { + butil::DoublyBufferedData::ScopedPtr p; + if (g_entries.Read(&p) != 0) { + return false; + } + return !p->empty(); +} + +void run_worker_idle_functions() { + if (!has_worker_idle_functions()) { + return; + } + butil::DoublyBufferedData::ScopedPtr p; + if (g_entries.Read(&p) != 0) { + return; + } + if (p->empty()) { + return; + } + + WorkerIdleTLS* tls = get_or_create_tls(); + if (tls == NULL) { + return; + } + + // Step 1: Ensure per-worker init is called at most once for each entry. + // Step 2: Run idle callbacks for initialized entries. + // Step 3: Ignore callback return values. The caller decides how to proceed. + for (const auto& e : *p) { + if (e.id <= 0 || e.idle_fn == NULL) { + continue; + } + if (tls->init_states.size() <= static_cast(e.id)) { + tls->init_states.resize(static_cast(e.id) + 1, INIT_STATE_NOT_RUN); + } + uint8_t& st = tls->init_states[static_cast(e.id)]; + if (st == INIT_STATE_NOT_RUN) { + // Run the init callback function once. + if (e.init_fn) { + const int rc = e.init_fn(); + st = (rc == 0) ? INIT_STATE_OK : INIT_STATE_FAILED; + } else { + st = INIT_STATE_OK; + } + } + if (st != INIT_STATE_OK) { + continue; + } + // Run the idle callback function. 
+        e.idle_fn();
+    }
+}
+
+timespec get_worker_idle_timeout() {
+    butil::DoublyBufferedData<WorkerIdleEntryList>::ScopedPtr p;
+    if (g_entries.Read(&p) != 0) {
+        return {0, 0};
+    }
+    if (p->empty()) {
+        return {0, 0};
+    }
+    uint64_t min_us = 0;
+    for (const auto& e : *p) {
+        if (e.timeout_us == 0) {
+            continue;
+        }
+        if (min_us == 0 || e.timeout_us < min_us) {
+            min_us = e.timeout_us;
+        }
+    }
+    if (min_us == 0) {
+        return {0, 0};
+    }
+    return butil::microseconds_to_timespec(min_us);
+}
+
+} // namespace bthread
+
diff --git a/src/bthread/worker_idle.h b/src/bthread/worker_idle.h
new file mode 100644
index 0000000000..148f9630d7
--- /dev/null
+++ b/src/bthread/worker_idle.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BTHREAD_WORKER_IDLE_H
+#define BTHREAD_WORKER_IDLE_H
+
+#include <stdint.h>
+#include <time.h>
+
+namespace bthread {
+
+// Registers a per-worker init function and an idle function.
+//
+// The init function is called at most once per worker thread, before running
+// the idle function in that worker thread.
+//
+// Args:
+//   init_fn: Optional. Can be NULL.
+//   idle_fn: Required. Must not be NULL.
+//   timeout_us: Required. Must be > 0.
+//   handle: Optional output handle for unregistering later.
+//
+// Returns:
+//   0 on success, error code otherwise.
+int register_worker_idle_function(int (*init_fn)(void),
+                                  bool (*idle_fn)(void),
+                                  uint64_t timeout_us,
+                                  int* handle);
+
+// Unregisters a previously registered idle function by handle.
+//
+// Args:
+//   handle: Handle returned by register_worker_idle_function().
+//
+// Returns:
+//   0 on success, error code otherwise.
+int unregister_worker_idle_function(int handle);
+
+// Returns true if any idle function is registered.
+bool has_worker_idle_functions();
+
+// Runs all registered idle functions for the current worker thread.
+void run_worker_idle_functions();
+
+// Gets the minimal timeout among all registered functions.
+// Returns {0,0} if no idle function is registered.
+timespec get_worker_idle_timeout();
+
+} // namespace bthread
+
+#endif // BTHREAD_WORKER_IDLE_H
+
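For orientation, the sketch below condenses how a worker's idle round is expected to drive these helpers. It is an illustration only; the real call site is TaskGroup::wait_task() in the task_group.cpp hunk above, and worker_idle_round is a hypothetical name.

#include "bthread/parking_lot.h"
#include "bthread/worker_idle.h"

// Simplified idle round of a worker thread.
static void worker_idle_round(bthread::ParkingLot* pl,
                              const bthread::ParkingLot::State& expected_state) {
    // Give every registered idle callback a chance to do work first.
    bthread::run_worker_idle_functions();
    // {0,0} means no idle function is registered: block until signaled.
    const timespec timeout = bthread::get_worker_idle_timeout();
    const bool no_timeout = (timeout.tv_sec == 0 && timeout.tv_nsec == 0);
    pl->wait(expected_state, no_timeout ? NULL : &timeout);
}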
diff --git a/test/bthread_idle_unittest.cpp b/test/bthread_idle_unittest.cpp
new file mode 100644
index 0000000000..8a38dea983
--- /dev/null
+++ b/test/bthread_idle_unittest.cpp
@@ -0,0 +1,505 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <errno.h>
+#include <unistd.h>
+#include <atomic>
+#include <chrono>
+#include <mutex>
+#include <set>
+#include <vector>
+#include <bthread/bthread.h>
+#include <bthread/unstable.h>
+#include <butil/logging.h>
+
+namespace {
+
+// Per-worker state for module callbacks.
+struct WorkerModuleTLS {
+    bool inited_a;
+    bool inited_b;
+    int poll_a;
+    int poll_b;
+
+    WorkerModuleTLS() : inited_a(false), inited_b(false), poll_a(0), poll_b(0) {}
+};
+
+// Thread-local storage for simulating per-worker resources.
+static __thread WorkerModuleTLS* tls_modules = nullptr;
+
+// Global stats for validating execution.
+static std::atomic<int> init_calls_a(0);
+static std::atomic<int> init_calls_b(0);
+static std::atomic<int> idle_calls_a(0);
+static std::atomic<int> idle_calls_b(0);
+static std::atomic<int> init_twice_a(0);
+static std::atomic<int> init_twice_b(0);
+static std::atomic<int> idle_without_init_a(0);
+static std::atomic<int> idle_without_init_b(0);
+
+// Set to collect all unique worker IDs we've seen.
+static std::set<int> observed_worker_ids;
+static std::mutex stats_mutex;
+static std::atomic<int> global_worker_counter(0);
+
+// Stats for init failure test.
+static std::atomic<int> init_failure_calls(0);
+static std::atomic<int> idle_after_init_failure(0);
+
+// Stats for always-return-true test.
+static std::atomic<int> always_true_calls(0);
+
+// Stats for timeout test.
+static std::atomic<int> timeout_test_calls(0);
+
+// Stats for unregister test.
+static std::atomic<int> unregister_test_idle_calls(0);
+
+// Stats for thread safety test.
+static std::atomic<int> concurrent_register_count(0);
+
+int MockInitConcurrent() {
+    concurrent_register_count.fetch_add(1, std::memory_order_relaxed);
+    return 0;
+}
+
+bool MockIdleConcurrent() {
+    return false;
+}
+
+// Init for module A. Runs at most once per worker thread by design.
+int MockWorkerInitA() {
+    if (tls_modules == nullptr) {
+        tls_modules = new WorkerModuleTLS();
+    }
+    if (tls_modules->inited_a) {
+        init_twice_a.fetch_add(1, std::memory_order_relaxed);
+        return 0;
+    }
+    tls_modules->inited_a = true;
+    init_calls_a.fetch_add(1, std::memory_order_relaxed);
+    std::lock_guard<std::mutex> lock(stats_mutex);
+    observed_worker_ids.insert(global_worker_counter.fetch_add(1, std::memory_order_relaxed));
+    LOG(INFO) << "MockWorkerInitA: inited_a=" << tls_modules->inited_a;
+    return 0;
+}
+
+// Init for module B. Runs at most once per worker thread by design.
+int MockWorkerInitB() {
+    if (tls_modules == nullptr) {
+        tls_modules = new WorkerModuleTLS();
+    }
+    if (tls_modules->inited_b) {
+        init_twice_b.fetch_add(1, std::memory_order_relaxed);
+        return 0;
+    }
+    tls_modules->inited_b = true;
+    init_calls_b.fetch_add(1, std::memory_order_relaxed);
+    LOG(INFO) << "MockWorkerInitB: inited_b=" << tls_modules->inited_b;
+    return 0;
+}
+
+// Idle callback for module A. Must run only after init succeeded.
+bool MockIdlePollerA() {
+    idle_calls_a.fetch_add(1, std::memory_order_relaxed);
+    if (tls_modules == nullptr || !tls_modules->inited_a) {
+        idle_without_init_a.fetch_add(1, std::memory_order_relaxed);
+        return false;
+    }
+    ++tls_modules->poll_a;
+    if (tls_modules->poll_a % 64 == 0) {
+        LOG(INFO) << "MockIdlePollerA: poll_a=" << tls_modules->poll_a;
+        return true;
+    }
+    return false;
+}
+
+// Idle callback for module B. Must run only after init succeeded.
+bool MockIdlePollerB() {
+    idle_calls_b.fetch_add(1, std::memory_order_relaxed);
+    if (tls_modules == nullptr || !tls_modules->inited_b) {
+        idle_without_init_b.fetch_add(1, std::memory_order_relaxed);
+        return false;
+    }
+    ++tls_modules->poll_b;
+    if (tls_modules->poll_b % 32 == 0) {
+        LOG(INFO) << "MockIdlePollerB: poll_b=" << tls_modules->poll_b;
+        return true;
+    }
+    return false;
+}
+
+class IdleCallbackTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        // Reset global state.
+        observed_worker_ids.clear();
+        global_worker_counter.store(0, std::memory_order_relaxed);
+        init_calls_a.store(0, std::memory_order_relaxed);
+        init_calls_b.store(0, std::memory_order_relaxed);
+        idle_calls_a.store(0, std::memory_order_relaxed);
+        idle_calls_b.store(0, std::memory_order_relaxed);
+        init_twice_a.store(0, std::memory_order_relaxed);
+        init_twice_b.store(0, std::memory_order_relaxed);
+        idle_without_init_a.store(0, std::memory_order_relaxed);
+        idle_without_init_b.store(0, std::memory_order_relaxed);
+        init_failure_calls.store(0, std::memory_order_relaxed);
+        idle_after_init_failure.store(0, std::memory_order_relaxed);
+        always_true_calls.store(0, std::memory_order_relaxed);
+        timeout_test_calls.store(0, std::memory_order_relaxed);
+        unregister_test_idle_calls.store(0, std::memory_order_relaxed);
+        concurrent_register_count.store(0, std::memory_order_relaxed);
+    }
+
+    void TearDown() override {
+        // Clean up registered callbacks to avoid affecting other tests.
+        if (_handle_a > 0) {
+            bthread_unregister_worker_idle_function(_handle_a);
+            _handle_a = 0;
+        }
+        if (_handle_b > 0) {
+            bthread_unregister_worker_idle_function(_handle_b);
+            _handle_b = 0;
+        }
+        if (_handle_c > 0) {
+            bthread_unregister_worker_idle_function(_handle_c);
+            _handle_c = 0;
+        }
+        if (_handle_d > 0) {
+            bthread_unregister_worker_idle_function(_handle_d);
+            _handle_d = 0;
+        }
+        if (_handle_e > 0) {
+            bthread_unregister_worker_idle_function(_handle_e);
+            _handle_e = 0;
+        }
+    }
+
+    int _handle_a = 0;
+    int _handle_b = 0;
+    int _handle_c = 0;
+    int _handle_d = 0;
+    int _handle_e = 0;
+};
+
+void* dummy_task(void* arg) {
+    // Sleep to allow workers to enter the idle loop.
+    bthread_usleep(1000);
+    return nullptr;
+}
+
+TEST_F(IdleCallbackTest, WorkerIsolationAndExecution) {
+    // 1. Register multiple (init, idle) pairs from different "modules".
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        MockWorkerInitA, MockIdlePollerA, 1000, &_handle_a));
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        MockWorkerInitB, MockIdlePollerB, 1000, &_handle_b));
+
+    // 2. Determine the number of workers (concurrency).
+    int concurrency = bthread_getconcurrency();
+    LOG(INFO) << "Current concurrency: " << concurrency;
+
+    // 3. Create enough bthreads to ensure all workers are activated at least once,
+    //    but also give them time to become idle.
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < concurrency * 2; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    // 4. Wait for all tasks to complete.
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    // 5. Sleep a bit to ensure all workers have had a chance to hit the idle loop.
+    usleep(50 * 1000);
+
+    // 6. Verify results.
+    std::lock_guard<std::mutex> lock(stats_mutex);
+    LOG(INFO) << "Observed " << observed_worker_ids.size() << " unique worker contexts.";
+
+    // Basic sanity: both module callbacks should have been executed at least once.
+    EXPECT_GT(init_calls_a.load(std::memory_order_relaxed), 0);
+    EXPECT_GT(init_calls_b.load(std::memory_order_relaxed), 0);
+    EXPECT_GT(idle_calls_a.load(std::memory_order_relaxed), 0);
+    EXPECT_GT(idle_calls_b.load(std::memory_order_relaxed), 0);
+
+    // Init should not run twice in the same worker thread for the same module.
+    EXPECT_EQ(init_twice_a.load(std::memory_order_relaxed), 0);
+    EXPECT_EQ(init_twice_b.load(std::memory_order_relaxed), 0);
+
+    // Idle should not run before init is completed.
+    EXPECT_EQ(idle_without_init_a.load(std::memory_order_relaxed), 0);
+    EXPECT_EQ(idle_without_init_b.load(std::memory_order_relaxed), 0);
+
+    // We expect at least one worker to have initialized its context.
+    ASSERT_GT(observed_worker_ids.size(), 0);
+
+    // If concurrency is larger than 1, it is likely that we observed multiple workers.
+    if (concurrency > 1) {
+        EXPECT_GT(observed_worker_ids.size(), 1);
+    }
+}
+
+// Test parameter validation.
+TEST_F(IdleCallbackTest, ParameterValidation) {
+    int handle = 0;
+
+    // NULL idle_fn should fail (init_fn can be NULL, but idle_fn cannot).
+    ASSERT_EQ(EINVAL, bthread_register_worker_idle_function(
+        MockWorkerInitA, nullptr, 1000, &handle));
+
+    // timeout_us = 0 should fail.
+    ASSERT_EQ(EINVAL, bthread_register_worker_idle_function(
+        MockWorkerInitA, MockIdlePollerA, 0, &handle));
+
+    // A valid registration should succeed.
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        MockWorkerInitA, MockIdlePollerA, 1000, &_handle_a));
+    ASSERT_GT(_handle_a, 0);
+
+    // Unregistering with an invalid handle should fail.
+    ASSERT_EQ(EINVAL, bthread_unregister_worker_idle_function(-1));
+    ASSERT_EQ(EINVAL, bthread_unregister_worker_idle_function(0));
+    ASSERT_EQ(EINVAL, bthread_unregister_worker_idle_function(99999));
+
+    // Unregistering with a valid handle should succeed.
+    ASSERT_EQ(0, bthread_unregister_worker_idle_function(_handle_a));
+    _handle_a = 0;
+}
+
+// Test init failure scenario: idle_fn should not run if init_fn returns non-zero.
+int MockWorkerInitFailure() {
+    init_failure_calls.fetch_add(1, std::memory_order_relaxed);
+    return -1;  // Init fails.
+}
+
+bool MockIdleAfterInitFailure() {
+    idle_after_init_failure.fetch_add(1, std::memory_order_relaxed);
+    return false;
+}
+
+TEST_F(IdleCallbackTest, InitFailurePreventsIdleExecution) {
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        MockWorkerInitFailure, MockIdleAfterInitFailure, 1000, &_handle_a));
+
+    // Create some bthreads to activate workers.
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < 5; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    // Wait for the idle loop.
+    usleep(50 * 1000);
+
+    // Init should have been called.
+    EXPECT_GT(init_failure_calls.load(std::memory_order_relaxed), 0);
+
+    // Idle should NOT have been called because init failed.
+    EXPECT_EQ(idle_after_init_failure.load(std::memory_order_relaxed), 0);
+}
+
+// Test registration without init_fn.
+bool MockIdleWithoutInit() {
+    timeout_test_calls.fetch_add(1, std::memory_order_relaxed);
+    return false;
+}
+
+TEST_F(IdleCallbackTest, RegistrationWithoutInitFunction) {
+    // Registering with a NULL init_fn should work.
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        nullptr, MockIdleWithoutInit, 2000, &_handle_a));
+
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < 5; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    usleep(50 * 1000);
+
+    // Idle should have been called even without init_fn.
+    EXPECT_GT(timeout_test_calls.load(std::memory_order_relaxed), 0);
+}
+
+// Test multiple registrations and timeout calculation.
+bool MockIdleTimeout1() {
+    return false;
+}
+
+bool MockIdleTimeout2() {
+    return false;
+}
+
+bool MockIdleTimeout3() {
+    return false;
+}
+
+TEST_F(IdleCallbackTest, MultipleRegistrationsAndTimeout) {
+    int handle1 = 0, handle2 = 0, handle3 = 0;
+
+    // Register with different timeouts: 100us, 500us, 2000us.
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        nullptr, MockIdleTimeout1, 100, &handle1));
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        nullptr, MockIdleTimeout2, 500, &handle2));
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        nullptr, MockIdleTimeout3, 2000, &handle3));
+
+    // The minimal timeout should be 100us.
+    // We verify this indirectly by checking that idle functions are called
+    // frequently enough (the minimal timeout determines the wait time).
+
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < 3; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    // Clean up.
+    bthread_unregister_worker_idle_function(handle1);
+    bthread_unregister_worker_idle_function(handle2);
+    bthread_unregister_worker_idle_function(handle3);
+}
+
+// Test unregister functionality.
+bool MockIdleForUnregister() {
+    unregister_test_idle_calls.fetch_add(1, std::memory_order_relaxed);
+    return false;
+}
+
+TEST_F(IdleCallbackTest, UnregisterStopsIdleExecution) {
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        nullptr, MockIdleForUnregister, 1000, &_handle_a));
+
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < 3; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    usleep(20 * 1000);
+    const int calls_before = unregister_test_idle_calls.load(std::memory_order_relaxed);
+    EXPECT_GT(calls_before, 0);
+
+    // Unregister.
+    ASSERT_EQ(0, bthread_unregister_worker_idle_function(_handle_a));
+    _handle_a = 0;
+
+    // Wait a bit more.
+    usleep(30 * 1000);
+    const int calls_after = unregister_test_idle_calls.load(std::memory_order_relaxed);
+
+    // Calls should not increase much after unregistering (they may increase slightly
+    // due to in-flight calls, but should stabilize).
+    EXPECT_LE(calls_after - calls_before, 5);
+}
+
+// Test that always returning true does not cause a busy loop.
+bool MockIdleAlwaysTrue() {
+    always_true_calls.fetch_add(1, std::memory_order_relaxed);
+    return true;  // Always report work done.
+}
+
+TEST_F(IdleCallbackTest, AlwaysReturnTrueDoesNotBusyLoop) {
+    ASSERT_EQ(0, bthread_register_worker_idle_function(
+        nullptr, MockIdleAlwaysTrue, 10000, &_handle_a));
+
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < 3; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    // Wait a short time.
+    const auto start_time = std::chrono::steady_clock::now();
+    usleep(10 * 1000);  // 10ms
+    const auto end_time = std::chrono::steady_clock::now();
+    const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+                                end_time - start_time)
+                                .count();
+
+    const int calls = always_true_calls.load(std::memory_order_relaxed);
+
+    LOG(INFO) << "AlwaysReturnTrue test: " << calls << " calls in " << elapsed_ms
+              << "ms (expected low frequency)";
+
+    EXPECT_GT(calls, 0);
+    EXPECT_LT(calls, 1000);
+}
+
+// Test thread safety of register/unregister.
+void* concurrent_register_task(void* arg) {
+    (void)arg;
+    int handle = 0;
+    if (bthread_register_worker_idle_function(MockInitConcurrent, MockIdleConcurrent,
+                                              1000, &handle) == 0) {
+        // Small delay, then unregister.
+        bthread_usleep(1000);
+        bthread_unregister_worker_idle_function(handle);
+    }
+    return nullptr;
+}
+
+TEST_F(IdleCallbackTest, ThreadSafety) {
+    // Concurrent registration/unregistration from multiple bthreads.
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < 20; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, concurrent_register_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    // Should have seen some init calls from concurrent registrations.
+    // The exact number is non-deterministic, but it should be > 0.
+    EXPECT_GT(concurrent_register_count.load(std::memory_order_relaxed), 0);
+}
+
+} // namespace
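A more direct check of the minimal-timeout selection than MultipleRegistrationsAndTimeout could look like the sketch below. It is not part of the patch; it assumes the internal header bthread/worker_idle.h is reachable from the test target and that get_worker_idle_timeout() returns the relative duration produced by butil::microseconds_to_timespec().

#include <gtest/gtest.h>
#include <bthread/unstable.h>
#include "bthread/worker_idle.h"

static bool NoopIdle() { return false; }

TEST(IdleTimeoutSketch, MinimalRegisteredTimeoutWins) {
    int h1 = 0, h2 = 0;
    ASSERT_EQ(0, bthread_register_worker_idle_function(nullptr, NoopIdle, 100, &h1));
    ASSERT_EQ(0, bthread_register_worker_idle_function(nullptr, NoopIdle, 2000, &h2));

    // 100us is the smallest registered timeout, so workers should wait at most
    // 100us per idle round: 0 seconds and 100000 nanoseconds.
    const timespec ts = bthread::get_worker_idle_timeout();
    EXPECT_EQ(0, ts.tv_sec);
    EXPECT_EQ(100 * 1000L, ts.tv_nsec);

    EXPECT_EQ(0, bthread_unregister_worker_idle_function(h1));
    EXPECT_EQ(0, bthread_unregister_worker_idle_function(h2));

    // With nothing registered, {0,0} signals "wait indefinitely".
    const timespec none = bthread::get_worker_idle_timeout();
    EXPECT_EQ(0, none.tv_sec);
    EXPECT_EQ(0, none.tv_nsec);
}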