Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions sei-tendermint/internal/autobahn/autobahn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,6 @@ message PersistedInner {
optional FullTimeoutVote timeout_vote = 6;
}

// Wrapper for persisted data with sequence number.
message PersistedWrapper {
optional uint64 seq = 1;
optional bytes data = 2;
}

// Persisted availability prune anchor (AppQC + matching CommitQC pair).
// Stored atomically in an A/B file; used as the crash-recovery pruning watermark.
message PersistedAvailPruneAnchor {
Expand Down
19 changes: 10 additions & 9 deletions sei-tendermint/internal/autobahn/avail/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"google.golang.org/protobuf/proto"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus/persist"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/data"
pb "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb"
Expand Down Expand Up @@ -759,14 +759,15 @@ func TestNewStateWithPersistence(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())

// Write a valid PersistedWrapper whose Data payload is garbage.
// This simulates corruption at the application data level while
// keeping the outer A/B wrapper intact.
seq := uint64(1)
wrapper := &pb.PersistedWrapper{Seq: &seq, Data: []byte("not a valid protobuf")}
bz, err := proto.Marshal(wrapper)
// Create a throwaway persister to discover the A/B filenames,
// then corrupt them so NewState fails on load.
_, _, err := persist.NewPersister[*pb.PersistedAvailPruneAnchor](utils.Some(dir), innerFile)
require.NoError(t, err)
entries, err := os.ReadDir(dir)
require.NoError(t, err)
require.NoError(t, persist.WriteRawFile(dir, innerFile, bz))
for _, e := range entries {
require.NoError(t, os.WriteFile(filepath.Join(dir, e.Name()), []byte("corrupt"), 0600))
}

_, err = NewState(keys[0], ds, utils.Some(dir))
require.Error(t, err)
Expand Down
150 changes: 85 additions & 65 deletions sei-tendermint/internal/autobahn/consensus/persist/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@
package persist

import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"os"
"path/filepath"

"google.golang.org/protobuf/proto"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb"
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/protoutils"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
)
Expand All @@ -52,11 +51,13 @@ const (
suffixB = "_b.pb"
)

// WriteRawFile stores data verbatim in the A-side file (prefix + suffixA)
// inside dir, creating or overwriting it with mode 0600.
// It exists so tests outside this package can plant corrupt file contents.
func WriteRawFile(dir, prefix string, data []byte) error {
	target := filepath.Join(dir, prefix+suffixA)
	return os.WriteFile(target, data, 0600)
}
// crc32c is the Castagnoli (CRC32-C) table used to checksum A/B state files.
var crc32c = crc32.MakeTable(crc32.Castagnoli)

// Sizes of the fixed header that precedes the proto bytes in an A/B state file.
const (
crcSize = 4 // CRC32-C prefix length
seqSize = 8 // uint64 little-endian
headerSize = crcSize + seqSize // file header: [4-byte CRC32-C BE][8-byte seq LE]
)

// ErrNoData is returned by loadPersisted when no persisted files exist for the prefix.
var ErrNoData = errors.New("no persisted data")
Expand All @@ -82,7 +83,7 @@ func newNoOpPersister[T protoutils.Message]() Persister[T] {
}

