diff --git a/pkg/flashblock/block_listener.go b/pkg/flashblock/block_listener.go index 9c3d3a3..a6a9a23 100644 --- a/pkg/flashblock/block_listener.go +++ b/pkg/flashblock/block_listener.go @@ -72,7 +72,7 @@ func (c *NodeClient) ListenFlashBlocks(ctx context.Context) error { return nil } - c.l.Warnw("Retrying in %s...", "wait", retryWait) + c.l.Warnw("Retrying in", "wait", retryWait) time.Sleep(retryWait) retryWait *= 2 if retryWait > maxRetryWait { diff --git a/pkg/flashblock/block_listener_test.go b/pkg/flashblock/block_listener_test.go new file mode 100644 index 0000000..d9c64a6 --- /dev/null +++ b/pkg/flashblock/block_listener_test.go @@ -0,0 +1,38 @@ +// nolint: testpackage +package flashblock + +import ( + "context" + "testing" + "time" + + "go.uber.org/zap" +) + +const baseNode = "wss://mainnet.flashblocks.base.org/ws" + +func TestNodeListenFlashBlock(t *testing.T) { + t.Skip("skip for CI") + + logger, _ := zap.NewDevelopment() + defer logger.Sync() //nolint:errcheck + zap.ReplaceGlobals(logger) + + publisher := &logPublisher{l: zap.S()} + + client, err := NewNodeListenerClient(NodeBlockListenerConfig{ + WebSocketURL: baseNode, + AuthHeader: authHeader, + }, publisher, NodeDataSource) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err = client.ListenFlashBlocks(ctx) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} diff --git a/pkg/flashblock/block_route_types.go b/pkg/flashblock/block_route_types.go index 2d1f9b3..376ec11 100644 --- a/pkg/flashblock/block_route_types.go +++ b/pkg/flashblock/block_route_types.go @@ -15,6 +15,10 @@ import ( // GetParsedBdnFlashBlockStreamRequest is the request message type GetParsedBdnFlashBlockStreamRequest struct{} +type GetBdnFlashBlockStreamResponse struct { + BdnFlashBlock []byte `json:"bdnFlashBlock"` +} + // GetParsedBdnFlashBlockStreamResponse represents a parsed flashblock response type GetParsedBdnFlashBlockStreamResponse struct { PayloadId string `json:"payloadId"` diff --git a/pkg/flashblock/blox_route_client.go b/pkg/flashblock/blox_route_client.go index 892cbaf..8ce614d 100644 --- a/pkg/flashblock/blox_route_client.go +++ b/pkg/flashblock/blox_route_client.go @@ -13,6 +13,8 @@ import ( "go.uber.org/zap" ) +type messageHandler func(ctx context.Context, data []byte) + // Config holds the configuration for the flashblock client type Config struct { WebSocketURL string @@ -47,8 +49,8 @@ type WebSocketMessage struct { Params []interface{} `json:"params"` } -// WebSocketResponse represents the WebSocket response -type WebSocketResponse struct { +// WebSocketParsedBdnFlashBlockResponse represents the WebSocket response +type WebSocketParsedBdnFlashBlockResponse struct { JSONRPC string `json:"jsonrpc"` Method string `json:"method"` Params struct { @@ -57,7 +59,16 @@ type WebSocketResponse struct { } `json:"params"` } -// NewNodeListenerClient creates a new bloxroute client +type WebSocketFlashBlockResponse struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params struct { + Subscription string `json:"subscription"` + Result *GetBdnFlashBlockStreamResponse `json:"result"` + } `json:"params"` +} + +// NewBloxRouteClient creates a new bloxroute client func NewBloxRouteClient(config Config, publisher Publisher) (*Client, error) { if err := config.validate(); err != nil { return nil, fmt.Errorf("invalid config: %w", err) @@ -70,17 +81,30 @@ func NewBloxRouteClient(config Config, publisher Publisher) (*Client, error) { }, nil } -// Start starts the bloxroute stream listener -func (c *Client) Start(ctx context.Context) error { - if err := c.Listen(ctx); err != nil { - c.l.Errorw("Failed to listen for bloxroute", "error", err) +func (c *Client) ListenParsedBdnFlashBlock(ctx context.Context) error { + subscribeMsg := WebSocketMessage{ + JSONRPC: "2.0", + ID: 1, + Method: "subscribe", + Params: []interface{}{"GetParsedBdnFlashBlockStream", map[string]interface{}{}}, } - return nil + return c.Listen(ctx, subscribeMsg, c.processParsedFlashblock) +} + +func (c *Client) ListenFlashBlock(ctx context.Context) error { + subscribeMsg := WebSocketMessage{ + JSONRPC: "2.0", + ID: 1, + Method: "subscribe", + Params: []interface{}{"GetBdnFlashBlockStream", map[string]interface{}{}}, + } + + return c.Listen(ctx, subscribeMsg, c.processFlashBlock) } // Listen connects to the WebSocket stream and listens for bloxroute with automatic retry -func (c *Client) Listen(ctx context.Context) error { +func (c *Client) Listen(ctx context.Context, subscribeMsg interface{}, messageHandler messageHandler) error { c.l.Infow("Starting bloxroute block listener with retry", "url", c.config.WebSocketURL) retryWait := 3 * time.Second @@ -96,7 +120,7 @@ func (c *Client) Listen(ctx context.Context) error { c.l.Info("Context cancelled, stopping flashblock listener") return ctx.Err() default: - err := c.connectAndListen(ctx, resetRetryWait) + err := c.connectAndListen(ctx, resetRetryWait, subscribeMsg, messageHandler) if err == nil { return nil // Normal exit } @@ -130,7 +154,8 @@ func (c *Client) Listen(ctx context.Context) error { } // connectAndListen establishes connection and listens for bloxroute -func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func()) error { +// nolint: lll +func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), subscribeMsg interface{}, messageHandler messageHandler) error { c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL) // Create WebSocket connection with authorization header @@ -144,14 +169,6 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func()) e c.conn = conn defer conn.Close() - // Subscribe to GetParsedBdnFlashBlockStream - subscribeMsg := WebSocketMessage{ - JSONRPC: "2.0", - ID: 1, - Method: "subscribe", - Params: []interface{}{"GetParsedBdnFlashBlockStream", map[string]interface{}{}}, - } - if err := conn.WriteJSON(subscribeMsg); err != nil { return fmt.Errorf("failed to send subscription message: %w", err) } @@ -195,24 +212,13 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func()) e c.l.Info("Context cancelled, stopping flashblock listener") return ctx.Err() default: - var response WebSocketResponse _, data, err := conn.ReadMessage() if err != nil { c.l.Errorw("Error reading bloxroute flashblock", "error", err, "data", data) continue } - if err := json.Unmarshal(data, &response); err != nil { - c.l.Errorw("Error unmarshaling bloxroute flashblock", "error", err, "data", string(data)) - } - - if response.Params.Result == nil { - // First message is subscription confirmation - c.l.Debugw("Received subscription confirmation") - continue - } - - c.processFlashblock(ctx, *response.Params.Result) + messageHandler(ctx, data) } } } @@ -251,18 +257,30 @@ func (c *Client) isRetryableError(err error) bool { return true } -// processFlashblock processes a received flashblock and publishes to subscribers -func (c *Client) processFlashblock(ctx context.Context, response GetParsedBdnFlashBlockStreamResponse) { - if response.Metadata == nil { +// processParsedFlashblock processes a received flashblock and publishes to subscribers +// nolint: lll +func (c *Client) processParsedFlashblock(ctx context.Context, data []byte) { + var response WebSocketParsedBdnFlashBlockResponse + if err := json.Unmarshal(data, &response); err != nil { + c.l.Errorw("Error unmarshaling bloxroute flashblock", "error", err, "data", string(data)) + } + + if response.Params.Result == nil { + // First message is subscription confirmation + c.l.Debugw("Received subscription confirmation") + return + } + + if response.Params.Result.Metadata == nil { return } - blockNumber := response.Metadata.BlockNumber - c.l.Debugw("Processing flashblock", "blockNumber", blockNumber, "index", response.Index) + blockNumber := response.Params.Result.Metadata.BlockNumber + c.l.Debugw("Processing flashblock", "blockNumber", blockNumber, "index", response.Params.Result.Index) - flashBlock, err := convertBlockRouteFlashBlock(response) + flashBlock, err := convertBloxRouteFlashBlock(*response.Params.Result) if err != nil { - c.l.Errorw("Error converting bloxroute flashblock", "error", err, "blockNumber", blockNumber, "index", response.Index) + c.l.Errorw("Error converting bloxroute flashblock", "error", err, "blockNumber", blockNumber, "index", response.Params.Result.Index) return } // Publish the flashblock data to subscribers @@ -273,7 +291,7 @@ func (c *Client) processFlashblock(ctx context.Context, response GetParsedBdnFla } } -func convertBlockRouteFlashBlock(bloxrouteFlashblock GetParsedBdnFlashBlockStreamResponse) (Flashblock, error) { +func convertBloxRouteFlashBlock(bloxrouteFlashblock GetParsedBdnFlashBlockStreamResponse) (Flashblock, error) { index, err := strconv.ParseInt(bloxrouteFlashblock.Index, 10, 64) if err != nil { return Flashblock{}, fmt.Errorf("invalid index: %w value %v", err, index) @@ -286,3 +304,32 @@ func convertBlockRouteFlashBlock(bloxrouteFlashblock GetParsedBdnFlashBlockStrea Metadata: convertBloxRouteMetadataToFlashblockMeta(bloxrouteFlashblock.Metadata), }, nil } + +func (c *Client) processFlashBlock(ctx context.Context, data []byte) { + var response WebSocketFlashBlockResponse + if err := json.Unmarshal(data, &response); err != nil { + c.l.Errorw("Error unmarshaling bloxroute flashblock", "error", err, "data", string(data)) + } + + if response.Params.Result == nil { + // First message is subscription confirmation + c.l.Debugw("Received subscription confirmation") + return + } + + jsonBytes, err := DecompressBrotli(response.Params.Result.BdnFlashBlock) + if err != nil { + c.l.Errorw("Error decompressing flashblock", "error", err) + return + } + + var flashBlock Flashblock + if err := json.Unmarshal(jsonBytes, &flashBlock); err != nil { + c.l.Errorw("Error parsing flashblock", "error", err, "data", string(jsonBytes)) + return + } + + if err := c.publisher.Publish(ctx, BloxRouteDataSource, flashBlock); err != nil { + c.l.Errorw("Error publishing flashblock", "error", err) + } +} diff --git a/pkg/flashblock/blox_route_client_test.go b/pkg/flashblock/blox_route_client_test.go new file mode 100644 index 0000000..8a77cdc --- /dev/null +++ b/pkg/flashblock/blox_route_client_test.go @@ -0,0 +1,81 @@ +// nolint: testpackage +package flashblock + +import ( + "context" + "os" + "testing" + "time" + + "go.uber.org/zap" +) + +var ( + bloxRouteWsUrl = "wss://base.blxrbdn.com:5005/ws" + authHeader = os.Getenv("AUTH_HEADER") +) + +type logPublisher struct { + l *zap.SugaredLogger +} + +func (p *logPublisher) Publish(_ context.Context, source DataSource, data Flashblock) error { + p.l.Infow("received flashblock", "source", source, "payloadID", data.PayloadID, "index", data.Index, "blockNumber", data.Metadata.BlockNumber) + return nil +} + +func TestListenFlashBlock(t *testing.T) { + if authHeader == "" { + t.Skip("AUTH_HEADER env not set") + } + + logger, _ := zap.NewDevelopment() + defer logger.Sync() //nolint:errcheck + zap.ReplaceGlobals(logger) + + publisher := &logPublisher{l: zap.S()} + + client, err := NewBloxRouteClient(Config{ + WebSocketURL: bloxRouteWsUrl, + AuthHeader: authHeader, + }, publisher) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err = client.ListenFlashBlock(ctx) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + +func TestListenParsedBdnFlashBlock(t *testing.T) { + if authHeader == "" { + t.Skip("AUTH_HEADER env not set") + } + + logger, _ := zap.NewDevelopment() + defer logger.Sync() //nolint:errcheck + zap.ReplaceGlobals(logger) + + publisher := &logPublisher{l: zap.S()} + + client, err := NewBloxRouteClient(Config{ + WebSocketURL: bloxRouteWsUrl, + AuthHeader: authHeader, + }, publisher) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err = client.ListenParsedBdnFlashBlock(ctx) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} diff --git a/pkg/flashblock/flash_bock.go b/pkg/flashblock/flash_bock.go index cbe81bf..f6cd746 100644 --- a/pkg/flashblock/flash_bock.go +++ b/pkg/flashblock/flash_bock.go @@ -38,7 +38,6 @@ type FlashblockMeta struct { Receipts map[common.Hash]*Receipt `json:"receipts"` } -// --- Unmarshal Implementations --- func (b *FlashblockBase) UnmarshalJSON(data []byte) error { type Alias FlashblockBase aux := &struct {