diff --git a/src/control/system/raft/database.go b/src/control/system/raft/database.go index 926d35610a6..0c9db392923 100644 --- a/src/control/system/raft/database.go +++ b/src/control/system/raft/database.go @@ -1,6 +1,6 @@ // // (C) Copyright 2020-2024 Intel Corporation. -// (C) Copyright 2025 Hewlett Packard Enterprise Development LP +// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -95,6 +95,7 @@ type ( raftTransport raft.Transport raft syncRaft raftLeaderNotifyCh chan bool + cbMutex sync.Mutex onLeadershipGained []onLeadershipGainedFn onLeadershipLost []onLeadershipLostFn onRaftShutdown []onRaftShutdownFn @@ -246,11 +247,14 @@ func NewDatabase(log logging.Logger, cfg *DatabaseConfig) (*Database, error) { repAddr, _ := cfg.LocalReplicaAddr() db := &Database{ - log: log, - cfg: cfg, - replicaAddr: repAddr, - shutdownErrCh: make(chan error), - raftLeaderNotifyCh: make(chan bool), + log: log, + cfg: cfg, + replicaAddr: repAddr, + shutdownErrCh: make(chan error), + // Buffered so hashicorp/raft can post a leadership + // transition without blocking if the monitor goroutine + // is momentarily busy running callbacks. + raftLeaderNotifyCh: make(chan bool, 1), data: &dbData{ log: log, @@ -416,21 +420,47 @@ func (db *Database) IsLeader() bool { // OnLeadershipGained registers callbacks to be run when this instance // gains the leadership role. func (db *Database) OnLeadershipGained(fns ...onLeadershipGainedFn) { + db.cbMutex.Lock() + defer db.cbMutex.Unlock() db.onLeadershipGained = append(db.onLeadershipGained, fns...) } // OnLeadershipLost registers callbacks to be run when this instance // loses the leadership role. func (db *Database) OnLeadershipLost(fns ...onLeadershipLostFn) { + db.cbMutex.Lock() + defer db.cbMutex.Unlock() db.onLeadershipLost = append(db.onLeadershipLost, fns...) } // OnRaftShutdown registers callbacks to be run when this instance // shuts down. func (db *Database) OnRaftShutdown(fns ...onRaftShutdownFn) { + db.cbMutex.Lock() + defer db.cbMutex.Unlock() db.onRaftShutdown = append(db.onRaftShutdown, fns...) } +// Return copies of the registered callbacks under a lock, so that they +// can be safely retrieved and executed without holding the lock. +func (db *Database) onLeadershipGainedCbs() []onLeadershipGainedFn { + db.cbMutex.Lock() + defer db.cbMutex.Unlock() + return append([]onLeadershipGainedFn(nil), db.onLeadershipGained...) +} + +func (db *Database) onLeadershipLostCbs() []onLeadershipLostFn { + db.cbMutex.Lock() + defer db.cbMutex.Unlock() + return append([]onLeadershipLostFn(nil), db.onLeadershipLost...) +} + +func (db *Database) onRaftShutdownCbs() []onRaftShutdownFn { + db.cbMutex.Lock() + defer db.cbMutex.Unlock() + return append([]onRaftShutdownFn(nil), db.onRaftShutdown...) +} + // Start checks to see if the system is configured as a MS replica. If // not, it returns early without an error. If it is, the persistent storage // is initialized if necessary, and the replica is started to begin the @@ -490,7 +520,7 @@ func (db *Database) monitorLeadershipState(parent context.Context) { var cancelGainedCtx context.CancelFunc runOnLeadershipLost := func() { - for _, fn := range db.onLeadershipLost { + for _, fn := range db.onLeadershipLostCbs() { if err := fn(); err != nil { db.log.Errorf("failure in onLeadershipLost callback: %s", err) } @@ -544,7 +574,7 @@ func (db *Database) stepUp(ctx context.Context, cancel context.CancelFunc) { return // restart the monitoring loop } - for i, fn := range db.onLeadershipGained { + for i, fn := range db.onLeadershipGainedCbs() { db.log.Tracef("executing onLeadershipGained[%d]", i) if err := fn(ctx); err != nil { diff --git a/src/control/system/raft/database_test.go b/src/control/system/raft/database_test.go index 826af185c6b..c28e57222f3 100644 --- a/src/control/system/raft/database_test.go +++ b/src/control/system/raft/database_test.go @@ -1,6 +1,6 @@ // // (C) Copyright 2020-2024 Intel Corporation. -// (C) Copyright 2025 Hewlett Packard Enterprise Development LP +// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -40,16 +40,22 @@ import ( func waitForLeadership(ctx context.Context, t *testing.T, db *Database, gained bool) { t.Helper() + + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for { + if db.IsLeader() == gained { + return + } select { case <-ctx.Done(): - t.Fatal(ctx.Err()) + t.Fatalf("timed out waiting for leadership gained=%t: %s", gained, ctx.Err()) return - default: - if db.IsLeader() == gained { - return - } - time.Sleep(1 * time.Second) + case <-ticker.C: } } } @@ -131,10 +137,9 @@ func TestSystem_Database_LeadershipCallbacks(t *testing.T) { db, cleanup := TestDatabase(t, log, localhost) defer cleanup() - if err := db.Start(dbCtx); err != nil { - t.Fatal(err) - } + // Register callbacks before Start() so the monitor goroutine + // cannot race ahead and iterate an empty slice under load. var onGainedCalled, onLostCalled uint32 db.OnLeadershipGained(func(_ context.Context) error { atomic.StoreUint32(&onGainedCalled, 1) @@ -145,6 +150,10 @@ func TestSystem_Database_LeadershipCallbacks(t *testing.T) { return nil }) + if err := db.Start(dbCtx); err != nil { + t.Fatal(err) + } + waitForLeadership(ctx, t, db, true) dbCancel() waitForLeadership(ctx, t, db, false) diff --git a/src/control/system/raft/raft.go b/src/control/system/raft/raft.go index bb1911c5ea5..f26fe182acf 100644 --- a/src/control/system/raft/raft.go +++ b/src/control/system/raft/raft.go @@ -180,7 +180,7 @@ func (db *Database) ShutdownRaft() error { // run as many of them as possible in order to clean things // up. if shutdownErr == nil { - for _, cb := range db.onRaftShutdown { + for _, cb := range db.onRaftShutdownCbs() { if cbErr := cb(); cbErr != nil { db.log.Errorf("onRaftShutdown callback failed: %s", cbErr) }