Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions sei-tendermint/internal/autobahn/avail/inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
)

// TODO: when dynamic committee changes are supported, newly joined members
// must be added to blocks, votes, nextBlockToPersist, and persistedBlockStart.
// Currently all four are initialized once in newInner from c.Lanes().All().
// BlockPersister creates lane WALs lazily inside MaybePruneAndPersistLane, but the new
// member must also appear in inner.blocks before the next persist cycle.
type inner struct {
latestAppQC utils.Option[*types.AppQC]
latestCommitQC utils.AtomicSend[utils.Option[*types.CommitQC]]
Expand All @@ -23,8 +28,10 @@ type inner struct {
// reconstructed from the blocks already on disk (see newInner).
//
// TODO: consider giving this its own AtomicSend to avoid waking unrelated
// inner waiters (PushVote, PushCommitQC, etc.) on every markBlockPersisted
// call. Only RecvBatch needs to be notified of cursor changes;
// inner waiters (PushVote, PushCommitQC, etc.) on markBlockPersisted calls.
// Now that blocks are persisted concurrently by lane (one notification per
// lane per batch, not per block), the frequency is lower, but still not
// ideal. Only RecvBatch needs to be notified of cursor changes;
// collectPersistBatch is in the same goroutine and reads it directly.
nextBlockToPersist map[types.LaneID]types.BlockNumber

Expand Down
113 changes: 59 additions & 54 deletions sei-tendermint/internal/autobahn/avail/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,20 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin
return nil, err
}

// Delete files below the prune anchor that were filtered out by
// loadPersistedState. Also reset the CommitQC persister's cursor to
// match the post-prune range.
laneFirsts := make(map[types.LaneID]types.BlockNumber, len(inner.blocks))
for lane, q := range inner.blocks {
laneFirsts[lane] = q.first
}
if err := pers.blocks.DeleteBefore(laneFirsts); err != nil {
return nil, fmt.Errorf("prune stale block files: %w", err)
}
if err := pers.commitQCs.DeleteBefore(inner.commitQCs.first); err != nil {
return nil, fmt.Errorf("prune stale commitQC files: %w", err)
// Truncate WAL entries below the prune anchor that were filtered out by
// loadPersistedState.
if ls, ok := loaded.Get(); ok {
if anchor, ok := ls.pruneAnchor.Get(); ok {
for _, lane := range data.Committee().Lanes().All() {
if err := pers.blocks.MaybePruneAndPersistLane(lane, utils.Some(anchor.CommitQC), nil, utils.None[func(*types.Signed[*types.LaneProposal])]()); err != nil {
return nil, fmt.Errorf("prune stale block WAL entries: %w", err)
}
}
if err := pers.commitQCs.MaybePruneAndPersist(utils.Some(anchor.CommitQC), nil, utils.None[func(*types.CommitQC)]()); err != nil {
return nil, fmt.Errorf("prune stale commitQC WAL entries: %w", err)
}
}
}
pers.commitQCs.ResetNext(inner.commitQCs.next)

return &State{
key: key,
Expand Down Expand Up @@ -588,6 +588,12 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload *
}

// Run runs the background tasks of the state.
//
// Goroutines: this method spawns long-lived goroutines via scope.SpawnNamed
// (the persist loop and the FullCommitQC→data-state pusher). Inside
// runPersist, scope.Parallel spawns short-lived goroutines for concurrent
// per-lane block and commit-QC persistence. The persist package itself does
// not spawn goroutines.
func (s *State) Run(ctx context.Context) error {
return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error {
scope.SpawnNamed("persist", func() error {
Expand Down Expand Up @@ -631,18 +637,15 @@ func (s *State) Run(ctx context.Context) error {

// runPersist is the main loop for the persist goroutine.
// Write order:
// 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark.
// 2. CommitQCs in order, then publish LastCommitQC immediately
// so consensus can advance without waiting for block writes.
// 3. Blocks per lane in order, markBlockPersisted after each.
// 4. Prune old blocks and CommitQCs.
// 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark (sequential).
// 2. commitQCs.MaybePruneAndPersist and each lane's blocks.MaybePruneAndPersistLane run
// concurrently via scope.Parallel (separate WALs, no early cancellation; first error
// is returned after all tasks finish).
// Each path publishes (markCommitQCsPersisted / markBlockPersisted) per entry so voting
// unblocks ASAP.
//
// The prune anchor is a pruning watermark: on restart we resume from it.
//
// Blocks are persisted one at a time with inner.nextBlockToPersist
// updated after each write, so vote latency equals single-block write
// time regardless of batch size.
//
// TODO: use a single WAL for anchor and CommitQCs to make
// this atomic rather than relying on write order.
func (s *State) runPersist(ctx context.Context, pers persisters) error {
Expand All @@ -653,52 +656,56 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error {
return err
}

// Prune CommitQC anchor: same Option drives commit-QC WAL and per-lane block WAL
// (truncate-then-append below this QC).
var anchorQC utils.Option[*types.CommitQC]
// 1. Persist prune anchor first — establishes the crash-recovery watermark.
if anchor, ok := batch.pruneAnchor.Get(); ok {
if err := pers.pruneAnchor.Persist(PruneAnchorConv.Encode(anchor)); err != nil {
return fmt.Errorf("persist prune anchor: %w", err)
}
s.advancePersistedBlockStart(anchor.CommitQC)
lastPersistedAppQCNext = anchor.CommitQC.Proposal().Index() + 1
anchorQC = utils.Some(anchor.CommitQC)
}

// 2. Persist new CommitQCs, then publish immediately so consensus
// can advance without waiting for block writes or pruning.
for _, qc := range batch.commitQCs {
if err := pers.commitQCs.PersistCommitQC(qc); err != nil {
return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err)
}
}
if len(batch.commitQCs) > 0 {
s.markCommitQCsPersisted(batch.commitQCs[len(batch.commitQCs)-1])
markBlock := func(p *types.Signed[*types.LaneProposal]) {
header := p.Msg().Block().Header()
s.markBlockPersisted(header.Lane(), header.BlockNumber()+1)
}

// 3. Persist blocks (mark each individually for vote latency).
blocksByLane := make(map[types.LaneID][]*types.Signed[*types.LaneProposal], s.data.Committee().Lanes().Len())
for _, proposal := range batch.blocks {
h := proposal.Msg().Block().Header()
if err := pers.blocks.PersistBlock(proposal); err != nil {
return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err)
lane := proposal.Msg().Block().Header().Lane()
blocksByLane[lane] = append(blocksByLane[lane], proposal)
}

// 2. Persist commit-QCs and per-lane blocks in parallel.
// Callees handle empty inputs gracefully (no-op when nothing to write/truncate).
if err := scope.Parallel(func(ps scope.ParallelScope) error {
ps.Spawn(func() error {
return pers.commitQCs.MaybePruneAndPersist(anchorQC, batch.commitQCs, utils.Some(func(qc *types.CommitQC) {
s.markCommitQCsPersisted(qc)
}))
})
for _, lane := range s.data.Committee().Lanes().All() {
proposals := blocksByLane[lane]
ps.Spawn(func() error {
return pers.blocks.MaybePruneAndPersistLane(lane, anchorQC, proposals, utils.Some(markBlock))
})
}
s.markBlockPersisted(h.Lane(), h.BlockNumber()+1)
}

// 4. Prune old data.
if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil {
return fmt.Errorf("block deleteBefore: %w", err)
}
if err := pers.commitQCs.DeleteBefore(batch.commitQCFirst); err != nil {
return fmt.Errorf("commitqc deleteBefore: %w", err)
return nil
}); err != nil {
return err
}
}
}

// persistBatch holds the data collected under lock for one persist iteration.
type persistBatch struct {
blocks []*types.Signed[*types.LaneProposal]
commitQCs []*types.CommitQC
pruneAnchor utils.Option[*PruneAnchor]
laneFirsts map[types.LaneID]types.BlockNumber
commitQCFirst types.RoadIndex
blocks []*types.Signed[*types.LaneProposal]
commitQCs []*types.CommitQC
pruneAnchor utils.Option[*PruneAnchor]
}

// advancePersistedBlockStart updates the per-lane block admission watermark
Expand All @@ -717,8 +724,9 @@ func (s *State) advancePersistedBlockStart(commitQC *types.CommitQC) {
}

// markBlockPersisted advances the per-lane block persistence cursor.
// Called after each individual block write so that RecvBatch (and therefore
// voting) unblocks with single-block latency regardless of batch size.
// Called after each block is persisted so that RecvBatch (and therefore
// voting) can unblock as soon as the block is durable. Safe for concurrent
// callers (acquires s.inner lock internally).
func (s *State) markBlockPersisted(lane types.LaneID, next types.BlockNumber) {
for inner, ctrl := range s.inner.Lock() {
inner.nextBlockToPersist[lane] = next
Expand Down Expand Up @@ -759,16 +767,13 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext
}); err != nil {
return b, err
}
b.laneFirsts = make(map[types.LaneID]types.BlockNumber, len(inner.blocks))
for lane, q := range inner.blocks {
start := max(inner.nextBlockToPersist[lane], q.first)
for n := start; n < q.next; n++ {
b.blocks = append(b.blocks, q.q[n])
}
b.laneFirsts[lane] = q.first
}
commitQCNext = max(commitQCNext, inner.commitQCs.first)
b.commitQCFirst = inner.commitQCs.first
for n := commitQCNext; n < inner.commitQCs.next; n++ {
b.commitQCs = append(b.commitQCs, inner.commitQCs.q[n])
}
Expand Down
Loading
Loading