Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package core

import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -35,3 +38,10 @@ type ChainEvent struct {
type ChainHeadEvent struct {
Header *types.Header
}

// NewPayloadEvent is posted when engine_newPayloadVX processes a block.
type NewPayloadEvent struct {
Hash common.Hash
Number uint64
ProcessingTime time.Duration
}
5 changes: 5 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
}

// SubscribeNewPayloadEvent registers a subscription for NewPayloadEvent.
func (b *EthAPIBackend) SubscribeNewPayloadEvent(ch chan<- core.NewPayloadEvent) event.Subscription {
return b.eth.SubscribeNewPayloadEvent(ch)
}

func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
Expand Down
12 changes: 12 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ type Ethereum struct {
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)

shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully

newPayloadFeed event.Feed // Feed for engine API newPayload events
}

// New creates a new Ethereum object (including the initialisation of the common Ethereum object),
Expand Down Expand Up @@ -428,6 +430,16 @@ func (s *Ethereum) Synced() bool { return s.handler.synced
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }

// SubscribeNewPayloadEvent registers a subscription for NewPayloadEvent.
func (s *Ethereum) SubscribeNewPayloadEvent(ch chan<- core.NewPayloadEvent) event.Subscription {
return s.newPayloadFeed.Subscribe(ch)
}

// SendNewPayloadEvent sends a NewPayloadEvent to subscribers.
func (s *Ethereum) SendNewPayloadEvent(ev core.NewPayloadEvent) {
s.newPayloadFeed.Send(ev)
}

// Protocols returns all the currently configured
// network protocols to start.
func (s *Ethereum) Protocols() []p2p.Protocol {
Expand Down
10 changes: 10 additions & 0 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
Expand Down Expand Up @@ -773,7 +774,9 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil
}
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number())
start := time.Now()
proofs, err := api.eth.BlockChain().InsertBlockWithoutSetHead(block, witness)
processingTime := time.Since(start)
if err != nil {
log.Warn("NewPayload: inserting block failed", "error", err)

Expand All @@ -786,6 +789,13 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
}
hash := block.Hash()

// Emit NewPayloadEvent for ethstats reporting
api.eth.SendNewPayloadEvent(core.NewPayloadEvent{
Hash: hash,
Number: block.NumberU64(),
ProcessingTime: processingTime,
})

// If witness collection was requested, inject that into the result too
var ow *hexutil.Bytes
if proofs != nil {
Expand Down
60 changes: 53 additions & 7 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
type backend interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription
SubscribeNewPayloadEvent(ch chan<- core.NewPayloadEvent) event.Subscription
CurrentHeader() *types.Header
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
Stats() (pending int, queued int)
Expand Down Expand Up @@ -92,8 +93,9 @@ type Service struct {
pongCh chan struct{} // Pong notifications are fed into this channel
histCh chan []uint64 // History request block numbers are fed into this channel

headSub event.Subscription
txSub event.Subscription
headSub event.Subscription
txSub event.Subscription
newPayloadSub event.Subscription
}

// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
Expand Down Expand Up @@ -198,7 +200,9 @@ func (s *Service) Start() error {
s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh)
txEventCh := make(chan core.NewTxsEvent, txChanSize)
s.txSub = s.backend.SubscribeNewTxsEvent(txEventCh)
go s.loop(chainHeadCh, txEventCh)
newPayloadCh := make(chan core.NewPayloadEvent, chainHeadChanSize)
s.newPayloadSub = s.backend.SubscribeNewPayloadEvent(newPayloadCh)
go s.loop(chainHeadCh, txEventCh, newPayloadCh)

log.Info("Stats daemon started")
return nil
Expand All @@ -208,18 +212,20 @@ func (s *Service) Start() error {
func (s *Service) Stop() error {
s.headSub.Unsubscribe()
s.txSub.Unsubscribe()
s.newPayloadSub.Unsubscribe()
log.Info("Stats daemon stopped")
return nil
}

// loop keeps trying to connect to the netstats server, reporting chain events
// until termination.
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core.NewTxsEvent) {
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core.NewTxsEvent, newPayloadCh chan core.NewPayloadEvent) {
// Start a goroutine that exhausts the subscriptions to avoid events piling up
var (
quitCh = make(chan struct{})
headCh = make(chan *types.Header, 1)
txCh = make(chan struct{}, 1)
quitCh = make(chan struct{})
headCh = make(chan *types.Header, 1)
txCh = make(chan struct{}, 1)
newPayloadEvCh = make(chan core.NewPayloadEvent, 1)
)
go func() {
var lastTx mclock.AbsTime
Expand All @@ -246,11 +252,20 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
default:
}

// Notify of new payload events, but drop if too frequent
case ev := <-newPayloadCh:
select {
case newPayloadEvCh <- ev:
default:
}

// node stopped
case <-s.txSub.Err():
break HandleLoop
case <-s.headSub.Err():
break HandleLoop
case <-s.newPayloadSub.Err():
break HandleLoop
}
}
close(quitCh)
Expand Down Expand Up @@ -336,6 +351,10 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
if err = s.reportPending(conn); err != nil {
log.Warn("Post-block transaction stats report failed", "err", err)
}
case ev := <-newPayloadEvCh:
if err = s.reportNewPayload(conn, ev); err != nil {
log.Warn("New payload stats report failed", "err", err)
}
case <-txCh:
if err = s.reportPending(conn); err != nil {
log.Warn("Transaction stats report failed", "err", err)
Expand Down Expand Up @@ -600,6 +619,33 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
return []byte("[]"), nil
}

// newPayloadStats is the information to report about new payload events.
type newPayloadStats struct {
Number uint64 `json:"number"`
Hash common.Hash `json:"hash"`
ProcessingTime uint64 `json:"processingTime"` // nanoseconds
}

// reportNewPayload reports a new payload event to the stats server.
func (s *Service) reportNewPayload(conn *connWrapper, ev core.NewPayloadEvent) error {
details := &newPayloadStats{
Number: ev.Number,
Hash: ev.Hash,
ProcessingTime: uint64(ev.ProcessingTime.Nanoseconds()),
}

log.Trace("Sending new payload to ethstats", "number", details.Number, "hash", details.Hash)

stats := map[string]interface{}{
"id": s.node,
"block": details,
}
report := map[string][]interface{}{
"emit": {"block_new_payload", stats},
}
return conn.WriteJSON(report)
}

// reportBlock retrieves the current chain head and reports it to the stats server.
func (s *Service) reportBlock(conn *connWrapper, header *types.Header) error {
// Gather the block details from the header or block chain
Expand Down