Conversation
…DA client, and update related dependencies.
📝 Walkthrough

Adds zstd-based compression support and integration: a new compression package (pkg/da), the DA client now compresses blobs before Submit and decompresses on retrieval, dependency bumps for klauspost/compress, tests, and a small doc guideline change.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as DA Client
    participant Compress as pkg/da (Compress/IsCompressed)
    participant Submitter as DA Submit/Storage
    participant Retriever as DA Retrieve
    participant Decompress as pkg/da (Decompress)
    Client->>Client: determine compression level (batch size)
    Client->>Compress: Compress(blob, level)
    Compress-->>Client: compressed blob (magic prefix)
    Client->>Submitter: Submit(compressed blob, options)
    Submitter-->>Client: submission result
    Retriever->>Submitter: fetch stored blob(s)
    Submitter-->>Retriever: blob bytes
    Retriever->>Compress: IsCompressed(blob)
    alt compressed
        Retriever->>Decompress: Decompress(blob)
        Decompress-->>Retriever: original data
    else not compressed
        Compress-->>Retriever: blob unchanged
    end
    Retriever-->>Client: returned blob data (or skipped on corruption)
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)

✅ Passed checks (4 passed)
Claude finished @alpe's task in 1m 51s

Code Review

Overall this is a well-structured implementation with good security defenses (frame-header pre-check, memory cap, decompression timeout). The backward-compatible passthrough is implemented correctly. A few items need attention before merging.

🔴 Critical: Performance Regression

The CI benchmark shows a ~1.7–1.9× slowdown in the DA submission path. Root cause:

🟠 Major Issues

1. Double allocation in

The current code allocates the compressed output then copies it into a new buffer with the magic prefix — two allocations when one suffices:

```go
// Current: two allocations
compressed := encoders[level].EncodeAll(data, nil)
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)

// Better: one allocation — let EncodeAll append onto the prefix slice
dst := make([]byte, len(magic))
copy(dst, magic)
return encoders[level].EncodeAll(data, dst), nil
```

2. Silent blob skipping on decompression failure

When a blob fails decompression in

3. Missing Layer 2

The code has a comment

🟡 Minor Issues

4. Compression threshold magic numbers

The values

```go
const (
	highBacklogThreshold     = 10 // batches larger than this use fastest compression
	moderateBacklogThreshold = 3  // batches larger than this use default compression
)
```

5. Error message style violations

(There are also several pre-existing violations in the same file that are out of scope for this PR.)

6.

7. The test pre-cancels the context and then calls

✅ Good Practices
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
Summary of Changes

Hello @alpe, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request integrates ZSTD compression for Data Availability (DA) blobs, enhancing the efficiency of data handling by reducing the size of blobs transmitted and stored. The changes involve updating the

Highlights
Changelog
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #3104      +/-   ##
==========================================
+ Coverage   60.91%   60.96%   +0.04%
==========================================
  Files         113      114       +1
  Lines       11617    11685      +68
==========================================
+ Hits         7077     7124      +47
- Misses       3742     3757      +15
- Partials      798      804       +6
```
Flags with carried forward coverage won't be shown.
Code Review
This pull request introduces ZSTD compression for DA blobs, which is a great feature for reducing bandwidth and storage costs. The implementation is well-contained within a new pkg/da package and includes a comprehensive set of tests. The changes in block/internal/da/client.go correctly integrate the compression and decompression logic. I have a few suggestions to improve the efficiency, readability, and style of the new compression package.
pkg/da/compression.go
Outdated
```go
compressed := encoder.EncodeAll(data, nil)

// Prepend magic prefix
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)