// abPersister writes data to A/B files with automatic seq management.
// Uses PersistedWrapper protobuf for crash-safe persistence.
// File format: [4-byte CRC32-C BE] [8-byte seq LE] [proto-marshalled message].
// Only created when config has a state dir; dir is always a valid path.
// File selection is derived from seq: odd seq → A, even seq → B.
type abPersister[T protoutils.Message] struct {
Expand Down Expand Up @@ -121,13 +122,13 @@ func NewPersister[T protoutils.Message](dir utils.Option[string], prefix string)
_ = probe.Close()
_ = os.Remove(probe.Name())

wrapper, err := loadPersisted(d, prefix)
seq, data, err := loadPersisted(d, prefix)
if err != nil && !errors.Is(err, ErrNoData) {
return nil, none, err
}

// Ensure both A/B files exist and are writable so Persist never creates new
// directory entries. Empty files are treated as non-existent by loadWrapped,
// directory entries. Empty files are treated as non-existent by loadFile,
// so they won't interfere with loading on restart.
for _, suffix := range []string{suffixA, suffixB} {
path := filepath.Join(d, prefix+suffix)
Expand All @@ -147,10 +148,9 @@ func NewPersister[T protoutils.Message](dir utils.Option[string], prefix string)
_ = df.Close()
}

// wrapper is nil on fresh start (ErrNoData); protobuf Get methods return zero values for nil.
var loaded utils.Option[T]
if bz := wrapper.GetData(); bz != nil {
msg, err := protoutils.Unmarshal[T](bz)
if data != nil {
msg, err := protoutils.Unmarshal[T](data)
if err != nil {
return nil, none, fmt.Errorf("unmarshal persisted %s: %w", prefix, err)
}
Expand All @@ -159,13 +159,13 @@ func NewPersister[T protoutils.Message](dir utils.Option[string], prefix string)
return &abPersister[T]{
dir: d,
prefix: prefix,
seq: wrapper.GetSeq(),
seq: seq,
}, loaded, nil
}

// Persist writes a proto message to persistent storage with seq wrapper.
// Not safe for concurrent use.
// Returns error on marshal or write failure.
// Persist writes a proto message to persistent storage.
// File format: [4-byte CRC32-C BE] [8-byte seq LE] [proto bytes].
// CRC covers the seq + proto bytes. Not safe for concurrent use.
func (w *abPersister[T]) Persist(msg T) error {
data := protoutils.Marshal(msg)
seq := w.seq + 1
Expand All @@ -175,61 +175,66 @@ func (w *abPersister[T]) Persist(msg T) error {
if seq%2 == 1 {
suffix = suffixA
}
filename := w.prefix + suffix

wrapper := &pb.PersistedWrapper{
Seq: &seq,
Data: data,
}
bz, err := proto.Marshal(wrapper)
if err != nil {
return fmt.Errorf("marshal wrapper: %w", err)
}
// Compute CRC over [seq LE || proto bytes] without an extra copy.
var seqBuf [seqSize]byte
binary.LittleEndian.PutUint64(seqBuf[:], seq)
// hash.Hash.Write never returns an error.
h := crc32.New(crc32c)
_, _ = h.Write(seqBuf[:])
_, _ = h.Write(data)

if err := writeAndSync(filepath.Join(w.dir, filename), bz); err != nil {
filename := w.prefix + suffix
if err := writeFile(filepath.Join(w.dir, filename), h.Sum32(), seqBuf[:], data); err != nil {
return fmt.Errorf("persist to %s: %w", filename, err)
}
w.seq = seq
return nil
}

// loadWrapped loads a wrapped file, returning the PersistedWrapper proto.
// Returns os.ErrNotExist when the file does not exist (caller can use errors.Is).
// Returns other error on read or unmarshal failure. loadPersisted calls loadWrapped
// for both A and B and only fails when both fail; one corrupt file is tolerated.
// stateDir must be an existing directory (we do not create it).
func loadWrapped(stateDir, filename string) (*pb.PersistedWrapper, error) {
// loadFile reads an A/B file and returns (seq, proto data, error).
// Returns os.ErrNotExist when the file does not exist.
// Returns ErrCorrupt on CRC mismatch or truncated header.
// OS-level errors (permission denied, I/O) are returned unwrapped.
func loadFile(stateDir, filename string) (uint64, []byte, error) {
path := filepath.Join(stateDir, filename)
bz, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename suffix; no user-controlled input
if errors.Is(err, os.ErrNotExist) {
return nil, os.ErrNotExist
return 0, nil, os.ErrNotExist
}
if err != nil {
// OS-level read error (permission denied, I/O error, etc.) —
// not wrapped with ErrCorrupt so loadPersisted propagates it.
return nil, fmt.Errorf("read %s: %w", filename, err)
return 0, nil, fmt.Errorf("read %s: %w", filename, err)
}
// Treat empty files as non-existent. A valid wrapper must contain at least
// a seq number. Empty files are created by NewPersister to pre-populate
// directory entries so that Persist never needs to dir-sync.
// Empty files are created by NewPersister to pre-populate directory entries.
if len(bz) == 0 {
return nil, os.ErrNotExist
return 0, nil, os.ErrNotExist
}
if len(bz) < headerSize {
return 0, nil, fmt.Errorf("%s: truncated (len %d < header %d): %w", filename, len(bz), headerSize, ErrCorrupt)
}
var wrapper pb.PersistedWrapper
if err := proto.Unmarshal(bz, &wrapper); err != nil {
return nil, fmt.Errorf("unmarshal %s: %w", filename, fmt.Errorf("%v: %w", err, ErrCorrupt))

wantCRC := binary.BigEndian.Uint32(bz[:crcSize])
payload := bz[crcSize:]
if got := crc32.Checksum(payload, crc32c); got != wantCRC {
return 0, nil, fmt.Errorf("%s: crc32 mismatch (got %08x, want %08x): %w", filename, got, wantCRC, ErrCorrupt)
}

seq := binary.LittleEndian.Uint64(payload[:seqSize])
if seq == 0 {
return 0, nil, fmt.Errorf("%s: zero seq: %w", filename, ErrCorrupt)
}
return &wrapper, nil
data := payload[seqSize:]
return seq, data, nil
}

// loadPersisted loads persisted data for the given directory and prefix.
// Tries both A and B files; if one is corrupt (e.g. crash during write), the other is used
// so the validator can restart. Returns ErrNoData when no persisted files exist (use errors.Is).
// Returns another error when a file fails with an OS-level read error (permission denied, I/O),
// when both files fail to load, or when the state is inconsistent (same seq).
func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) {
func loadPersisted(dir string, prefix string) (uint64, []byte, error) {
fileA, fileB := prefix+suffixA, prefix+suffixB
wrapperA, errA := loadWrapped(dir, fileA)
wrapperB, errB := loadWrapped(dir, fileB)
seqA, dataA, errA := loadFile(dir, fileA)
seqB, dataB, errB := loadFile(dir, fileB)

// Fail fast on OS-level errors (permission denied, I/O errors).
// Only ErrNotExist (fresh start) and ErrCorrupt (crash mid-write) are tolerable.
Expand All @@ -247,45 +252,60 @@ func loadPersisted(dir string, prefix string) (*pb.PersistedWrapper, error) {
logger.Warn("corrupt state file", "file", fe.file, "err", fe.err)
continue
}
return nil, fmt.Errorf("load %s: %w", fe.file, fe.err)
return 0, nil, fmt.Errorf("load %s: %w", fe.file, fe.err)
}

switch {
case errA == nil && errB == nil:
switch {
case wrapperA.GetSeq() > wrapperB.GetSeq():
return wrapperA, nil
case wrapperB.GetSeq() > wrapperA.GetSeq():
return wrapperB, nil
case seqA > seqB:
return seqA, dataA, nil
case seqB > seqA:
return seqB, dataB, nil
default:
return nil, fmt.Errorf("corrupt state: both %s and %s have same seq; remove %s if acceptable", fileA, fileB, fileB)
return 0, nil, fmt.Errorf("corrupt state: both %s and %s have same seq; remove %s if acceptable", fileA, fileB, fileB)
}
case errA == nil:
return wrapperA, nil
return seqA, dataA, nil
case errB == nil:
return wrapperB, nil
return seqB, dataB, nil
default:
if errors.Is(errA, os.ErrNotExist) && errors.Is(errB, os.ErrNotExist) {
return nil, ErrNoData
return 0, nil, ErrNoData
}
return nil, fmt.Errorf("no valid state: %s: %v; %s: %v", fileA, errA, fileB, errB)
return 0, nil, fmt.Errorf("no valid state: %s: %v; %s: %v", fileA, errA, fileB, errB)
}
}

// writeAndSync overwrites the file at path with data and fsyncs it.
//
// The file is opened with O_CREATE|O_TRUNC, so previous contents are discarded
// before the new bytes are written. This is NOT an atomic replace: a crash
// between the truncate and the fsync can leave the file empty or partially
// written. The parent directory is not synced.
// Used by WAL persistence (blocks, commitqcs).
//
// Returns the first error from open, write, or fsync. The close error is
// ignored because the data has already been flushed by Sync.
func writeAndSync(path string, data []byte) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled
	if err != nil {
		return err
	}
	// Single close on every exit path; error paths must not close the file twice.
	defer func() { _ = f.Close() }()
	if _, err := f.Write(data); err != nil {
		return err
	}
	return f.Sync()
}

// writeFile writes an A/B state file and fsyncs it.
//
// On-disk layout: [4-byte CRC32-C BE][8-byte seq LE][proto data].
//   - crc is the checksum to store; it is encoded big-endian here.
//   - seq is the already-encoded sequence number bytes, written verbatim.
//   - data is the marshalled proto payload, written verbatim.
//
// The three pieces are written one after another so data is never copied into
// an intermediate buffer. The file is opened with O_TRUNC, so any previous
// contents are discarded first. Used by abPersister.Persist.
//
// Returns the first error from open, write, or fsync. The close error is
// ignored because the data has already been flushed by Sync.
func writeFile(path string, crc uint32, seq, data []byte) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) //nolint:gosec // path is stateDir + hardcoded suffix; not user-controlled
	if err != nil {
		return err
	}
	// The file must stay open until all chunks are written and synced.
	defer func() { _ = f.Close() }()
	var crcBuf [crcSize]byte
	binary.BigEndian.PutUint32(crcBuf[:], crc)
	for _, chunk := range [][]byte{crcBuf[:], seq, data} {
		if _, err := f.Write(chunk); err != nil {
			return err
		}
	}
	return f.Sync()
}
Loading
Loading