Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
logger,
nil,
nil,
nil, // sequencerVerifier
nil, // sequencerSigner
nil, // ha: no HA in default node
)
}

Expand Down Expand Up @@ -250,6 +251,8 @@ type Node struct {
// Sequencer mode (after upgrade)
stateV2 *sequencer.StateV2
blockBroadcastReactor *sequencer.BlockBroadcastReactor
sigStore *sequencer.SignatureStore
ha sequencer.SequencerHA
}

func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, sigStore *sequencer.SignatureStore, err error) {
Expand Down Expand Up @@ -523,7 +526,6 @@ func createSequencerComponents(
// Create StateV2
stateV2, err := sequencer.NewStateV2(
l2Node,
sequencer.DefaultBlockInterval,
logger,
verifier,
signer,
Expand Down Expand Up @@ -798,6 +800,7 @@ func NewNode(
logger log.Logger,
sequencerVerifier sequencer.SequencerVerifier,
sequencerSigner sequencer.Signer,
ha sequencer.SequencerHA,
options ...Option,
) (
*Node, error,
Expand Down Expand Up @@ -1014,6 +1017,8 @@ func NewNode(
indexerService: indexerService,
blockIndexer: blockIndexer,
eventBus: eventBus,
sigStore: sigStore,
ha: ha,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)

Expand All @@ -1030,11 +1035,16 @@ func NewNode(
sequencerVerifier,
sequencerSigner,
sigStore,
nil, // ha: nil for now, Raft HA will be injected in a future milestone
ha, // HA service injected from NewNode caller; nil disables HA mode
); err != nil {
return nil, err
}

// Wire HA FSM callback: ApplyBlock handles geth apply + SaveSignature.
if ha != nil {
ha.SetOnBlockApplied(node.stateV2.ApplyBlock)
}

// Set stateV2&verifier&sigStore on blocksync reactor for post-upgrade
bcR.SetStateV2(node.stateV2)
bcR.SetVerifier(sequencerVerifier)
Expand Down
3 changes: 3 additions & 0 deletions rpc/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
nm.DefaultDBProvider,
nm.DefaultMetricsProvider(config.Instrumentation),
logger,
nil, // sequencerVerifier
nil, // sequencerSigner
nil, // ha: no HA in RPC test node
)
if err != nil {
panic(err)
Expand Down
13 changes: 4 additions & 9 deletions sequencer/block_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,12 @@ func TestBlockRingBuffer_RollbackTo_All(t *testing.T) {
rb.Add(makeBlock(i, byte(i)))
}

// Rollback to before first block
// Rollback to before first block — removes all blocks with h > 99
rb.RollbackTo(99)

// All blocks removed, but minHeight stays at 100
// (rollbackTo only removes > targetHeight)
assert.Equal(t, 1, rb.Count()) // Block 100 remains (not > 99, it's == 100)

// Actually let's re-read the logic... rollbackTo removes h > targetHeight
// So rollbackTo(99) removes 100, 101, 102, 103, 104
// Wait, the loop is: for h := rb.maxHeight; h > targetHeight; h--
// So it removes 104, 103, 102, 101, 100 (all > 99)
// rollbackTo loop: for h := rb.maxHeight; h > targetHeight; h--
// removes 104, 103, 102, 101, 100 (all > 99)
assert.Equal(t, 0, rb.Count())
}

func TestBlockRingBuffer_RollbackTo_All_Correct(t *testing.T) {
Expand Down
14 changes: 5 additions & 9 deletions sequencer/broadcast_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,13 @@ func (r *BlockBroadcastReactor) broadcastRoutine() {
case <-r.Quit():
return
case block := <-source:
// HA Follower: consume channel but do not broadcast.
// When promoted to Leader, the next block automatically starts broadcasting.
// Always populate recentBlocks so the cache is warm if this follower
// is promoted to leader and starts serving block requests from fullnodes.
r.recentBlocks.Add(block)
// HA Follower: do not broadcast, only drain the channel.
if r.stateV2.IsHAMode() && !r.stateV2.IsHALeader() {
continue
}
r.recentBlocks.Add(block)
r.broadcast(block)
}
}
Expand Down Expand Up @@ -478,16 +479,11 @@ func (r *BlockBroadcastReactor) applyBlock(block *BlockV2) error {
return fmt.Errorf("parent mismatch")
}

// Update state via stateV2 (unified entry point)
// ApplyBlock handles geth apply + SaveSignature in one call.
if err := r.stateV2.ApplyBlock(block); err != nil {
return err
}

// Persist signature for historical block serving
if err := r.sigStore.SaveSignature(block.Hash, block.Signature); err != nil {
panic(fmt.Sprintf("failed to save signature at height %d: %v", block.Number, err))
}

// Add to recent blocks
r.recentBlocks.Add(block)

Expand Down
16 changes: 16 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ type Signer interface {
// SequencerHA is the abstraction for Raft HA cluster.
// In single-node mode, ha == nil and all HA-related logic is skipped.
type SequencerHA interface {
// Start initializes and starts the Raft service.
// leader (bootstrap=true): starts as a single-node cluster, immediately becomes Leader.
// follower (bootstrap=false): initializes Raft, then asynchronously retries Join() in the
// background until it successfully joins the cluster or the service is stopped.
// Called by StateV2.OnStart() when the upgrade height is reached.
Start() error

// Stop gracefully shuts down the Raft service.
// Called by StateV2.OnStop() when the upgrade height has been passed.
Stop()

// IsLeader returns whether the current node is the Raft leader (sole block producer).
IsLeader() bool

Expand All @@ -52,4 +63,9 @@ type SequencerHA interface {
// Subscribe returns a channel that delivers blocks after Raft commit.
// Both leader and follower subscribe; used by broadcastRoutine for P2P broadcast.
Subscribe() <-chan *BlockV2

// SetOnBlockApplied registers the callback invoked by the Raft FSM on every
// committed log entry. The callback should execute ApplyBlock + SaveSignature.
// Must be called before Start().
SetOnBlockApplied(fn func(*BlockV2) error)
}
Loading
Loading