return result, nil
```
The current implementation of Compress can be made more memory-efficient. It currently involves at least two allocations. You can reduce this by creating a destination buffer with the magic prefix and then having EncodeAll append the compressed data to it.
```diff
-	compressed := encoder.EncodeAll(data, nil)
-
-	// Prepend magic prefix
-	result := make([]byte, len(magic)+len(compressed))
-	copy(result, magic)
-	copy(result[len(magic):], compressed)
-
-	return result, nil
+	dst := make([]byte, len(magic))
+	copy(dst, magic)
+	return encoder.EncodeAll(data, dst), nil
```
pkg/da/compression.go
Outdated
```go
// Strip magic prefix and decompress
decompressed, err := decoder.DecodeAll(data[len(magic):], nil)
if err != nil {
	return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
```
The new error handling guideline in CLAUDE.md suggests avoiding "failed to" in error messages. While this message doesn't use that exact phrase, "zstd decompress failed" can be improved to be more direct. The err != nil check already implies failure. Consider rephrasing to state the operation and wrap the underlying error.
```diff
-	return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
+	return nil, fmt.Errorf("compression: zstd decompression: %w", err)
```
References
- The project's coding guidelines in `CLAUDE.md` state to 'Never start an error message with "failed to"'. (link)
```go
return data[0] == magic[0] &&
	data[1] == magic[1] &&
	data[2] == magic[2] &&
	data[3] == magic[3]
```
The manual byte-by-byte comparison to check for the magic prefix is correct, but using bytes.HasPrefix is more idiomatic and readable. You'll need to add import "bytes" to the file.
```diff
-	return data[0] == magic[0] &&
-		data[1] == magic[1] &&
-		data[2] == magic[2] &&
-		data[3] == magic[3]
+	return bytes.HasPrefix(data, magic)
```
block/internal/da/client.go
Outdated
```go
data[i] = b.Data()
decompressed, decompErr := da.Decompress(b.Data())
if decompErr != nil {
	return datypes.ResultRetrieve{
```
For backward compat, we should still fallback to the uncompressed.
👍 This is implemented in the Decompress method. It checks for a "magic byte" prefix and returns raw data when not found.
…on based on batch size. (cherry picked from commit 3ff6211ec69d482437de79a617b5b39a88c91835)
E2e tests failed. I am not sure how this relates to the changes, yet. |
Actionable comments posted: 3
🧹 Nitpick comments (3)
block/internal/da/client.go (2)
74-79: `BlobSize` reports uncompressed size — document this or add a compressed-size field.

`blobSize` is calculated from the original `data` (pre-compression), but the bytes actually submitted to the DA layer are the compressed blobs. For operators relying on `BlobSize` for cost estimation or quota tracking, this mismatch can be misleading. Consider either documenting that `BlobSize` is the uncompressed size, or adding a separate field (e.g., `CompressedBlobSize`) so callers can distinguish.

Also applies to: 200-210
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/client.go` around lines 74 - 79, The current client.Submit implementation computes blobSize from the original data slices (variable blobSize) which reflects the uncompressed size while DA submissions use compressed blobs; update Submit to either record and return both sizes by adding a new CompressedBlobSize field to the datypes.ResultSubmit (in addition to the existing BlobSize) and populate it with the length of the compressed payloads before sending, or explicitly document in the Submit comment and datatypes that BlobSize is the uncompressed size; change references in Submit and any callers of datypes.ResultSubmit to use the new CompressedBlobSize when estimating costs/quota.
91-100: Consider extracting compression-level thresholds as named constants.

The magic numbers `10` and `3` silently encode the backlog-pressure policy. Named constants (or a small config struct) would make the policy self-documenting and easier to tune without reading the switch logic.

Example

```diff
+const (
+	// highBacklogThreshold is the batch-size above which we use fastest compression.
+	highBacklogThreshold = 10
+	// moderateBacklogThreshold is the batch-size above which we use default compression.
+	moderateBacklogThreshold = 3
+)
+
 compLevel := da.LevelBest
 switch {
-case len(data) > 10:
+case len(data) > highBacklogThreshold:
 	compLevel = da.LevelFastest
-case len(data) > 3:
+case len(data) > moderateBacklogThreshold:
 	compLevel = da.LevelDefault
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `block/internal/da/client.go` around lines 91-100, extract the magic numeric thresholds used in the compression selection switch into named constants (e.g., MinFastBatchSize = 10, MinDefaultBatchSize = 3) or into a small config struct, then replace the literal `10` and `3` in the switch that sets `compLevel` (which currently uses `da.LevelBest`, `da.LevelFastest`, and `da.LevelDefault`) with those constants/fields so the backlog-pressure policy is self-documenting and easily tunable without changing control flow.

pkg/da/compression_test.go (1)
61-81: Shared `sizes` slice across subtests is fragile if parallelized.

`sizes` is appended inside `t.Run` callbacks and then read after the loop. This works today because subtests run sequentially, but adding `t.Parallel()` in the future would introduce a data race. Consider collecting sizes outside the subtest or adding a brief comment warning against parallelizing these subtests.

One alternative: move the cross-level assertion into its own subtest
```diff
-	var sizes []int
 	for _, lvl := range levels {
+		lvl := lvl
 		t.Run(lvl.name, func(t *testing.T) {
 			compressed, err := Compress(data, lvl.level)
 			require.NoError(t, err)
 			assert.True(t, IsCompressed(compressed))
-			sizes = append(sizes, len(compressed))
-
 			decompressed, err := Decompress(compressed)
 			require.NoError(t, err)
 			assert.Equal(t, data, decompressed)
 			t.Logf("level=%s compressed=%d ratio=%.4f", lvl.name, len(compressed), float64(len(compressed))/float64(len(data)))
 		})
 	}
-	// Best should produce equal or smaller output than Fastest
-	if len(sizes) == 3 {
-		assert.LessOrEqual(t, sizes[2], sizes[0], "LevelBest should compress at least as well as LevelFastest")
-	}
+	t.Run("best_leq_fastest", func(t *testing.T) {
+		fastest, err := Compress(data, LevelFastest)
+		require.NoError(t, err)
+		best, err := Compress(data, LevelBest)
+		require.NoError(t, err)
+		assert.LessOrEqual(t, len(best), len(fastest),
+			"LevelBest should compress at least as well as LevelFastest")
+	})
```
Verify each finding against the current code and only fix it if needed. In `@pkg/da/compression_test.go` around lines 61 - 81, The test currently appends to a shared sizes slice inside each t.Run which is unsafe if subtests are parallelized; change to preallocate sizes := make([]int, len(levels)) before the loop, capture the loop index (e.g., idx := i) inside the t.Run closure, and assign sizes[idx] = len(compressed) instead of appending; keep the cross-level assertion after the loop (or move it into its own t.Run) and add a short comment on not calling t.Parallel() here to avoid races if someone enables parallel subtests; refer to the t.Run loop, levels, Compress and Decompress calls and the sizes slice when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@block/internal/da/client.go`:
- Around line 314-323: The decompression-error branch in Retrieve (where
da.Decompress returns decompErr) returns a ResultRetrieve without setting
BaseResult.Timestamp; update that error return to set Timestamp to the existing
block time (use the blockTime variable if available, otherwise time.Now()) so
the ResultRetrieve mirrors other error branches; modify the return constructing
datypes.ResultRetrieve / datypes.BaseResult in that decompErr branch to include
Timestamp: blockTime (or time.Now()) to ensure non-zero timestamps for callers.
- Around line 113-118: The log computes a compression ratio using
float64(len(compressed))/float64(len(raw)) which yields NaN for zero-length raw;
fix by computing the ratio before logging and guarding when len(raw) == 0 (e.g.,
set ratio := 0.0 or omit the field), then call
c.logger.Debug().Int("original_size", len(raw)).Int("compressed_size",
len(compressed)).Float64("ratio", ratio).Int("level",
int(compLevel)).Msg("compressed blob for DA submission"); this ensures no
division by zero produces NaN in the logs while keeping the same c.logger.Debug
call and the raw/compressed/compLevel identifiers.
In `@pkg/da/compression.go`:
- Around line 47-51: The zstd decoder is created without a max-memory cap
(decoder, err = zstd.NewReader(nil)), which risks OOM on untrusted DA blobs;
update the decoder initialization in pkg/da/compression.go to pass
zstd.WithDecoderMaxMemory(...) (e.g., 7*1024*1024 for a 7MB cap matching the DA
max blob size) so the decoder enforces a safe decompression memory limit.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (7)
- `apps/evm/go.sum` is excluded by `!**/*.sum`
- `apps/grpc/go.sum` is excluded by `!**/*.sum`
- `apps/testapp/go.sum` is excluded by `!**/*.sum`
- `execution/evm/go.sum` is excluded by `!**/*.sum`
- `execution/evm/test/go.sum` is excluded by `!**/*.sum`
- `go.sum` is excluded by `!**/*.sum`
- `test/e2e/go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (10)
- `CLAUDE.md`
- `apps/evm/go.mod`
- `apps/grpc/go.mod`
- `apps/testapp/go.mod`
- `block/internal/da/client.go`
- `execution/evm/test/go.mod`
- `go.mod`
- `pkg/da/compression.go`
- `pkg/da/compression_test.go`
- `test/e2e/go.mod`
⚠️ Performance Alert ⚠️
Possible performance regression was detected for benchmark 'Spamoor Trace Benchmarks'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.
| Benchmark suite | Current: db89458 | Previous: 212ac08 | Ratio |
|---|---|---|---|
| SpamoorSmoke - DA.Submit (avg) | 1089.3372093023256 us | 636.5232558139535 us | 1.71 |
| SpamoorSmoke - DASubmitter.SubmitData (avg) | 1761.9375 us | 931.4242424242424 us | 1.89 |
This comment was automatically generated by workflow using github-action-benchmark.
Actionable comments posted: 2
🧹 Nitpick comments (2)
pkg/da/compression.go (1)
19-20: Use a single constant for decoder-memory cap and declared-size cap.

`7 MiB` is duplicated in two places. Centralizing avoids accidental drift between header checks and decoder limits.

♻️ Proposed refactor

```diff
 const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB
+const maxDecoderMemory = maxDecompressedSize
@@
-	const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap
 	var err error
 	decoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecoderMemory))
```

Also applies to: 64-67
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `pkg/da/compression.go` around lines 19-20, the duplicated 7 MiB value should be centralized: introduce a single exported or package-level constant (e.g., maxDecompressedSize) and use it for both the header declared-size cap and the decoder memory cap instead of repeating the literal; update all occurrences (including the existing const maxDecompressedSize and the checks in the decoder/reader code paths referenced around the functions that validate header size and set decoder limits) to reference that constant so both checks stay in sync.

block/internal/da/client.go (1)
91-100: Externalize compression-level thresholds instead of hard-coding batch cutoffs.

The `>3` / `>10` cutoffs are policy knobs. Moving them into config (or constants wired from config) will make tuning easier under real traffic without code changes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/client.go` around lines 91 - 100, The compression-level batch cutoffs are hard-coded; replace the literals used in the switch (the len(data) > 10 and len(data) > 3 checks that set compLevel to da.LevelFastest/da.LevelDefault/da.LevelBest) with configurable thresholds (e.g., constants or values read from the existing config object) so operators can tune them without code changes; update the code that computes compLevel to read two named thresholds (e.g., fastBatchThreshold and defaultBatchThreshold) from config or constants and use those in the switch, and add validation/defaults for those thresholds where configuration is loaded.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- `block/internal/da/client.go`
- `node/execution_test.go`
- `pkg/da/compression.go`
- `pkg/da/compression_test.go`
- `pkg/da/types/namespace_test.go`
- `test/e2e/failover_e2e_test.go`
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/da/compression_test.go
```go
decompressed, decompErr := da.Decompress(ctx, b.Data())
if decompErr != nil {
	c.logger.Warn().
		Err(decompErr).
		Uint64("height", height).
		Int("blob_index", i).
		Int("blob_size", len(b.Data())).
		Msg("skipping malicious or corrupt DA blob")
	continue
}
```
Do not return success when decompression dropped all requested data.
Retrieve/Get currently swallow decompression errors and can return a “successful” empty/partial payload with no explicit failure signal. That can mask corruption and break caller assumptions on completeness.
💡 Proposed fix

```diff
 	ids := make([]datypes.ID, 0, len(blobs))
 	data := make([]datypes.Blob, 0, len(blobs))
+	failedDecompress := 0
 	for i, b := range blobs {
 		decompressed, decompErr := da.Decompress(ctx, b.Data())
 		if decompErr != nil {
+			failedDecompress++
 			c.logger.Warn().
 				Err(decompErr).
 				Uint64("height", height).
 				Int("blob_index", i).
 				Int("blob_size", len(b.Data())).
 				Msg("skipping malicious or corrupt DA blob")
 			continue
 		}
 		ids = append(ids, blobrpc.MakeID(height, b.Commitment))
 		data = append(data, decompressed)
 	}
+	if len(data) == 0 && len(blobs) > 0 {
+		return datypes.ResultRetrieve{
+			BaseResult: datypes.BaseResult{
+				Code:      datypes.StatusError,
+				Message:   fmt.Sprintf("all blobs failed decompression (%d/%d)", failedDecompress, len(blobs)),
+				Height:    height,
+				Timestamp: blockTime,
+			},
+		}
+	}
```

Apply the same principle in Get: if any requested ID fails decompression, return an error (or explicitly surface partial-result semantics).
As per coding guidelines, "Return errors early".
Also applies to: 330-338, 403-413
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/da/client.go` around lines 314 - 323, In Retrieve/Get in
client.go, don't swallow decompression failures: when da.Decompress(ctx,
b.Data()) returns an error (the branch that currently logs via c.logger.Warn()
and continues), return an error immediately (or propagate a wrapped error)
instead of skipping that blob and returning a successful but incomplete payload;
update the code paths around da.Decompress, c.logger.Warn(), and the calling
functions Retrieve/Get to propagate the decompression error (or explicit
partial-result sentinel) so callers can detect corrupted/missing data.
```go
ch := make(chan decodeResult, 1)
go func() {
	out, err := decoder.DecodeAll(payload, nil)
	ch <- decodeResult{data: out, err: err}
}()

select {
case res := <-ch:
	if res.err != nil {
		return nil, fmt.Errorf("zstd decompress: %w", res.err)
	}
	return res.data, nil
case <-ctx.Done():
	return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err())
}
```
Timeout does not stop decode work; detached decode goroutines can pile up.
When ctx.Done() wins, Decompress returns, but the decode goroutine continues running. Under repeated timeouts, this can create unbounded background CPU work.
💡 Proposed fix (bounded concurrent decodes)

```diff
+const maxConcurrentDecodes = 8
+
+var decodeSem = make(chan struct{}, maxConcurrentDecodes)
+
 func Decompress(ctx context.Context, data []byte) ([]byte, error) {
 	if !IsCompressed(data) {
 		return data, nil
 	}
@@
 	ctx, cancel := context.WithTimeout(ctx, decompressTimeout)
 	defer cancel()
+	select {
+	case decodeSem <- struct{}{}:
+	case <-ctx.Done():
+		return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err())
+	}
+
 	ch := make(chan decodeResult, 1)
 	go func() {
+		defer func() { <-decodeSem }()
 		out, err := decoder.DecodeAll(payload, nil)
 		ch <- decodeResult{data: out, err: err}
 	}()
