
feat: Implement ZSTD compression for DA blobs#3104

Open
alpe wants to merge 10 commits into main from alex/blob-compression

Conversation

@alpe
Contributor

@alpe alpe commented Feb 24, 2026

Resolves #2532

Overview

  • Compress via ZSTD
  • Dynamic compression level selection by load.

Ref:
Benchmarks Zstd
ethereum-optimism/optimistic-specs#174

Arbitrum uses Brotli with dynamic compression levels: Level 11 for low traffic; Level 0-1 for spikes.
https://docs.arbitrum.io/how-arbitrum-works/inside-arbitrum-nitro#:~:text=The%20data%20then%20undergoes%20compression,toward%20speed%20during%20heavy%20backlogs.
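The dynamic level selection by load can be sketched as a small policy function. This is a sketch only: the level names appear in the PR summaries, but the numeric values and the thresholds (more than 10 blobs picks the fastest level, more than 3 picks the default) are taken from the review discussion further down, not verified against pkg/da.

```go
package main

import "fmt"

// CompressionLevel mirrors the enum described in the PR summaries.
// The concrete values here are assumptions for illustration.
type CompressionLevel int

const (
	LevelFastest CompressionLevel = iota
	LevelDefault
	LevelBest
)

// selectLevel encodes the backlog-pressure policy discussed in the
// reviews: large batches favor speed, small batches favor ratio.
func selectLevel(batchSize int) CompressionLevel {
	switch {
	case batchSize > 10:
		return LevelFastest
	case batchSize > 3:
		return LevelDefault
	default:
		return LevelBest
	}
}

func main() {
	fmt.Println(selectLevel(1), selectLevel(5), selectLevel(20))
}
```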

Summary by CodeRabbit

  • New Features

    • Added transparent compression for data submissions with automatic level selection, decompression on retrieval, compression metrics, and skipping/logging of corrupted items.
  • Documentation

    • Added guideline: error messages must not start with "failed to".
  • Chores

    • Bumped klauspost/compress dependency to v1.18.4 across modules.

@coderabbitai
Contributor

coderabbitai bot commented Feb 24, 2026

📝 Walkthrough

Walkthrough

Adds zstd-based compression support and integration: a new compression package (pkg/da); the DA client now compresses blobs before Submit and decompresses on retrieval; klauspost/compress dependency bumps; tests; and a small documentation guideline change.

Changes

Cohort / File(s): Summary

  • Documentation & Conventions (CLAUDE.md): Added error-handling guideline: do not start error messages with "failed to".
  • Dependency updates (go.mod, apps/evm/go.mod, apps/grpc/go.mod, apps/testapp/go.mod, execution/evm/test/go.mod, test/e2e/go.mod): Bumped github.com/klauspost/compress from v1.18.0 → v1.18.4 (moved to direct require in root; submodules updated).
  • Compression implementation (pkg/da/compression.go): New zstd-based compression API: CompressionLevel enum, Compress/Decompress/IsCompressed, magic prefix, size limits, timeout-limited decompression, and ErrDecompressedSizeExceeded.
  • Compression tests (pkg/da/compression_test.go): Extensive unit tests covering round-trips, levels, edge cases, corruption, context cancellation, oversized-frame detection, and IsCompressed behavior.
  • DA client integration (block/internal/da/client.go): Submit signature extended with an options param; per-blob dynamic compression level selection, compress-before-submit, logging of compression metrics, enforcement of compressed-size limits, and decompression on retrieval with corrupted items skipped.
  • Test refactors / usage (node/execution_test.go, pkg/da/types/namespace_test.go, test/e2e/failover_e2e_test.go): Replaced manual checks with testify/require in tests; added pre-unmarshal decompression (da.IsCompressed/da.Decompress) when extracting block data.

Sequence Diagram

sequenceDiagram
    participant Client as DA Client
    participant Compress as pkg/da (Compress/IsCompressed)
    participant Submitter as DA Submit/Storage
    participant Retriever as DA Retrieve
    participant Decompress as pkg/da (Decompress)

    Client->>Client: determine compression level (batch size)
    Client->>Compress: Compress(blob, level)
    Compress-->>Client: compressed blob (magic prefix)
    Client->>Submitter: Submit(compressed blob, options)
    Submitter-->>Client: submission result

    Retriever->>Submitter: fetch stored blob(s)
    Submitter-->>Retriever: blob bytes
    Retriever->>Compress: IsCompressed(blob)
    alt compressed
        Retriever->>Decompress: Decompress(blob)
        Decompress-->>Retriever: original data
    else not compressed
        Retriever->>Retriever: use blob as-is
    end
    Retriever-->>Client: returned blob data (or skipped on corruption)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested labels

T:dependencies, T:testing

Suggested reviewers

  • tac0turtle
  • tzdybal
  • gupadhyaya
  • tuxcanfly
  • randygrok

Poem

🐰 I zip and tuck each tiny blob,
A magic prefix, snug as a bob.
Fewer bytes to hop and send,
Faster trails from end to end.
Hooray — compressed, then decompressed with love.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation / Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 17.39%, which is below the required threshold of 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title clearly and directly summarizes the main change: implementing ZSTD compression for DA blobs, which aligns with the core objectives.
Description check | ✅ Passed | The description adequately covers the Overview section with a clear explanation of the compression implementation and dynamic level selection, includes the issue reference, and provides relevant benchmarking context.
Linked Issues check | ✅ Passed | The PR implements the core requirements from issue #2532: ZSTD compression, dynamic compression level selection by load, backward compatibility via magic header, and compression handling integrated into the DA client.
Out of Scope Changes check | ✅ Passed | All changes are within scope: compression implementation (pkg/da/compression.go), DA client integration (block/internal/da/client.go), dependency updates for klauspost/compress, guideline documentation, and test refactoring.


@claude
Contributor

claude bot commented Feb 24, 2026

Claude finished @alpe's task in 1m 51s


Code Review

Overall this is a well-structured implementation with good security defenses (frame-header pre-check, memory cap, decompression timeout). The backward-compatible passthrough is implemented correctly. A few items need attention before merging.


🔴 Critical: Performance Regression

The CI benchmark shows a ~1.7–1.9× slowdown in the DA submission path:

Benchmark | Before | After | Ratio
DA.Submit (avg) | 636µs | 1065µs | 1.67×
DASubmitter.SubmitData (avg) | 931µs | 1761µs | 1.89×

Root cause: LevelBest compression is the default for batches ≤3 blobs, the common case in normal operation. zstd.SpeedBestCompression is the most CPU-intensive level. For a low-backlog sequencer, nearly every submit call takes the slowest path. Consider defaulting to LevelDefault instead of LevelBest, reserving LevelBest only for single-blob batches or when explicitly configured.


🟠 Major Issues

1. Double allocation in Compress (pkg/da/compression.go:83-90)

The current code allocates the compressed output then copies it into a new buffer with the magic prefix — two allocations when one suffices:

// Current: two allocations
compressed := encoders[level].EncodeAll(data, nil)
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)

// Better: one allocation — let EncodeAll append onto the prefix slice
dst := make([]byte, len(magic))
copy(dst, magic)
return encoders[level].EncodeAll(data, dst), nil


2. Silent blob skipping on decompression failure (client.go:314-326, client.go:403-411)

When a blob fails decompression in Retrieve or Get, it is silently skipped via continue. The returned IDs slice will have fewer entries than the blobs at that DA height, but callers have no way to distinguish "no data for height" from "some blobs were unreadable". This could cause subtle inconsistencies in the sync path. At minimum, the Retrieve result Data length must always match IDs length, which this code preserves — but callers that rely on total count may be surprised. Consider returning a partial-failure indicator or at least adding the blob count to the warning log.

3. Missing Layer 2 (pkg/da/compression.go:119)

The code has a comment // Layer 3: Apply the shorter of caller deadline and our hard cap. but there's no "Layer 2" anywhere. The comment numbering jumps from "Layer 1" to "Layer 3", which suggests a removed intermediate layer. Either renumber the layers or document what happened to Layer 2.


🟡 Minor Issues

4. Compression threshold magic numbers (client.go:96-100)

The values 10 and 3 encode the backlog-pressure policy but have no documentation. Extract as named constants:

const (
    highBacklogThreshold     = 10 // batches larger than this use fastest compression
    moderateBacklogThreshold = 3  // batches larger than this use default compression
)

5. Error message style violations (client.go:133, 145)

CLAUDE.md explicitly prohibits starting error messages with "failed to". Two new messages in this PR violate this:

  • Line 133: "failed to build blob %d: %v""build blob %d: %v" or "creating blob %d: %v"
  • Line 145: "failed to parse submit options: %v""parse submit options: %v"

(There are also several pre-existing violations in the same file that are out of scope for this PR.)


6. BlobSize reports uncompressed bytes (client.go:77-78, 207)

blobSize is computed from the original data slices, but the compressed blobs are what's actually submitted to the DA layer. Callers using BlobSize for cost estimation or quota tracking will see inflated numbers. Either document this explicitly in the field's godoc, or add a separate CompressedBlobSize field.

7. TestDecompress_ContextCanceled may be flaky (compression_test.go:190-202)

The test pre-cancels the context and then calls Decompress. Since the payload is tiny ("test data for context cancellation"), the goroutine launched inside Decompress may call DecodeAll and write to ch before the select checks ctx.Done(). The select statement has no deterministic ordering when both cases are ready. This test will pass most of the time but could theoretically return nil, nil instead of an error on a fast machine. A larger payload (e.g. a 1 MB blob) would make the race much less likely.


✅ Good Practices

  • Frame header pre-check (hdr.Decode(payload)) efficiently rejects decompression bombs before any memory allocation
  • WithDecoderMaxMemory(7MB) caps memory usage matching the DA blob size limit
  • decompressTimeout provides an independent CPU-bomb guard separate from the caller's deadline
  • Backward-compatible passthrough (IsCompressed check before decompressing) is clean and well-tested
  • Comprehensive test coverage in compression_test.go covering edge cases, all levels, random data, and invalid inputs
  • E2E test in failover_e2e_test.go correctly decompresses blobs before parsing via da.IsCompressed/da.Decompress

@github-actions
Contributor

github-actions bot commented Feb 24, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed | Format: ⏩ skipped | Lint: ✅ passed | Breaking: ✅ passed | Updated (UTC): Feb 25, 2026, 12:50 PM

@gemini-code-assist
Contributor

Summary of Changes

Hello @alpe, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request integrates ZSTD compression for Data Availability (DA) blobs, enhancing the efficiency of data handling by reducing the size of blobs transmitted and stored. The changes involve updating the klauspost/compress library, introducing a dedicated compression utility package, and modifying the DA client to transparently apply compression during submission and decompression during retrieval. This optimization aims to decrease operational costs and improve performance without altering the external interface for blob operations.

Highlights

  • ZSTD Compression for DA Blobs: Implemented ZSTD compression for Data Availability (DA) blobs, significantly reducing their size before submission and after retrieval to optimize bandwidth and storage.
  • Dependency Update: Updated the klauspost/compress Go module dependency across various applications to version v1.18.4 to support the new compression features.
  • New Compression Package: Introduced a new pkg/da/compression package containing utility functions for ZSTD compression, decompression, and a custom magic prefix check for identifying compressed data.
  • DA Client Integration: Modified the DA client (block/internal/da/client.go) to automatically compress blobs before submission and decompress them upon retrieval, including logging compression ratios for monitoring.
  • Comprehensive Testing: Added comprehensive unit tests for the new compression and decompression logic to ensure data integrity, round-trip correctness, and efficiency across various data types and sizes.
  • Documentation Update: Updated development guidelines in CLAUDE.md to include a new rule against starting error messages with 'failed to'.


Changelog
  • CLAUDE.md
    • Added a new guideline for error message formatting.
  • apps/evm/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • apps/evm/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • apps/grpc/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • apps/grpc/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • apps/testapp/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • apps/testapp/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • block/internal/da/client.go
    • Integrated ZSTD compression for outgoing blobs and decompression for incoming blobs.
    • Added error handling for compression and decompression failures.
    • Included logging for compression ratios during blob submission.
  • execution/evm/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • execution/evm/test/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • execution/evm/test/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • go.mod
    • Added github.com/klauspost/compress v1.18.4 as a direct dependency.
    • Removed github.com/klauspost/compress v1.18.0 as an indirect dependency.
  • go.sum
    • Updated checksums for github.com/klauspost/compress.
  • pkg/da/compression.go
    • Added a new file defining ZSTD compression and decompression functions.
    • Implemented a custom 4-byte magic prefix ('ZSTD') for compressed data identification.
    • Provided IsCompressed function to check for the magic prefix.
  • pkg/da/compression_test.go
    • Added unit tests to verify the correctness and efficiency of the ZSTD compression and decompression logic.
    • Included tests for edge cases like empty data, uncompressed passthrough, and corrupt data.
  • test/e2e/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • test/e2e/go.sum
    • Updated checksums for github.com/klauspost/compress.

@codecov

codecov bot commented Feb 24, 2026

Codecov Report

❌ Patch coverage is 66.66667% with 24 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.96%. Comparing base (67e18bd) to head (c875c0b).

Files with missing lines | Patch % | Lines
block/internal/da/client.go | 41.17% | 16 Missing and 4 partials ⚠️
pkg/da/compression.go | 89.47% | 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3104      +/-   ##
==========================================
+ Coverage   60.91%   60.96%   +0.04%     
==========================================
  Files         113      114       +1     
  Lines       11617    11685      +68     
==========================================
+ Hits         7077     7124      +47     
- Misses       3742     3757      +15     
- Partials      798      804       +6     
Flag | Coverage | Δ
combined | 60.96% <66.66%> | +0.04% ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.


Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces ZSTD compression for DA blobs, which is a great feature for reducing bandwidth and storage costs. The implementation is well-contained within a new pkg/da package and includes a comprehensive set of tests. The changes in block/internal/da/client.go correctly integrate the compression and decompression logic. I have a few suggestions to improve the efficiency, readability, and style of the new compression package.

Comment on lines 39 to 46
compressed := encoder.EncodeAll(data, nil)

// Prepend magic prefix
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)

return result, nil
Contributor

medium

The current implementation of Compress can be made more memory-efficient. It currently involves at least two allocations. You can reduce this by creating a destination buffer with the magic prefix and then having EncodeAll append the compressed data to it.

Suggested change
compressed := encoder.EncodeAll(data, nil)
// Prepend magic prefix
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)
return result, nil
dst := make([]byte, len(magic))
copy(dst, magic)
return encoder.EncodeAll(data, dst), nil

// Strip magic prefix and decompress
decompressed, err := decoder.DecodeAll(data[len(magic):], nil)
if err != nil {
return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
Contributor

medium

The new error handling guideline in CLAUDE.md suggests avoiding "failed to" in error messages. While this message doesn't use that exact phrase, "zstd decompress failed" can be improved to be more direct. The err != nil check already implies failure. Consider rephrasing to state the operation and wrap the underlying error.

Suggested change
return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
return nil, fmt.Errorf("compression: zstd decompression: %w", err)
References
  1. The project's coding guidelines in CLAUDE.md state to 'Never start an error message with "failed to"'. (link)

Comment on lines +71 to +74
return data[0] == magic[0] &&
data[1] == magic[1] &&
data[2] == magic[2] &&
data[3] == magic[3]
Contributor

medium

The manual byte-by-byte comparison to check for the magic prefix is correct, but using bytes.HasPrefix is more idiomatic and readable. You'll need to add import "bytes" to the file.

Suggested change
return data[0] == magic[0] &&
data[1] == magic[1] &&
data[2] == magic[2] &&
data[3] == magic[3]
return bytes.HasPrefix(data, magic)

data[i] = b.Data()
decompressed, decompErr := da.Decompress(b.Data())
if decompErr != nil {
return datypes.ResultRetrieve{
Member

For backward compatibility, we should still fall back to the uncompressed data.

Contributor Author

👍 This is implemented in the Decompress method. It checks for a "magic byte" prefix and returns raw data when not found.

alpe added 2 commits February 24, 2026 12:04
…on based on batch size.

(cherry picked from commit 3ff6211ec69d482437de79a617b5b39a88c91835)
@alpe alpe marked this pull request as ready for review February 24, 2026 15:28
@alpe alpe marked this pull request as draft February 24, 2026 15:29
@alpe
Contributor Author

alpe commented Feb 24, 2026

E2E tests failed. I am not yet sure how this relates to the changes.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
block/internal/da/client.go (2)

74-79: BlobSize reports uncompressed size — document this or add a compressed-size field.

blobSize is calculated from the original data (pre-compression), but the bytes actually submitted to the DA layer are the compressed blobs. For operators relying on BlobSize for cost estimation or quota tracking, this mismatch can be misleading. Consider either documenting that BlobSize is the uncompressed size, or adding a separate field (e.g., CompressedBlobSize) so callers can distinguish.

Also applies to: 200-210
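A hypothetical shape for that suggestion, reporting both sizes side by side so quota tracking can use what was actually submitted. Type and field names are illustrative, not the actual datypes definitions:

```go
package main

import "fmt"

// BlobMetrics pairs the existing uncompressed size with the size that
// actually hits the DA layer. (Hypothetical names for illustration.)
type BlobMetrics struct {
	BlobSize           uint64 // bytes before compression (current field)
	CompressedBlobSize uint64 // bytes actually submitted to the DA layer
}

// Ratio guards the zero case so empty blobs never log NaN, per the
// related review note about the ratio computation.
func (m BlobMetrics) Ratio() float64 {
	if m.BlobSize == 0 {
		return 0
	}
	return float64(m.CompressedBlobSize) / float64(m.BlobSize)
}

func main() {
	m := BlobMetrics{BlobSize: 1000, CompressedBlobSize: 250}
	fmt.Println(m.Ratio())
}
```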


91-100: Consider extracting compression-level thresholds as named constants.

The magic numbers 10 and 3 silently encode the backlog-pressure policy. Named constants (or a small config struct) would make the policy self-documenting and easier to tune without reading the switch logic.

Example
+const (
+	// highBacklogThreshold is the batch-size above which we use fastest compression.
+	highBacklogThreshold = 10
+	// moderateBacklogThreshold is the batch-size above which we use default compression.
+	moderateBacklogThreshold = 3
+)
+
 	compLevel := da.LevelBest
 	switch {
-	case len(data) > 10:
+	case len(data) > highBacklogThreshold:
 		compLevel = da.LevelFastest
-	case len(data) > 3:
+	case len(data) > moderateBacklogThreshold:
 		compLevel = da.LevelDefault
 	}
pkg/da/compression_test.go (1)

61-81: Shared sizes slice across subtests is fragile if parallelized.

sizes is appended inside t.Run callbacks and then read after the loop. This works today because subtests run sequentially, but adding t.Parallel() in the future would introduce a data race. Consider collecting sizes outside the subtest or adding a brief comment warning against parallelizing these subtests.

One alternative: move the cross-level assertion into its own subtest
-	var sizes []int
 	for _, lvl := range levels {
+		lvl := lvl
 		t.Run(lvl.name, func(t *testing.T) {
 			compressed, err := Compress(data, lvl.level)
 			require.NoError(t, err)
 			assert.True(t, IsCompressed(compressed))
 
-			sizes = append(sizes, len(compressed))
-
 			decompressed, err := Decompress(compressed)
 			require.NoError(t, err)
 			assert.Equal(t, data, decompressed)
 
 			t.Logf("level=%s compressed=%d ratio=%.4f", lvl.name, len(compressed), float64(len(compressed))/float64(len(data)))
 		})
 	}
 
-	// Best should produce equal or smaller output than Fastest
-	if len(sizes) == 3 {
-		assert.LessOrEqual(t, sizes[2], sizes[0], "LevelBest should compress at least as well as LevelFastest")
-	}
+	t.Run("best_leq_fastest", func(t *testing.T) {
+		fastest, err := Compress(data, LevelFastest)
+		require.NoError(t, err)
+		best, err := Compress(data, LevelBest)
+		require.NoError(t, err)
+		assert.LessOrEqual(t, len(best), len(fastest),
+			"LevelBest should compress at least as well as LevelFastest")
+	})
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/da/client.go`:
- Around line 314-323: The decompression-error branch in Retrieve (where
da.Decompress returns decompErr) returns a ResultRetrieve without setting
BaseResult.Timestamp; update that error return to set Timestamp to the existing
block time (use the blockTime variable if available, otherwise time.Now()) so
the ResultRetrieve mirrors other error branches; modify the return constructing
datypes.ResultRetrieve / datypes.BaseResult in that decompErr branch to include
Timestamp: blockTime (or time.Now()) to ensure non-zero timestamps for callers.
- Around line 113-118: The log computes a compression ratio using
float64(len(compressed))/float64(len(raw)) which yields NaN for zero-length raw;
fix by computing the ratio before logging and guarding when len(raw) == 0 (e.g.,
set ratio := 0.0 or omit the field), then call
c.logger.Debug().Int("original_size", len(raw)).Int("compressed_size",
len(compressed)).Float64("ratio", ratio).Int("level",
int(compLevel)).Msg("compressed blob for DA submission"); this ensures no
division by zero produces NaN in the logs while keeping the same c.logger.Debug
call and the raw/compressed/compLevel identifiers.

In `@pkg/da/compression.go`:
- Around line 47-51: The zstd decoder is created without a max-memory cap
(decoder, err = zstd.NewReader(nil)), which risks OOM on untrusted DA blobs;
update the decoder initialization in pkg/da/compression.go to pass
zstd.WithDecoderMaxMemory(...) (e.g., 7*1024*1024 for a 7MB cap matching the DA
max blob size) so the decoder enforces a safe decompression memory limit.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 67e18bd and c875c0b.

⛔ Files ignored due to path filters (7)
  • apps/evm/go.sum is excluded by !**/*.sum
  • apps/grpc/go.sum is excluded by !**/*.sum
  • apps/testapp/go.sum is excluded by !**/*.sum
  • execution/evm/go.sum is excluded by !**/*.sum
  • execution/evm/test/go.sum is excluded by !**/*.sum
  • go.sum is excluded by !**/*.sum
  • test/e2e/go.sum is excluded by !**/*.sum
📒 Files selected for processing (10)
  • CLAUDE.md
  • apps/evm/go.mod
  • apps/grpc/go.mod
  • apps/testapp/go.mod
  • block/internal/da/client.go
  • execution/evm/test/go.mod
  • go.mod
  • pkg/da/compression.go
  • pkg/da/compression_test.go
  • test/e2e/go.mod


@github-actions github-actions bot left a comment


⚠️ Performance Alert ⚠️

Possible performance regression detected for benchmark 'Spamoor Trace Benchmarks'.
The benchmark result of this commit is worse than the previous result, exceeding the threshold of 1.50.

| Benchmark suite | Current: db89458 | Previous: 212ac08 | Ratio |
| --- | --- | --- | --- |
| SpamoorSmoke - DA.Submit (avg) | 1089.34 us | 636.52 us | 1.71 |
| SpamoorSmoke - DASubmitter.SubmitData (avg) | 1761.94 us | 931.42 us | 1.89 |

This comment was automatically generated by workflow using github-action-benchmark.

@alpe alpe marked this pull request as ready for review February 25, 2026 14:09
@alpe alpe requested a review from julienrbrt February 25, 2026 14:09

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
pkg/da/compression.go (1)

19-20: Use a single constant for decoder-memory cap and declared-size cap.

7 MiB is duplicated in two places. Centralizing avoids accidental drift between header checks and decoder limits.

♻️ Proposed refactor
 const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB
+const maxDecoderMemory = maxDecompressedSize
@@
- const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap
  var err error
  decoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecoderMemory))

Also applies to: 64-67

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/da/compression.go` around lines 19 - 20, The duplicated 7 MiB value
should be centralized: introduce a single exported or package-level constant
(e.g., maxDecompressedSize) and use it for both the header declared-size cap and
the decoder memory cap instead of repeating the literal; update all occurrences
(including the existing const maxDecompressedSize and the checks in the
decoder/reader code paths referenced around the functions that validate header
size and set decoder limits) to reference that constant so both checks stay in
sync.
block/internal/da/client.go (1)

91-100: Externalize compression-level thresholds instead of hard-coding batch cutoffs.

The >3 / >10 cutoffs are policy knobs. Moving them into config (or constants wired from config) will make tuning easier under real traffic without code changes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/client.go` around lines 91 - 100, The compression-level
batch cutoffs are hard-coded; replace the literals used in the switch (the
len(data) > 10 and len(data) > 3 checks that set compLevel to
da.LevelFastest/da.LevelDefault/da.LevelBest) with configurable thresholds
(e.g., constants or values read from the existing config object) so operators
can tune them without code changes; update the code that computes compLevel to
read two named thresholds (e.g., fastBatchThreshold and defaultBatchThreshold)
from config or constants and use those in the switch, and add
validation/defaults for those thresholds where configuration is loaded.


📥 Commits

Reviewing files that changed from the base of the PR and between c875c0b and db89458.

📒 Files selected for processing (6)
  • block/internal/da/client.go
  • node/execution_test.go
  • pkg/da/compression.go
  • pkg/da/compression_test.go
  • pkg/da/types/namespace_test.go
  • test/e2e/failover_e2e_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/da/compression_test.go

Comment on lines +314 to +323
decompressed, decompErr := da.Decompress(ctx, b.Data())
if decompErr != nil {
c.logger.Warn().
Err(decompErr).
Uint64("height", height).
Int("blob_index", i).
Int("blob_size", len(b.Data())).
Msg("skipping malicious or corrupt DA blob")
continue
}

⚠️ Potential issue | 🟠 Major

Do not return success when decompression dropped all requested data.

Retrieve/Get currently swallow decompression errors and can return a “successful” empty/partial payload with no explicit failure signal. That can mask corruption and break caller assumptions on completeness.

💡 Proposed fix
  ids := make([]datypes.ID, 0, len(blobs))
  data := make([]datypes.Blob, 0, len(blobs))
+ failedDecompress := 0

  for i, b := range blobs {
    decompressed, decompErr := da.Decompress(ctx, b.Data())
    if decompErr != nil {
+     failedDecompress++
      c.logger.Warn().
        Err(decompErr).
        Uint64("height", height).
        Int("blob_index", i).
        Int("blob_size", len(b.Data())).
        Msg("skipping malicious or corrupt DA blob")
      continue
    }
    ids = append(ids, blobrpc.MakeID(height, b.Commitment))
    data = append(data, decompressed)
  }

+ if len(data) == 0 && len(blobs) > 0 {
+   return datypes.ResultRetrieve{
+     BaseResult: datypes.BaseResult{
+       Code:      datypes.StatusError,
+       Message:   fmt.Sprintf("all blobs failed decompression (%d/%d)", failedDecompress, len(blobs)),
+       Height:    height,
+       Timestamp: blockTime,
+     },
+   }
+ }

Apply the same principle in Get: if any requested ID fails decompression, return an error (or explicitly surface partial-result semantics).

As per coding guidelines, "Return errors early".

Also applies to: 330-338, 403-413

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/client.go` around lines 314 - 323, In Retrieve/Get in
client.go, don't swallow decompression failures: when da.Decompress(ctx,
b.Data()) returns an error (the branch that currently logs via c.logger.Warn()
and continues), return an error immediately (or propagate a wrapped error)
instead of skipping that blob and returning a successful but incomplete payload;
update the code paths around da.Decompress, c.logger.Warn(), and the calling
functions Retrieve/Get to propagate the decompression error (or explicit
partial-result sentinel) so callers can detect corrupted/missing data.

Comment on lines +123 to +137
ch := make(chan decodeResult, 1)
go func() {
out, err := decoder.DecodeAll(payload, nil)
ch <- decodeResult{data: out, err: err}
}()

select {
case res := <-ch:
if res.err != nil {
return nil, fmt.Errorf("zstd decompress: %w", res.err)
}
return res.data, nil
case <-ctx.Done():
return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err())
}

⚠️ Potential issue | 🟠 Major

Timeout does not stop decode work; detached decode goroutines can pile up.

When ctx.Done() wins, Decompress returns, but the decode goroutine continues running. Under repeated timeouts, this can create unbounded background CPU work.

💡 Proposed fix (bounded concurrent decodes)
+const maxConcurrentDecodes = 8
+var decodeSem = make(chan struct{}, maxConcurrentDecodes)

 func Decompress(ctx context.Context, data []byte) ([]byte, error) {
   if !IsCompressed(data) {
     return data, nil
   }
@@
   ctx, cancel := context.WithTimeout(ctx, decompressTimeout)
   defer cancel()

+  select {
+  case decodeSem <- struct{}{}:
+  case <-ctx.Done():
+    return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err())
+  }
+
   ch := make(chan decodeResult, 1)
   go func() {
+    defer func() { <-decodeSem }()
     out, err := decoder.DecodeAll(payload, nil)
     ch <- decodeResult{data: out, err: err}
   }()

As per coding guidelines, "Be mindful of goroutine leaks in concurrent processing code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/da/compression.go` around lines 123 - 137, The goroutine that calls
decoder.DecodeAll(payload, nil) can continue running after Decompress returns on
ctx.Done(), causing goroutine leaks; modify the goroutine (the closure that
currently writes to ch) to select when sending the decodeResult so it respects
ctx cancellation — i.e., capture ctx in the closure, run out, err :=
decoder.DecodeAll(...), then use select { case ch <- decodeResult{data: out,
err: err}: case <-ctx.Done(): } so the worker exits without blocking if the
caller timed out; keep the existing select in the caller unchanged.


@julienrbrt julienrbrt left a comment


lgtm! some questions/nits about some default constants.

// maxDecompressedSize is the maximum allowed decompressed output size.
// Matches the WithDecoderMaxMemory cap and provides early rejection
// by inspecting the zstd frame header before allocating anything.
const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB

this is based on our max blob size. Could we just use that variable instead of having another one to remember here? Otherwise, when we increase the blob size after a Celestia upgrade, we may forget to bump this.
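A sketch of the suggested tie-in; maxBlobSize below is a hypothetical placeholder for the node's existing blob-size cap, whose real name is not shown in this review:

```go
package main

import "fmt"

// maxBlobSize stands in for the node's existing DA blob-size cap
// (placeholder name; not the repo's actual identifier).
const maxBlobSize = 7 * 1024 * 1024 // 7 MiB

// Deriving both decompression caps from the blob cap keeps them in sync
// if the blob size is raised after a Celestia upgrade.
const (
	maxDecompressedSize = maxBlobSize
	maxDecoderMemory    = maxDecompressedSize
)

func main() {
	fmt.Println(maxDecompressedSize == maxDecoderMemory) // true
}
```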

}
encoders[i] = enc
}
const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap

Why use this instead of maxDecompressedSize?

}

// Layer 3: Apply the shorter of caller deadline and our hard cap.
ctx, cancel := context.WithTimeout(ctx, decompressTimeout)

I get the idea, but can we maybe factor it by maxDecompressedSize? Then when that cap increases, we would expect the decompression time to increase linearly. (Or we need to plot decompression size against decompression time to know the real relationship and factor it properly.)


Development

Successfully merging this pull request may close these issues.

Add Compression to Blobs Before Submission

2 participants