Skip to content

Commit 7f121fb

Browse files
committed
persist metadata-only first batch when no symbols are selected
1 parent d44a78e commit 7f121fb

2 files changed

Lines changed: 128 additions & 0 deletions

File tree

supernode/adaptors/p2p.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,32 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action
134134
totalBytesStored := 0
135135
metadataBytesStored := 0
136136
firstBatchProcessed := false
137+
if len(keys) == 0 && len(metadataFiles) > 0 {
138+
logtrace.Info(ctx, "store: batch send (metadata-only)", logtrace.Fields{
139+
"taskID": taskID,
140+
"metadata_count": len(metadataFiles),
141+
"metadata_bytes": metadataBytes,
142+
"metadata_mb_est": utils.BytesIntToMB(metadataBytes),
143+
})
144+
145+
bctx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout)
146+
err = p.p2p.StoreBatch(bctx, metadataFiles, P2PDataRaptorQSymbol, taskID)
147+
cancel()
148+
if err != nil {
149+
return totalSymbols, totalAvailable, fmt.Errorf("p2p store batch (metadata-only): %w", err)
150+
}
151+
152+
logtrace.Info(ctx, "store: batch ok (metadata-only)", logtrace.Fields{
153+
"taskID": taskID,
154+
"metadata_count": len(metadataFiles),
155+
"metadata_bytes": metadataBytes,
156+
})
157+
158+
totalBytesStored += metadataBytes
159+
metadataBytesStored += metadataBytes
160+
firstBatchProcessed = true
161+
}
162+
137163
for start := 0; start < len(keys); {
138164
end := min(start+loadSymbolsBatchSize, len(keys))
139165
batch := keys[start:end]

supernode/adaptors/p2p_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"errors"
66
"strings"
77
"testing"
8+
"time"
89

910
"github.com/LumeraProtocol/supernode/v2/p2p"
11+
"github.com/LumeraProtocol/supernode/v2/pkg/codec"
1012
"github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore"
1113
"go.uber.org/mock/gomock"
1214
)
@@ -18,6 +20,35 @@ type clientWithPeersCount struct {
1820

1921
func (c clientWithPeersCount) PeersCount() int { return c.peers }
2022

23+
type testP2PClient struct {
24+
storeBatchFn func(ctx context.Context, values [][]byte, typ int, taskID string) error
25+
}
26+
27+
func (c testP2PClient) Retrieve(context.Context, string, ...bool) ([]byte, error) { return nil, nil }
28+
func (c testP2PClient) BatchRetrieve(context.Context, []string, int, string, ...bool) (map[string][]byte, error) {
29+
return nil, nil
30+
}
31+
func (c testP2PClient) BatchRetrieveStream(context.Context, []string, int32, string, func(string, []byte) error, ...bool) (int32, error) {
32+
return 0, nil
33+
}
34+
func (c testP2PClient) Store(context.Context, []byte, int) (string, error) { return "", nil }
35+
func (c testP2PClient) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) error {
36+
if c.storeBatchFn != nil {
37+
return c.storeBatchFn(ctx, values, typ, taskID)
38+
}
39+
return nil
40+
}
41+
func (c testP2PClient) Delete(context.Context, string) error { return nil }
42+
func (c testP2PClient) Stats(context.Context) (*p2p.StatsSnapshot, error) { return nil, nil }
43+
func (c testP2PClient) NClosestNodes(context.Context, int, string, ...string) []string { return nil }
44+
func (c testP2PClient) NClosestNodesWithIncludingNodeList(context.Context, int, string, []string, []string) []string {
45+
return nil
46+
}
47+
func (c testP2PClient) LocalStore(context.Context, string, []byte) (string, error) { return "", nil }
48+
func (c testP2PClient) DisableKey(context.Context, string) error { return nil }
49+
func (c testP2PClient) EnableKey(context.Context, string) error { return nil }
50+
func (c testP2PClient) GetLocalKeys(context.Context, *time.Time, time.Time) ([]string, error) { return nil, nil }
51+
2152
func TestStoreArtefacts_ZeroPeers_ReturnsError(t *testing.T) {
2253
svc := NewP2PService(clientWithPeersCount{peers: 0}, nil)
2354

@@ -52,3 +83,74 @@ func TestStoreArtefacts_PeersPresent_DoesNotTripGuard(t *testing.T) {
5283
}
5384
}
5485

