Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/flashblock/block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *NodeClient) ListenFlashBlocks(ctx context.Context) error {
return nil
}

c.l.Warnw("Retrying in %s...", "wait", retryWait)
c.l.Warnw("Retrying in", "wait", retryWait)
time.Sleep(retryWait)
retryWait *= 2
if retryWait > maxRetryWait {
Expand Down
38 changes: 38 additions & 0 deletions pkg/flashblock/block_listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// nolint: testpackage
package flashblock

import (
"context"
"testing"
"time"

"go.uber.org/zap"
)

const baseNode = "wss://mainnet.flashblocks.base.org/ws"

func TestNodeListenFlashBlock(t *testing.T) {
t.Skip("skip for CI")

logger, _ := zap.NewDevelopment()
defer logger.Sync() //nolint:errcheck
zap.ReplaceGlobals(logger)

publisher := &logPublisher{l: zap.S()}

client, err := NewNodeListenerClient(NodeBlockListenerConfig{
WebSocketURL: baseNode,
AuthHeader: authHeader,
}, publisher, NodeDataSource)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err = client.ListenFlashBlocks(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
4 changes: 4 additions & 0 deletions pkg/flashblock/block_route_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
// GetParsedBdnFlashBlockStreamRequest is the request message
type GetParsedBdnFlashBlockStreamRequest struct{}

type GetBdnFlashBlockStreamResponse struct {
BdnFlashBlock []byte `json:"bdnFlashBlock"`
}

// GetParsedBdnFlashBlockStreamResponse represents a parsed flashblock response
type GetParsedBdnFlashBlockStreamResponse struct {
PayloadId string `json:"payloadId"`
Expand Down
125 changes: 86 additions & 39 deletions pkg/flashblock/blox_route_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"go.uber.org/zap"
)

type messageHandler func(ctx context.Context, data []byte)

// Config holds the configuration for the flashblock client
type Config struct {
WebSocketURL string
Expand Down Expand Up @@ -47,8 +49,8 @@ type WebSocketMessage struct {
Params []interface{} `json:"params"`
}

// WebSocketResponse represents the WebSocket response
type WebSocketResponse struct {
// WebSocketParsedBdnFlashBlockResponse represents the WebSocket response
type WebSocketParsedBdnFlashBlockResponse struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params struct {
Expand All @@ -57,7 +59,16 @@ type WebSocketResponse struct {
} `json:"params"`
}

// NewNodeListenerClient creates a new bloxroute client
type WebSocketFlashBlockResponse struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params struct {
Subscription string `json:"subscription"`
Result *GetBdnFlashBlockStreamResponse `json:"result"`
} `json:"params"`
}

// NewBloxRouteClient creates a new bloxroute client
func NewBloxRouteClient(config Config, publisher Publisher) (*Client, error) {
if err := config.validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
Expand All @@ -70,17 +81,30 @@ func NewBloxRouteClient(config Config, publisher Publisher) (*Client, error) {
}, nil
}

// Start starts the bloxroute stream listener
func (c *Client) Start(ctx context.Context) error {
if err := c.Listen(ctx); err != nil {
c.l.Errorw("Failed to listen for bloxroute", "error", err)
func (c *Client) ListenParsedBdnFlashBlock(ctx context.Context) error {
subscribeMsg := WebSocketMessage{
JSONRPC: "2.0",
ID: 1,
Method: "subscribe",
Params: []interface{}{"GetParsedBdnFlashBlockStream", map[string]interface{}{}},
}

return nil
return c.Listen(ctx, subscribeMsg, c.processParsedFlashblock)
}

func (c *Client) ListenFlashBlock(ctx context.Context) error {
subscribeMsg := WebSocketMessage{
JSONRPC: "2.0",
ID: 1,
Method: "subscribe",
Params: []interface{}{"GetBdnFlashBlockStream", map[string]interface{}{}},
}

return c.Listen(ctx, subscribeMsg, c.processFlashBlock)
}

// Listen connects to the WebSocket stream and listens for bloxroute with automatic retry
func (c *Client) Listen(ctx context.Context) error {
func (c *Client) Listen(ctx context.Context, subscribeMsg interface{}, messageHandler messageHandler) error {
c.l.Infow("Starting bloxroute block listener with retry", "url", c.config.WebSocketURL)

retryWait := 3 * time.Second
Expand All @@ -96,7 +120,7 @@ func (c *Client) Listen(ctx context.Context) error {
c.l.Info("Context cancelled, stopping flashblock listener")
return ctx.Err()
default:
err := c.connectAndListen(ctx, resetRetryWait)
err := c.connectAndListen(ctx, resetRetryWait, subscribeMsg, messageHandler)
if err == nil {
return nil // Normal exit
}
Expand Down Expand Up @@ -130,7 +154,8 @@ func (c *Client) Listen(ctx context.Context) error {
}

// connectAndListen establishes connection and listens for bloxroute
func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func()) error {
// nolint: lll
func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), subscribeMsg interface{}, messageHandler messageHandler) error {
c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL)

// Create WebSocket connection with authorization header
Expand All @@ -144,14 +169,6 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func()) e
c.conn = conn
defer conn.Close()

// Subscribe to GetParsedBdnFlashBlockStream
subscribeMsg := WebSocketMessage{
JSONRPC: "2.0",
ID: 1,
Method: "subscribe",
Params: []interface{}{"GetParsedBdnFlashBlockStream", map[string]interface{}{}},
}

if err := conn.WriteJSON(subscribeMsg); err != nil {
return fmt.Errorf("failed to send subscription message: %w", err)
}
Expand Down Expand Up @@ -195,24 +212,13 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func()) e
c.l.Info("Context cancelled, stopping flashblock listener")
return ctx.Err()
default:
var response WebSocketResponse
_, data, err := conn.ReadMessage()
if err != nil {
c.l.Errorw("Error reading bloxroute flashblock", "error", err, "data", data)
continue
}

if err := json.Unmarshal(data, &response); err != nil {
c.l.Errorw("Error unmarshaling bloxroute flashblock", "error", err, "data", string(data))
}

if response.Params.Result == nil {
// First message is subscription confirmation
c.l.Debugw("Received subscription confirmation")
continue
}

c.processFlashblock(ctx, *response.Params.Result)
messageHandler(ctx, data)
}
}
}
Expand Down Expand Up @@ -251,18 +257,30 @@ func (c *Client) isRetryableError(err error) bool {
return true
}

// processFlashblock processes a received flashblock and publishes to subscribers
func (c *Client) processFlashblock(ctx context.Context, response GetParsedBdnFlashBlockStreamResponse) {
if response.Metadata == nil {
// processParsedFlashblock processes a received flashblock and publishes to subscribers
// nolint: lll
func (c *Client) processParsedFlashblock(ctx context.Context, data []byte) {
var response WebSocketParsedBdnFlashBlockResponse
if err := json.Unmarshal(data, &response); err != nil {
c.l.Errorw("Error unmarshaling bloxroute flashblock", "error", err, "data", string(data))
}

if response.Params.Result == nil {
// First message is subscription confirmation
c.l.Debugw("Received subscription confirmation")
return
}

if response.Params.Result.Metadata == nil {
return
}

blockNumber := response.Metadata.BlockNumber
c.l.Debugw("Processing flashblock", "blockNumber", blockNumber, "index", response.Index)
blockNumber := response.Params.Result.Metadata.BlockNumber
c.l.Debugw("Processing flashblock", "blockNumber", blockNumber, "index", response.Params.Result.Index)

flashBlock, err := convertBlockRouteFlashBlock(response)
flashBlock, err := convertBloxRouteFlashBlock(*response.Params.Result)
if err != nil {
c.l.Errorw("Error converting bloxroute flashblock", "error", err, "blockNumber", blockNumber, "index", response.Index)
c.l.Errorw("Error converting bloxroute flashblock", "error", err, "blockNumber", blockNumber, "index", response.Params.Result.Index)
return
}
// Publish the flashblock data to subscribers
Expand All @@ -273,7 +291,7 @@ func (c *Client) processFlashblock(ctx context.Context, response GetParsedBdnFla
}
}

func convertBlockRouteFlashBlock(bloxrouteFlashblock GetParsedBdnFlashBlockStreamResponse) (Flashblock, error) {
func convertBloxRouteFlashBlock(bloxrouteFlashblock GetParsedBdnFlashBlockStreamResponse) (Flashblock, error) {
index, err := strconv.ParseInt(bloxrouteFlashblock.Index, 10, 64)
if err != nil {
return Flashblock{}, fmt.Errorf("invalid index: %w value %v", err, index)
Expand All @@ -286,3 +304,32 @@ func convertBlockRouteFlashBlock(bloxrouteFlashblock GetParsedBdnFlashBlockStrea
Metadata: convertBloxRouteMetadataToFlashblockMeta(bloxrouteFlashblock.Metadata),
}, nil
}

func (c *Client) processFlashBlock(ctx context.Context, data []byte) {
var response WebSocketFlashBlockResponse
if err := json.Unmarshal(data, &response); err != nil {
c.l.Errorw("Error unmarshaling bloxroute flashblock", "error", err, "data", string(data))
}

if response.Params.Result == nil {
// First message is subscription confirmation
c.l.Debugw("Received subscription confirmation")
return
}

jsonBytes, err := DecompressBrotli(response.Params.Result.BdnFlashBlock)
if err != nil {
c.l.Errorw("Error decompressing flashblock", "error", err)
return
}

var flashBlock Flashblock
if err := json.Unmarshal(jsonBytes, &flashBlock); err != nil {
c.l.Errorw("Error parsing flashblock", "error", err, "data", string(jsonBytes))
return
}

if err := c.publisher.Publish(ctx, BloxRouteDataSource, flashBlock); err != nil {
c.l.Errorw("Error publishing flashblock", "error", err)
}
}
81 changes: 81 additions & 0 deletions pkg/flashblock/blox_route_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// nolint: testpackage
package flashblock

import (
"context"
"os"
"testing"
"time"

"go.uber.org/zap"
)

var (
bloxRouteWsUrl = "wss://base.blxrbdn.com:5005/ws"
authHeader = os.Getenv("AUTH_HEADER")
)

type logPublisher struct {
l *zap.SugaredLogger
}

func (p *logPublisher) Publish(_ context.Context, source DataSource, data Flashblock) error {
p.l.Infow("received flashblock", "source", source, "payloadID", data.PayloadID, "index", data.Index, "blockNumber", data.Metadata.BlockNumber)
return nil
}

func TestListenFlashBlock(t *testing.T) {
if authHeader == "" {
t.Skip("AUTH_HEADER env not set")
}

logger, _ := zap.NewDevelopment()
defer logger.Sync() //nolint:errcheck
zap.ReplaceGlobals(logger)

publisher := &logPublisher{l: zap.S()}

client, err := NewBloxRouteClient(Config{
WebSocketURL: bloxRouteWsUrl,
AuthHeader: authHeader,
}, publisher)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err = client.ListenFlashBlock(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}

func TestListenParsedBdnFlashBlock(t *testing.T) {
if authHeader == "" {
t.Skip("AUTH_HEADER env not set")
}

logger, _ := zap.NewDevelopment()
defer logger.Sync() //nolint:errcheck
zap.ReplaceGlobals(logger)

publisher := &logPublisher{l: zap.S()}

client, err := NewBloxRouteClient(Config{
WebSocketURL: bloxRouteWsUrl,
AuthHeader: authHeader,
}, publisher)
if err != nil {
t.Fatalf("failed to create client: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err = client.ListenParsedBdnFlashBlock(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
1 change: 0 additions & 1 deletion pkg/flashblock/flash_bock.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type FlashblockMeta struct {
Receipts map[common.Hash]*Receipt `json:"receipts"`
}

// --- Unmarshal Implementations ---
func (b *FlashblockBase) UnmarshalJSON(data []byte) error {
type Alias FlashblockBase
aux := &struct {
Expand Down
Loading