```
}()As per coding guidelines, "Be mindful of goroutine leaks in concurrent processing code".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/da/compression.go` around lines 123 - 137, The goroutine that calls
decoder.DecodeAll(payload, nil) can continue running after Decompress returns on
ctx.Done(), causing goroutine leaks; modify the goroutine (the closure that
currently writes to ch) to select when sending the decodeResult so it respects
ctx cancellation — i.e., capture ctx in the closure, run out, err :=
decoder.DecodeAll(...), then use select { case ch <- decodeResult{data: out,
err: err}: case <-ctx.Done(): } so the worker exits without blocking if the
caller timed out; keep the existing select in the caller unchanged.
julienrbrt
left a comment
lgtm! some questions/nits about some default constants.
```go
// maxDecompressedSize is the maximum allowed decompressed output size.
// Matches the WithDecoderMaxMemory cap and provides early rejection
// by inspecting the zstd frame header before allocating anything.
const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB
```
this is based on our max blob size. Could we just use that variable instead of having another one to remember here? As when we increase blob size after a celestia upgrade, we may forget to bump this.
```go
	}
	encoders[i] = enc
}
const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap
```
why using this instead of maxDecompressedSize?
```go
}

// Layer 3: Apply the shorter of caller deadline and our hard cap.
ctx, cancel := context.WithTimeout(ctx, decompressTimeout)
```
i get the idea, but can we maybe factor it by the maxDecompressedSize? so when this one increases, we expect the decompression time to linearly increase? (or we need to plot decompression size / decompression time, to know the real link and factor it properly)
Resolves #2532
Overview
Ref:
Benchmarks Zstd
ethereum-optimism/optimistic-specs#174
Arbitrum uses Brotli with dynamic compaction levels: Level 11 for low traffic; Level 0-1 for spikes.
https://docs.arbitrum.io/how-arbitrum-works/inside-arbitrum-nitro#:~:text=The%20data%20then%20undergoes%20compression,toward%20speed%20during%20heavy%20backlogs.
Summary by CodeRabbit
New Features
Documentation
Chores