86+
func TestStoreCascadeSymbolsAndData_MetadataOnlyBatchWhenNoSymbols(t *testing.T) {
87+
ctrl := gomock.NewController(t)
88+
t.Cleanup(ctrl.Finish)
89+
90+
store := rqstore.NewMockStore(ctrl)
91+
store.EXPECT().StoreSymbolDirectory("task", "").Return(nil)
92+
store.EXPECT().UpdateIsFirstBatchStored("task").Return(nil)
93+
94+
metadata := [][]byte{[]byte("index-bytes"), []byte("layout-bytes")}
95+
storeBatchCalls := 0
96+
client := testP2PClient{storeBatchFn: func(_ context.Context, values [][]byte, typ int, taskID string) error {
97+
storeBatchCalls++
98+
if typ != P2PDataRaptorQSymbol {
99+
t.Fatalf("unexpected type: %d", typ)
100+
}
101+
if taskID != "task" {
102+
t.Fatalf("unexpected taskID: %s", taskID)
103+
}
104+
if len(values) != len(metadata) {
105+
t.Fatalf("expected %d metadata values, got %d", len(metadata), len(values))
106+
}
107+
for i := range metadata {
108+
if string(values[i]) != string(metadata[i]) {
109+
t.Fatalf("metadata payload mismatch at %d", i)
110+
}
111+
}
112+
return nil
113+
}}
114+
115+
svc := &p2pImpl{p2p: client, rqStore: store}
116+
stored, total, err := svc.storeCascadeSymbolsAndData(context.Background(), "task", "action", "", metadata, codec.Layout{Blocks: []codec.Block{{BlockID: 0}}})
117+
if err != nil {
118+
t.Fatalf("unexpected error: %v", err)
119+
}
120+
if stored != 0 {
121+
t.Fatalf("expected 0 stored symbols, got %d", stored)
122+
}
123+
if total != 0 {
124+
t.Fatalf("expected 0 total symbols, got %d", total)
125+
}
126+
if storeBatchCalls != 1 {
127+
t.Fatalf("expected StoreBatch to be called once, got %d", storeBatchCalls)
128+
}
129+
}
130+
131+
func TestStoreCascadeSymbolsAndData_MetadataOnlyBatchFailureSkipsFirstBatchFlag(t *testing.T) {
132+
ctrl := gomock.NewController(t)
133+
t.Cleanup(ctrl.Finish)
134+
135+
store := rqstore.NewMockStore(ctrl)
136+
store.EXPECT().StoreSymbolDirectory("task", "").Return(nil)
137+
store.EXPECT().UpdateIsFirstBatchStored("task").Times(0)
138+
139+
metadata := [][]byte{[]byte("index-bytes")}
140+
client := testP2PClient{storeBatchFn: func(_ context.Context, values [][]byte, typ int, taskID string) error {
141+
if typ != P2PDataRaptorQSymbol || taskID != "task" || len(values) != 1 {
142+
t.Fatalf("unexpected StoreBatch args")
143+
}
144+
return errors.New("p2p down")
145+
}}
146+
147+
svc := &p2pImpl{p2p: client, rqStore: store}
148+
_, _, err := svc.storeCascadeSymbolsAndData(context.Background(), "task", "action", "", metadata, codec.Layout{Blocks: []codec.Block{{BlockID: 0}}})
149+
if err == nil {
150+
t.Fatalf("expected error, got nil")
151+
}
152+
if !strings.Contains(err.Error(), "metadata-only") {
153+
t.Fatalf("expected metadata-only path error, got: %v", err)
154+
}
155+
}
156+

0 commit comments

Comments
 (0)