@@ -157,20 +157,20 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin
157157 return nil , err
158158 }
159159
160- // Delete files below the prune anchor that were filtered out by
161- // loadPersistedState. Also reset the CommitQC persister's cursor to
162- // match the post-prune range.
163- laneFirsts := make (map [types.LaneID ]types.BlockNumber , len (inner .blocks ))
164- for lane , q := range inner .blocks {
165- laneFirsts [lane ] = q .first
166- }
167- if err := pers .blocks .DeleteBefore (laneFirsts ); err != nil {
168- return nil , fmt .Errorf ("prune stale block files: %w" , err )
169- }
170- if err := pers .commitQCs .DeleteBefore (inner .commitQCs .first ); err != nil {
171- return nil , fmt .Errorf ("prune stale commitQC files: %w" , err )
160+ // Truncate WAL entries below the prune anchor that were filtered out by
161+ // loadPersistedState.
162+ if ls , ok := loaded .Get (); ok {
163+ if anchor , ok := ls .pruneAnchor .Get (); ok {
164+ for _ , lane := range data .Committee ().Lanes ().All () {
165+ if err := pers .blocks .MaybePruneAndPersistLane (lane , utils .Some (anchor .CommitQC ), nil , utils .None [func (* types.Signed [* types.LaneProposal ])]()); err != nil {
166+ return nil , fmt .Errorf ("prune stale block WAL entries: %w" , err )
167+ }
168+ }
169+ if err := pers .commitQCs .MaybePruneAndPersist (utils .Some (anchor .CommitQC ), nil , utils .None [func (* types.CommitQC )]()); err != nil {
170+ return nil , fmt .Errorf ("prune stale commitQC WAL entries: %w" , err )
171+ }
172+ }
172173 }
173- pers .commitQCs .ResetNext (inner .commitQCs .next )
174174
175175 return & State {
176176 key : key ,
@@ -588,6 +588,12 @@ func (s *State) produceBlock(ctx context.Context, key types.SecretKey, payload *
588588}
589589
590590// Run runs the background tasks of the state.
591+ //
592+ // Goroutines: this method spawns long-lived goroutines via scope.SpawnNamed
593+ // (the persist loop and the FullCommitQC→data-state pusher). Inside
594+ // runPersist, scope.Parallel spawns short-lived goroutines for concurrent
595+ // per-lane block and commit-QC persistence. The persist package itself does
596+ // not spawn goroutines.
591597func (s * State ) Run (ctx context.Context ) error {
592598 return scope .Run (ctx , func (ctx context.Context , scope scope.Scope ) error {
593599 scope .SpawnNamed ("persist" , func () error {
@@ -631,18 +637,15 @@ func (s *State) Run(ctx context.Context) error {
631637
632638// runPersist is the main loop for the persist goroutine.
633639// Write order:
634- // 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark.
635- // 2. CommitQCs in order, then publish LastCommitQC immediately
636- // so consensus can advance without waiting for block writes.
637- // 3. Blocks per lane in order, markBlockPersisted after each.
638- // 4. Prune old blocks and CommitQCs.
640+ // 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark (sequential).
641+ // 2. commitQCs.MaybePruneAndPersist and each lane's blocks.MaybePruneAndPersistLane run
642+ // concurrently via scope.Parallel (separate WALs, no early cancellation; first error
643+ // is returned after all tasks finish).
644+ // Each path publishes (markCommitQCsPersisted / markBlockPersisted) per entry so voting
645+ // unblocks ASAP.
639646//
640647// The prune anchor is a pruning watermark: on restart we resume from it.
641648//
642- // Blocks are persisted one at a time with inner.nextBlockToPersist
643- // updated after each write, so vote latency equals single-block write
644- // time regardless of batch size.
645- //
646649// TODO: use a single WAL for anchor and CommitQCs to make
647650// this atomic rather than relying on write order.
648651func (s * State ) runPersist (ctx context.Context , pers persisters ) error {
@@ -653,52 +656,56 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error {
653656 return err
654657 }
655658
659+ // Prune CommitQC anchor: same Option drives commit-QC WAL and per-lane block WAL
660+ // (truncate-then-append below this QC).
661+ var anchorQC utils.Option [* types.CommitQC ]
656662 // 1. Persist prune anchor first — establishes the crash-recovery watermark.
657663 if anchor , ok := batch .pruneAnchor .Get (); ok {
658664 if err := pers .pruneAnchor .Persist (PruneAnchorConv .Encode (anchor )); err != nil {
659665 return fmt .Errorf ("persist prune anchor: %w" , err )
660666 }
661667 s .advancePersistedBlockStart (anchor .CommitQC )
662668 lastPersistedAppQCNext = anchor .CommitQC .Proposal ().Index () + 1
669+ anchorQC = utils .Some (anchor .CommitQC )
663670 }
664671
665- // 2. Persist new CommitQCs, then publish immediately so consensus
666- // can advance without waiting for block writes or pruning.
667- for _ , qc := range batch .commitQCs {
668- if err := pers .commitQCs .PersistCommitQC (qc ); err != nil {
669- return fmt .Errorf ("persist commitqc %d: %w" , qc .Index (), err )
670- }
671- }
672- if len (batch .commitQCs ) > 0 {
673- s .markCommitQCsPersisted (batch .commitQCs [len (batch .commitQCs )- 1 ])
672+ markBlock := func (p * types.Signed [* types.LaneProposal ]) {
673+ header := p .Msg ().Block ().Header ()
674+ s .markBlockPersisted (header .Lane (), header .BlockNumber ()+ 1 )
674675 }
675676
676- // 3. Persist blocks (mark each individually for vote latency).
677+ blocksByLane := make ( map [types. LaneID ][] * types. Signed [ * types. LaneProposal ], s . data . Committee (). Lanes (). Len ())
677678 for _ , proposal := range batch .blocks {
678- h := proposal .Msg ().Block ().Header ()
679- if err := pers .blocks .PersistBlock (proposal ); err != nil {
680- return fmt .Errorf ("persist block %s/%d: %w" , h .Lane (), h .BlockNumber (), err )
679+ lane := proposal .Msg ().Block ().Header ().Lane ()
680+ blocksByLane [lane ] = append (blocksByLane [lane ], proposal )
681+ }
682+
683+ // 2. Persist commit-QCs and per-lane blocks in parallel.
684+ // Callees handle empty inputs gracefully (no-op when nothing to write/truncate).
685+ if err := scope .Parallel (func (ps scope.ParallelScope ) error {
686+ ps .Spawn (func () error {
687+ return pers .commitQCs .MaybePruneAndPersist (anchorQC , batch .commitQCs , utils .Some (func (qc * types.CommitQC ) {
688+ s .markCommitQCsPersisted (qc )
689+ }))
690+ })
691+ for _ , lane := range s .data .Committee ().Lanes ().All () {
692+ proposals := blocksByLane [lane ]
693+ ps .Spawn (func () error {
694+ return pers .blocks .MaybePruneAndPersistLane (lane , anchorQC , proposals , utils .Some (markBlock ))
695+ })
681696 }
682- s .markBlockPersisted (h .Lane (), h .BlockNumber ()+ 1 )
683- }
684-
685- // 4. Prune old data.
686- if err := pers .blocks .DeleteBefore (batch .laneFirsts ); err != nil {
687- return fmt .Errorf ("block deleteBefore: %w" , err )
688- }
689- if err := pers .commitQCs .DeleteBefore (batch .commitQCFirst ); err != nil {
690- return fmt .Errorf ("commitqc deleteBefore: %w" , err )
697+ return nil
698+ }); err != nil {
699+ return err
691700 }
692701 }
693702}
694703
695704// persistBatch holds the data collected under lock for one persist iteration.
696705type persistBatch struct {
697- blocks []* types.Signed [* types.LaneProposal ]
698- commitQCs []* types.CommitQC
699- pruneAnchor utils.Option [* PruneAnchor ]
700- laneFirsts map [types.LaneID ]types.BlockNumber
701- commitQCFirst types.RoadIndex
706+ blocks []* types.Signed [* types.LaneProposal ]
707+ commitQCs []* types.CommitQC
708+ pruneAnchor utils.Option [* PruneAnchor ]
702709}
703710
704711// advancePersistedBlockStart updates the per-lane block admission watermark
@@ -717,8 +724,9 @@ func (s *State) advancePersistedBlockStart(commitQC *types.CommitQC) {
717724}
718725
719726// markBlockPersisted advances the per-lane block persistence cursor.
720- // Called after each individual block write so that RecvBatch (and therefore
721- // voting) unblocks with single-block latency regardless of batch size.
727+ // Called after each block is persisted so that RecvBatch (and therefore
728+ // voting) can unblock as soon as the block is durable. Safe for concurrent
729+ // callers (acquires s.inner lock internally).
722730func (s * State ) markBlockPersisted (lane types.LaneID , next types.BlockNumber ) {
723731 for inner , ctrl := range s .inner .Lock () {
724732 inner .nextBlockToPersist [lane ] = next
@@ -759,16 +767,13 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext
759767 }); err != nil {
760768 return b , err
761769 }
762- b .laneFirsts = make (map [types.LaneID ]types.BlockNumber , len (inner .blocks ))
763770 for lane , q := range inner .blocks {
764771 start := max (inner .nextBlockToPersist [lane ], q .first )
765772 for n := start ; n < q .next ; n ++ {
766773 b .blocks = append (b .blocks , q .q [n ])
767774 }
768- b .laneFirsts [lane ] = q .first
769775 }
770776 commitQCNext = max (commitQCNext , inner .commitQCs .first )
771- b .commitQCFirst = inner .commitQCs .first
772777 for n := commitQCNext ; n < inner .commitQCs .next ; n ++ {
773778 b .commitQCs = append (b .commitQCs , inner .commitQCs .q [n ])
774779 }
0 commit comments