diff --git a/sei-tendermint/config/config.go b/sei-tendermint/config/config.go index 18736cb333..a435d41fed 100644 --- a/sei-tendermint/config/config.go +++ b/sei-tendermint/config/config.go @@ -652,7 +652,7 @@ type P2PConfig struct { // MaxOutboundConnections limits the number of outbound connections to regular (non-persistent) peers. // It should be significantly lower than MaxConnections, unless // the node is supposed to have a small number of connections altogether. - MaxOutboundConnections uint + MaxOutboundConnections *uint `mapstructure:"max-outbound-connections"` // MaxIncomingConnectionAttempts rate limits the number of incoming connection // attempts per IP address. @@ -703,14 +703,10 @@ type P2PConfig struct { // DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://127.0.0.1:26656", - ExternalAddress: "", - UPNP: false, - MaxConnections: 100, - // TODO(gprusak): decrease to 10, once PEX is improved to: - // * exchange both inbound and outbound connections information - // * exchange information on handshake as well. 
- MaxOutboundConnections: 100, + ListenAddress: "tcp://127.0.0.1:26656", + ExternalAddress: "", + UPNP: false, + MaxConnections: 100, MaxIncomingConnectionAttempts: 100, FlushThrottleTimeout: 100 * time.Millisecond, MaxPacketMsgPayloadSize: 1000000, diff --git a/sei-tendermint/crypto/ed25519/ed25519.go b/sei-tendermint/crypto/ed25519/ed25519.go index 01475c8b2f..2e2ff545d7 100644 --- a/sei-tendermint/crypto/ed25519/ed25519.go +++ b/sei-tendermint/crypto/ed25519/ed25519.go @@ -60,13 +60,14 @@ func SecretKeyFromSecretBytes(b []byte) (SecretKey, error) { if got, want := len(b), ed25519.PrivateKeySize; got != want { return SecretKey{}, fmt.Errorf("ed25519: bad private key length: got %d, want %d", got, want) } - raw := utils.Alloc([ed25519.PrivateKeySize]byte(b)) - runtime.AddCleanup(&raw, func(int) { + type Secret = [ed25519.PrivateKeySize]byte + raw := utils.Alloc(Secret(b)) + runtime.AddCleanup(&raw, func(raw *Secret) { // Zero the memory to avoid leaking the secret. for i := range raw { raw[i] = 0 } - }, 0) + }, raw) key := SecretKey{key: &raw} // Zero the input slice to avoid leaking the secret. for i := range b { diff --git a/sei-tendermint/internal/p2p/address.go b/sei-tendermint/internal/p2p/address.go index 61fdf1776e..12f48ef0a2 100644 --- a/sei-tendermint/internal/p2p/address.go +++ b/sei-tendermint/internal/p2p/address.go @@ -35,21 +35,6 @@ type NodeAddress struct { Port uint16 } -var cgnat = netip.MustParsePrefix("100.64.0.0/10") - -// IsPublic checks if the address is routable from the public internet. -// It is good enough to exclude internal addresses of cloud providers. -// As a simplification, it treats non-IP Hostnames (DNS addresses) as public. -// TODO(gprusak): DNS addresses should be eliminated from PEX entirely - all -// addresses should be resolved locally and only then advertised to peers. 
-func (a NodeAddress) IsPublic() bool { - ip, err := netip.ParseAddr(a.Hostname) - if err != nil { - return true - } - return ip.IsGlobalUnicast() && !ip.IsPrivate() && !cgnat.Contains(ip.Unmap()) -} - // ParseNodeAddress parses a node address URL into a NodeAddress, normalizing // and validating it. func ParseNodeAddress(urlString string) (NodeAddress, error) { diff --git a/sei-tendermint/internal/p2p/address_test.go b/sei-tendermint/internal/p2p/address_test.go index 071436ff6f..7a8004a83a 100644 --- a/sei-tendermint/internal/p2p/address_test.go +++ b/sei-tendermint/internal/p2p/address_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/sei-protocol/sei-chain/sei-tendermint/crypto/ed25519" - "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/tcp" "github.com/sei-protocol/sei-chain/sei-tendermint/types" @@ -278,21 +277,6 @@ func TestNodeAddress_String(t *testing.T) { } } -func TestNodeAddress_IsPublic(t *testing.T) { - rng := utils.TestRng() - id := makeNodeID(rng) - testcases := map[string]bool{ - "192.168.1.10": false, - "93.184.216.34": true, - "example.com": true, - "100.64.0.1": false, - } - for hostname, isPublic := range testcases { - addr := NodeAddress{NodeID: id, Hostname: hostname, Port: defaultPort} - require.Equal(t, isPublic, addr.IsPublic()) - } -} - func TestNodeAddress_Validate(t *testing.T) { id := types.NodeID("00112233445566778899aabbccddeeff00112233") testcases := []struct { diff --git a/sei-tendermint/internal/p2p/channel.go b/sei-tendermint/internal/p2p/channel.go index 3e49f8177a..4507293f79 100644 --- a/sei-tendermint/internal/p2p/channel.go +++ b/sei-tendermint/internal/p2p/channel.go @@ -79,12 +79,12 @@ func (ch *Channel[T]) send(msg T, queues ...*Queue[sendMsg]) { } func (ch *Channel[T]) Send(msg T, to types.NodeID) { - c, ok := ch.router.peerManager.Conns().Get(to) + c, ok := 
GetAny(ch.router.peerManager.Conns(), to) if !ok { logger.Debug("dropping message for unconnected peer", "peer", to, "channel", ch.desc.ID) return } - if _, contains := c.peerChannels[ch.desc.ID]; !contains { + if _, contains := c.Channels[ch.desc.ID]; !contains { // reactor tried to send a message across a channel that the // peer doesn't have available. This is a known issue due to // how peer subscriptions work: @@ -98,7 +98,7 @@ func (ch *Channel[T]) Send(msg T, to types.NodeID) { func (ch *Channel[T]) Broadcast(msg T) { var queues []*Queue[sendMsg] for _, c := range ch.router.peerManager.Conns().All() { - if _, ok := c.peerChannels[ch.desc.ID]; ok { + if _, ok := c.Channels[ch.desc.ID]; ok { queues = append(queues, c.sendQueue) } } diff --git a/sei-tendermint/internal/p2p/peermanager.go b/sei-tendermint/internal/p2p/peermanager.go index 81b84ede3c..36060ce156 100644 --- a/sei-tendermint/internal/p2p/peermanager.go +++ b/sei-tendermint/internal/p2p/peermanager.go @@ -2,7 +2,9 @@ package p2p import ( "context" - "errors" + "fmt" + "maps" + "slices" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/im" @@ -12,88 +14,55 @@ import ( var logger = seilog.NewLogger("tendermint", "internal", "p2p") -type connSet[C peerConn] = im.Map[types.NodeID, C] - -type peerManagerInner[C peerConn] struct { - options *RouterOptions - isPersistent map[types.NodeID]bool - // sum of regular and persistent connection sets. - conns utils.AtomicSend[connSet[C]] - - regular *pool[C] - persistent *pool[C] +type PeerConnInfo struct { + ID types.NodeID + Channels ChannelIDSet + DialedAddr utils.Option[NodeAddress] + SelfDeclaredAddr utils.Option[NodeAddress] } -var errPersistentPeerAddr = errors.New("cannot add a persistent peer address to the regular address pool") - -func (i *peerManagerInner[C]) AddAddr(addr NodeAddress) error { - // Adding persistent peer addrs is only allowed during initialization. 
- // This is to make sure that malicious peers won't cause the preconfigured addrs to be dropped. - if i.isPersistent[addr.NodeID] { - return errPersistentPeerAddr - } - return i.regular.AddAddr(addr) +func (i PeerConnInfo) connID() connID { + return connID{NodeID: i.ID, outbound: i.DialedAddr.IsPresent()} } -func (i *peerManagerInner[C]) TryStartDial(persistentPeer bool) (NodeAddress, bool) { - // Check concurrent dials limit. - if len(i.regular.dialing)+len(i.persistent.dialing) >= i.options.maxDials() { - return NodeAddress{}, false - } - if persistentPeer { - return i.persistent.TryStartDial() - } - // Regular peers are additionally subject to outbound connections limit. - // We should not dial if it would result in too many outbound connections. - if len(i.regular.dialing)+i.regular.outbound >= i.options.maxOutboundConns() { - return NodeAddress{}, false - } - return i.regular.TryStartDial() +type peerConn interface { + comparable + Info() PeerConnInfo + Close() } -func (i *peerManagerInner[C]) DialFailed(addr NodeAddress) { - if i.isPersistent[addr.NodeID] { - i.persistent.DialFailed(addr) - } else { - i.regular.DialFailed(addr) +type connSet[C peerConn] = im.Map[connID, C] + +func GetAny[C peerConn](conns connSet[C], id types.NodeID) (C, bool) { + if c, ok := conns.Get(connID{id, true}); ok { + return c, true } + return conns.Get(connID{id, false}) } -func (i *peerManagerInner[C]) Evict(id types.NodeID) { - if !i.isPersistent[id] { - i.regular.Evict(id) +func GetAll[C peerConn](cs connSet[C], id types.NodeID) []C { + var out []C + for _, outbound := range utils.Slice(true, false) { + if c, ok := cs.Get(connID{id, outbound}); ok { + out = append(out, c) + } } + return out } -// Connected registers a new connection. -// If it is an outbound connection the dialing status is cleared (EVEN IF IT RETURNS AN ERROR). 
-func (i *peerManagerInner[C]) Connected(conn C) error { - info := conn.Info() - pool := i.regular - if i.isPersistent[info.ID] { - pool = i.persistent - } - err := pool.Connected(conn) - // Copy the update to the total connection pool. - conns := i.conns.Load() - if got, want := conns.GetOpt(info.ID), getOpt(pool.conns, info.ID); got != want { - i.conns.Store(conns.SetOpt(info.ID, want)) - } - return err +type peerManagerInner[C peerConn] struct { + isPersistent map[types.NodeID]bool + conns utils.AtomicSend[connSet[C]] + regular *poolManager + persistent *poolManager + lastDialPool *poolManager } -func (i *peerManagerInner[C]) Disconnected(conn C) { - info := conn.Info() - pool := i.regular - if i.isPersistent[info.ID] { - pool = i.persistent - } - pool.Disconnected(conn) - // Copy the update to the total connection pool. - conns := i.conns.Load() - if got, want := conns.GetOpt(info.ID), getOpt(pool.conns, info.ID); got != want { - i.conns.Store(conns.SetOpt(info.ID, want)) +func (i *peerManagerInner[C]) poolByID(id types.NodeID) *poolManager { + if i.isPersistent[id] { + return i.persistent } + return i.regular } // PeerManager manages connections and addresses of potential peers. @@ -106,184 +75,177 @@ func (i *peerManagerInner[C]) Disconnected(conn C) { // * Connected(conn) -> [communicate] -> Disconnected(conn) // For adding new peer addrs, call AddAddrs(). type peerManager[C peerConn] struct { + selfID types.NodeID options *RouterOptions isBlockSyncPeer map[types.NodeID]bool isPrivate map[types.NodeID]bool + + inner utils.Watch[*peerManagerInner[C]] // Receiver of the inner.conns. It is copyable and allows accessing connections // without taking lock on inner. 
conns utils.AtomicRecv[connSet[C]] - inner utils.Watch[*peerManagerInner[C]] } func (p *peerManager[C]) LogState() { for inner := range p.inner.Lock() { logger.Info("p2p connections", - "regular", len(inner.regular.conns), - "regular-max", p.options.maxConns(), - "unconditional", len(inner.persistent.conns), + "regular", fmt.Sprintf("in=%v/%v + out=%v/%v", + len(inner.regular.in), inner.regular.cfg.MaxIn, + len(inner.regular.out), inner.regular.cfg.MaxOut, + ), + "unconditional", fmt.Sprintf("in=%v + out=%v", + len(inner.persistent.in), + len(inner.persistent.out), + ), ) } } -// PeerUpdatesRecv. -// NOT THREAD-SAFE. -type peerUpdatesRecv[C peerConn] struct { - recv utils.AtomicRecv[connSet[C]] - last map[types.NodeID]struct{} -} - -// PeerUpdate is a peer update event sent via PeerUpdates. -type PeerUpdate struct { - NodeID types.NodeID - Status PeerStatus - Channels ChannelIDSet -} - -func (s *peerUpdatesRecv[C]) Recv(ctx context.Context) (PeerUpdate, error) { - var update PeerUpdate - _, err := s.recv.Wait(ctx, func(conns connSet[C]) bool { - // Check for disconnected peers. - for id := range s.last { - if _, ok := conns.Get(id); !ok { - delete(s.last, id) - update = PeerUpdate{ - NodeID: id, - Status: PeerStatusDown, - } - return true - } - } - // Check for connected peers. 
- for id, conn := range conns.All() { - if _, ok := s.last[id]; !ok { - s.last[id] = struct{}{} - update = PeerUpdate{ - NodeID: id, - Status: PeerStatusUp, - Channels: conn.Info().Channels, - } - return true - } - } - return false - }) - return update, err -} - -func (m *peerManager[C]) Subscribe() *peerUpdatesRecv[C] { - return &peerUpdatesRecv[C]{ - recv: m.conns, - last: map[types.NodeID]struct{}{}, - } -} - func newPeerManager[C peerConn](selfID types.NodeID, options *RouterOptions) *peerManager[C] { - inner := &peerManagerInner[C]{ - options: options, - isPersistent: map[types.NodeID]bool{}, - conns: utils.NewAtomicSend(im.NewMap[types.NodeID, C]()), - - persistent: newPool[C](poolConfig{selfID: selfID}), - regular: newPool[C](poolConfig{ - selfID: selfID, - maxConns: utils.Some(options.maxConns()), - maxAddrs: utils.Some(options.maxPeers()), - }), - } isBlockSyncPeer := map[types.NodeID]bool{} isPrivate := map[types.NodeID]bool{} + isPersistent := map[types.NodeID]bool{} for _, id := range options.PrivatePeers { isPrivate[id] = true } for _, id := range options.UnconditionalPeers { - inner.isPersistent[id] = true + isPersistent[id] = true } for _, id := range options.BlockSyncPeers { - inner.isPersistent[id] = true + isPersistent[id] = true isBlockSyncPeer[id] = true } // We do not allow multiple addresses for the same peer in the peer manager any more. // It would be backward incompatible to invalidate configs with multiple addresses per peer. // Instead we just log an error to indicate that some addresses have been ignored. 
+ var persistentAddrs []NodeAddress for _, addr := range options.PersistentPeers { - inner.isPersistent[addr.NodeID] = true - if err := inner.persistent.AddAddr(addr); err != nil { - logger.Error("failed to add a persistent peer address to the pool", "addr", addr, "err", err) + if err := addr.Validate(); err != nil { + logger.Error("invalid persistent peer address", "addr", addr, "err", err) + continue } + isPersistent[addr.NodeID] = true + persistentAddrs = append(persistentAddrs, addr) } + var bootstrapAddrs []NodeAddress for _, addr := range options.BootstrapPeers { - if err := inner.AddAddr(addr); err != nil { - logger.Error("failed to add a bootstrap peer address to the pool", "addr", addr, "err", err) + if err := addr.Validate(); err != nil { + logger.Error("invalid bootstrap peer address", "addr", addr, "err", err) + continue + } + if isPersistent[addr.NodeID] { + persistentAddrs = append(persistentAddrs, addr) + } else { + bootstrapAddrs = append(bootstrapAddrs, addr) } } + + inner := &peerManagerInner[C]{ + isPersistent: isPersistent, + conns: utils.NewAtomicSend(im.NewMap[connID, C]()), + persistent: newPoolManager(&poolConfig{ + MaxIn: utils.Max[int](), + MaxOut: utils.Max[int](), + FixedAddrs: persistentAddrs, + InPool: func(id types.NodeID) bool { + return id != selfID && isPersistent[id] + }, + }), + regular: newPoolManager(&poolConfig{ + MaxIn: options.maxInbound(), + MaxOut: options.maxOutbound(), + FixedAddrs: bootstrapAddrs, + InPool: func(id types.NodeID) bool { + return id != selfID && !isPersistent[id] + }, + }), + } return &peerManager[C]{ + selfID: selfID, options: options, isBlockSyncPeer: isBlockSyncPeer, isPrivate: isPrivate, - conns: inner.conns.Subscribe(), inner: utils.NewWatch(inner), + conns: inner.conns.Subscribe(), } } -func (m *peerManager[C]) Conns() connSet[C] { - return m.conns.Load() -} +func (m *peerManager[C]) Conns() connSet[C] { return m.conns.Load() } -// AddAddrs adds addresses, so that they are available for dialing. 
+// PushPex registers address list received from sender in the pex table. +// Address list replaces the previous address list received from that sender +// (every sender has a bounded capacity in peermanager). +// The addresses on the list are expected to be fresh, ideally they should be addresses +// of the current peers of the sender. This property allows us to quickly prune stale +// addresses. PeerManager keeps address list from every connected peer and a small +// "extra" cache for senders which are not connected to facilitate random local search. +// If any of the addresses is invalid (does not parse), the whole slice is rejected. // Addresses to persistent peers are ignored, since they are populated in constructor. -// Known addresses are ignored. -// If maxAddrsPerPeer limit is exceeded, new address replaces a random failed address of that peer. -// If options.MaxPeers limit is exceeded, some peer with ALL addresses failed is replaced. -// If there is no such address/peer to replace, the new address is ignored. -// If some address is invalid, an error is returned. -// Even if an error is returned, some addresses might have been added. -func (m *peerManager[C]) AddAddrs(addrs []NodeAddress) error { - if len(addrs) == 0 { - return nil - } +func (m *peerManager[C]) PushPex(sender utils.Option[types.NodeID], addrs []NodeAddress) error { for _, addr := range addrs { if err := addr.Validate(); err != nil { return err } } for inner, ctrl := range m.inner.Lock() { - updated := false - for _, addr := range addrs { - // It is expected that not peer addresses will be accepted to the pool. - if err := inner.AddAddr(addr); err == nil { - updated = true + // pex data is indexed by senders which are connected peers. + // Other pex data is restricted to a small unindexed cache. + // Therefore we downgrade sender to None, if it is not a connected peer. 
+ if id, ok := sender.Get(); ok { + if _, ok := GetAny(inner.conns.Load(), id); !ok { + sender = utils.None[types.NodeID]() } } - if updated { + inner.regular.PushPex(sender, addrs) + ctrl.Updated() + } + return nil +} + +func (m *peerManager[C]) PushUpgradePermit() { + for inner, ctrl := range m.inner.Lock() { + if !inner.regular.upgradePermit { + inner.regular.upgradePermit = true ctrl.Updated() } } - return nil } -// StartDial waits until there is a (persistent/non-persistent) address available for dialing. -// On success, it marks the peer as dialing - peer won't be available for dialing until DialFailed -// is called. -func (m *peerManager[C]) StartDial(ctx context.Context, persistentPeer bool) (NodeAddress, error) { +// StartDial waits until there is a address available for dialing. +// Returns a collection of addresses known for this peer. +// On success, it marks the peer as dialing and this peer won't be available +// for dialing until DialFailed is called. +func (m *peerManager[C]) StartDial(ctx context.Context) ([]NodeAddress, error) { for inner, ctrl := range m.inner.Lock() { + // Start with pool which has NOT dialed previously (for fairness). + pools := utils.Slice(inner.persistent, inner.regular) + if pools[0] == inner.lastDialPool { + pools[0], pools[1] = pools[1], pools[0] + } for { - if addr, ok := inner.TryStartDial(persistentPeer); ok { - return addr, nil + for _, pool := range pools { + if addrs, ok := pool.TryStartDial(); ok { + inner.lastDialPool = pool + ctrl.Updated() + return addrs, nil + } } if err := ctrl.Wait(ctx); err != nil { - return NodeAddress{}, err + return nil, err } } } panic("unreachable") } -// DialFailed marks the address as "failed to dial". -// The addr.NodeID peer will be added back to the pool of peers -// available for dialing. 
-func (p *peerManager[C]) DialFailed(addr NodeAddress) { - for inner, ctrl := range p.inner.Lock() { - inner.DialFailed(addr) +// DialFailed notifies the peer manager that dialing addresses of id has failed. +func (m *peerManager[C]) DialFailed(id types.NodeID) { + for inner, ctrl := range m.inner.Lock() { + if err := inner.poolByID(id).DialFailed(id); err != nil { + // DialFailed will fail if id was not marked as dialing. + logger.Error("DialFailed()", "id", id, "err", err) + return + } ctrl.Updated() } } @@ -293,29 +255,64 @@ func (p *peerManager[C]) DialFailed(addr NodeAddress) { // May close and drop a duplicate connection already present in the pool. // Returns an error if the connection should be rejected. func (m *peerManager[C]) Connected(conn C) error { + id := conn.Info().connID() + if id.NodeID == m.selfID { + conn.Close() + return fmt.Errorf("connection to self") + } for inner, ctrl := range m.inner.Lock() { + // Notify the pool. + pool := inner.poolByID(id.NodeID) + toDisconnect, err := pool.Connect(id) + if err != nil { + conn.Close() + return err + } + // Update the connection set. + conns := inner.conns.Load() + // Check if pool requested a disconnect. + if toDisconnect, ok := toDisconnect.Get(); ok { + conns.GetOpt(toDisconnect).OrPanic("pool/connection set mismatch").Close() + conns = conns.Delete(toDisconnect) + } + // Insert new connection. + inner.conns.Store(conns.Set(id, conn)) ctrl.Updated() - return inner.Connected(conn) } - panic("unreachable") + return nil } // Disconnected removes conn from the connection pool. // Noop if conn was not in the connection pool. // conn.PeerInfo().NodeID peer is available for dialing again. func (m *peerManager[C]) Disconnected(conn C) { + id := conn.Info().connID() for inner, ctrl := range m.inner.Lock() { - inner.Disconnected(conn) + // It is fine to call Disconnected for conn which is not present. 
+ conns := inner.conns.Load() + if got, ok := conns.Get(id); !ok || conn != got { + return + } + // Notify pool about disconnect. + // Panic is OK, because inconsistency between conns and pool would be a bug. + pool := inner.poolByID(id.NodeID) + utils.OrPanic(pool.Disconnect(id)) + conns = conns.Delete(id) + if _, ok := GetAny(conns, id.NodeID); !ok { + inner.regular.ClearPex(id.NodeID) + } + inner.conns.Store(conns) ctrl.Updated() } } -// Evict removes known addresses of the regular peer and closed connection to the regular peer. -// NOTE: noop for persistent peers. +// Evict closes connection to id. func (m *peerManager[C]) Evict(id types.NodeID) { - for inner, ctrl := range m.inner.Lock() { - inner.Evict(id) - ctrl.Updated() + conns := m.Conns() + for _, outbound := range utils.Slice(true, false) { + if c, ok := conns.Get(connID{id, outbound}); ok { + c.Close() + } } } @@ -323,24 +320,6 @@ func (m *peerManager[C]) IsBlockSyncPeer(id types.NodeID) bool { return len(m.isBlockSyncPeer) == 0 || m.isBlockSyncPeer[id] } -func (m *peerManager[C]) State(id types.NodeID) string { - for inner := range m.inner.Lock() { - if _, ok := inner.conns.Load().Get(id); ok { - return "ready,connected" - } - if inner.isPersistent[id] { - if _, ok := inner.persistent.dialing[id]; ok { - return "dialing" - } - } else { - if _, ok := inner.regular.dialing[id]; ok { - return "dialing" - } - } - } - return "" -} - func (m *peerManager[C]) Advertise() []NodeAddress { var addrs []NodeAddress // Advertise your own address. 
@@ -348,16 +327,15 @@ func (m *peerManager[C]) Advertise() []NodeAddress { addrs = append(addrs, addr) } var selfAddrs []NodeAddress - conns := m.conns.Load() - for _, conn := range conns.All() { - info := conn.Info() - if m.isPrivate[info.ID] { + for id, conn := range m.Conns().All() { + if m.isPrivate[id.NodeID] { continue } - if addr, ok := info.DialAddr.Get(); ok { + info := conn.Info() + if addr, ok := info.DialedAddr.Get(); ok { // Prioritize dialed addresses of outbound connections. addrs = append(addrs, addr) - } else if addr, ok := info.SelfAddr.Get(); ok { + } else if addr, ok := info.SelfDeclaredAddr.Get(); ok { // Fallback to self-declared addresses of inbound connections. selfAddrs = append(selfAddrs, addr) } @@ -365,28 +343,39 @@ func (m *peerManager[C]) Advertise() []NodeAddress { return append(addrs, selfAddrs...) } -func (m *peerManager[C]) Peers() []types.NodeID { - var ids []types.NodeID - for inner := range m.inner.Lock() { - ids = make([]types.NodeID, 0, len(inner.persistent.addrs)+len(inner.regular.addrs)) - for id := range inner.persistent.addrs { - ids = append(ids, id) - } - for id := range inner.regular.addrs { - ids = append(ids, id) +// All addresses in pools. +// Used by net_info endpoint, which is used by integration tests and for debugging. 
+func (m *peerManager[C]) AllAddrs() []NodeAddress { + addrs := map[types.NodeID]NodeAddress{} + for _, info := range m.ConnInfos() { + if addr, ok := info.DialedAddr.Get(); ok { + addrs[addr.NodeID] = addr + } else if addr, ok := info.SelfDeclaredAddr.Get(); ok { + addrs[addr.NodeID] = addr } } - return ids -} - -func (m *peerManager[C]) Addresses(id types.NodeID) []NodeAddress { - var addrs []NodeAddress for inner := range m.inner.Lock() { for _, pool := range utils.Slice(inner.persistent, inner.regular) { - if pa, ok := pool.addrs[id]; ok { - addrs = append(addrs, pa.addr) + for e := range pool.pex.All() { + for _, pAddr := range e.addrs { + if _, ok := addrs[pAddr.NodeID]; !ok { + addrs[pAddr.NodeID] = pAddr.NodeAddress + } + } } } } - return addrs + return slices.Collect(maps.Values(addrs)) +} + +// Infos of connections in the pool. +func (m *peerManager[C]) ConnInfos() []PeerConnInfo { + infos := map[types.NodeID]PeerConnInfo{} + for _, conn := range m.Conns().All() { + info := conn.Info() + if _, ok := infos[info.ID]; !ok || info.DialedAddr.IsPresent() { + infos[info.ID] = info + } + } + return slices.Collect(maps.Values(infos)) } diff --git a/sei-tendermint/internal/p2p/peermanager_pool.go b/sei-tendermint/internal/p2p/peermanager_pool.go index 763cdb7d02..05f3b45d07 100644 --- a/sei-tendermint/internal/p2p/peermanager_pool.go +++ b/sei-tendermint/internal/p2p/peermanager_pool.go @@ -1,229 +1,343 @@ package p2p import ( + "crypto/rand" + "crypto/sha256" + "encoding/binary" "errors" - "fmt" - "time" + "iter" + "maps" + "slices" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) -// None = -inf (i.e. beginning of time) -func before(a, b utils.Option[time.Time]) bool { - bt, ok := b.Get() - if !ok { - return false - } - at, ok := a.Get() - if !ok { - return true - } - return at.Before(bt) +// ID of a directed connection. 
+type connID struct { + types.NodeID + outbound bool } -func getOpt[K comparable, V any](m map[K]V, k K) utils.Option[V] { - if v, ok := m[k]; ok { - return utils.Some(v) - } - return utils.None[V]() +// NodeID with priority. +type pNodeID struct { + priority uint64 + types.NodeID } -type poolConfig struct { - selfID types.NodeID - maxConns utils.Option[int] - maxAddrs utils.Option[int] +// NodeAddress with priority. +type pAddr struct { + priority uint64 + NodeAddress } -type pool[C peerConn] struct { - poolConfig +func (a pAddr) pNodeID() pNodeID { return pNodeID{a.priority, a.NodeID} } - outbound int - conns map[types.NodeID]C - addrs map[types.NodeID]*peerAddr - dialing map[types.NodeID]NodeAddress +type pexEntry struct { + searched bool + addrs []pAddr } -func newPool[C peerConn](cfg poolConfig) *pool[C] { - return &pool[C]{ - poolConfig: cfg, - conns: map[types.NodeID]C{}, - addrs: map[types.NodeID]*peerAddr{}, - dialing: map[types.NodeID]NodeAddress{}, +func (p *poolManager) newPexEntry(addrs iter.Seq[NodeAddress]) *pexEntry { + e := &pexEntry{} + for addr := range addrs { + if p.cfg.InPool(addr.NodeID) { + e.addrs = append(e.addrs, pAddr{priority: p.priority(addr.NodeID), NodeAddress: addr}) + } } + return e +} + +type pexTable struct { + fixed *pexEntry + bySender map[types.NodeID]*pexEntry + extra utils.RingBuf[*pexEntry] } -type peerAddr struct { - lastFail utils.Option[time.Time] - addr NodeAddress - isPublic bool +func (t *pexTable) ClearSearched() { + for e := range t.All() { + e.searched = false + } } -type peerConnInfo struct { - ID types.NodeID - Channels ChannelIDSet - DialAddr utils.Option[NodeAddress] - SelfAddr utils.Option[NodeAddress] +func (t *pexTable) All() iter.Seq[*pexEntry] { + return func(yield func(*pexEntry) bool) { + if !yield(t.fixed) { + return + } + for _, e := range t.bySender { + if !yield(e) { + return + } + } + for e := range t.extra.All() { + if !yield(e) { + return + } + } + } } -type peerConn interface { - comparable - 
Info() peerConnInfo - Close() +type poolConfig struct { + MaxIn int // Maximal number of inbound connections. + MaxOut int // Maximal number of outbound connections. + FixedAddrs []NodeAddress // Addresses which are always available for dialing. + InPool func(types.NodeID) bool // InPool(id) <=> id belongs to this pool. } -var errSelfAddr = errors.New("self address is ignored") -var errDuplicate = errors.New("duplicate address for the same peer") -var errTooMany = errors.New("too many addresses in the peer manager") +type poolManager struct { + cfg *poolConfig + priority func(types.NodeID) uint64 -// AddAddr adds an address to the pool. -// Returns an error iff the address could not be added. -func (p *pool[C]) AddAddr(addr NodeAddress) error { - pa := &peerAddr{addr: addr, isPublic: addr.IsPublic()} - // Ignore address to self. - if addr.NodeID == p.selfID { - return errSelfAddr + in map[types.NodeID]struct{} + out map[types.NodeID]uint64 + dialing map[types.NodeID]struct{} + dialHistory map[types.NodeID]struct{} + upgradePermit bool + pex pexTable +} + +func newPoolManager(cfg *poolConfig) *poolManager { + var seed [32]byte + utils.OrPanic1(rand.Read(seed[:])) + // PRF defining peer priority. + // It makes the global topology converge to an uniformly random graph + // of a bounded degree. + priority := func(id types.NodeID) uint64 { + // NOTE: theoretically it would be more efficient to create a hasher once + // (sha256.New), then push seed to it (via hash.Write), then copy the hasher + // at every call to priority and push the id afterwards. However the difference + // is marginal AND the way that hash.Cloner is supposed to be used is disgusting. + // NOTE: converting string to []byte allocates a new slice. 
+ hash := sha256.Sum256(append([]byte(id), seed[:]...)) + return binary.LittleEndian.Uint64(hash[:]) + } + p := &poolManager{ + cfg: cfg, + priority: priority, + in: map[types.NodeID]struct{}{}, + out: map[types.NodeID]uint64{}, + dialing: map[types.NodeID]struct{}{}, + dialHistory: map[types.NodeID]struct{}{}, + upgradePermit: false, + pex: pexTable{ + bySender: map[types.NodeID]*pexEntry{}, + extra: utils.NewRingBuf[*pexEntry](10), + }, } - if old, ok := p.addrs[addr.NodeID]; ok { - // Ignore duplicates. - if old.addr == addr { - return errDuplicate - } - // If the old address failed, prune it. - if old.lastFail.IsPresent() { - p.addrs[pa.addr.NodeID] = pa - return nil - } - // Prune private address, if we insert a public address - if !old.isPublic && addr.IsPublic() { - p.addrs[pa.addr.NodeID] = pa - return nil - } - // Otherwise, do not replace - return errDuplicate + p.pex.fixed = p.newPexEntry(slices.Values(cfg.FixedAddrs)) + return p +} + +func (p *poolManager) setDialing(id types.NodeID) { + // We should keep around enough history to cover the whole pex. + // We cannot keep it unbounded to avoid OOM. + const maxHistory = 10000 + if len(p.dialHistory) >= maxHistory { + p.dialHistory = map[types.NodeID]struct{}{} } - // If there limit on addresses has not been reached, allow the new address. - if maxAddrs, ok := p.maxAddrs.Get(); !ok || len(p.addrs) < maxAddrs { - p.addrs[pa.addr.NodeID] = pa - return nil + p.dialing[id] = struct{}{} + p.dialHistory[id] = struct{}{} +} + +func (p *poolManager) toUpgrade() utils.Option[pNodeID] { + low := utils.None[pNodeID]() + if len(p.out) < p.cfg.MaxOut { + return low } - // Find any failed address to prune. - // It doesn't matter what was the time of the failure, - // since lastFail time is used just for round robin ordering. 
- for id, old := range p.addrs { - if old.lastFail.IsPresent() { - delete(p.addrs, id) - p.addrs[pa.addr.NodeID] = pa - return nil + for old, priority := range p.out { + if id, ok := low.Get(); !ok || priority < id.priority { + low = utils.Some(pNodeID{priority, old}) } } - // If the new address is public, find a private address to prune. - if addr.IsPublic() { - for id, old := range p.addrs { - if !old.isPublic { - delete(p.addrs, id) - p.addrs[pa.addr.NodeID] = pa - return nil + return low +} + +// TryStartDial looks for a peer available for dialing. +// Marks peer as "dialing" on success. +// Returns a nonempty list of addresses of that peer. +// The complexity of a successful TryStartDial() is O(total pex size): +// - we assume the dialing to be infrequent +// - we assume the bound on the total number of pexEntries to be ~100 and the number of addresses per entry to be ~100. +// The complexity of failed TryStartDial() calls is amortized over the number of successful dial calls (+ number of inbound connections): +// TheTryStartDial marks entries of pex as "searched" and avoids processing them again +// until any event that can make any peer eligible for dialing again (dial failure/disconnect). +func (p *poolManager) TryStartDial() ([]NodeAddress, bool) { + switch { + // Dialing is not allowed if outbound connections are disabled. + case p.cfg.MaxOut == 0: + return nil, false + // Regular dialing is allowed iff the current outbound connections (including ongoing dials) + // do not saturate outbound capacity. + case len(p.out)+len(p.dialing) < p.cfg.MaxOut: // Try to find address to dial. + // Upgrades are allowed iff: + // * we have upgrade permit + // * outbound connections capacity is full + // * there are no ongoing dials + case p.upgradePermit && len(p.dialing) == 0 && len(p.out) == p.cfg.MaxOut: + // Otherwise dialing is not feasible atm. 
+ default: + return nil, false + } + // In case of upgrade, we need to find a peer better than the lowest current outbound connection. + best := p.toUpgrade() + bestAddrs := map[NodeAddress]struct{}{} + for { + clearRecent := false + // Iterate over pex entries worth searching. + for e := range p.pex.All() { + if e.searched { + continue + } + e.searched = true + for _, addr := range e.addrs { + if id, ok := best.Get(); ok { + // Collect all addresses of the best node. + // We check len(bestAddrs) != 0, because best is initialized to toUpgrade, + // which is not a valid candidate. + if len(bestAddrs) != 0 && id.NodeID == addr.NodeID { + bestAddrs[addr.NodeAddress] = struct{}{} + continue + } + if addr.priority <= id.priority { + continue + } + } + // Skip candidates which are not eligible for dialing. + if _, ok := p.in[addr.NodeID]; ok { + continue + } + if _, ok := p.out[addr.NodeID]; ok { + continue + } + if _, ok := p.dialing[addr.NodeID]; ok { + continue + } + if _, ok := p.dialHistory[addr.NodeID]; ok { + clearRecent = true + continue + } + // We have found a new best candidate. + best = utils.Some(addr.pNodeID()) + clear(bestAddrs) + bestAddrs[addr.NodeAddress] = struct{}{} } } + if len(bestAddrs) > 0 { + addrs := slices.Collect(maps.Keys(bestAddrs)) + p.setDialing(addrs[0].NodeID) + p.pex.ClearSearched() + return addrs, true + } + // clearRecent indicates that we have finished round robin over all available peers, + // but if we clear dialHistory we will find a dialing candidate. + if !clearRecent { + return nil, false + } + p.dialHistory = map[types.NodeID]struct{}{} + p.pex.ClearSearched() } - // Nothing can be pruned. - return errTooMany } -func (p *pool[C]) DialFailed(addr NodeAddress) { - // Clear dialing status. 
- if p.dialing[addr.NodeID] == addr { - delete(p.dialing, addr.NodeID) +func (p *poolManager) PushUpgradePermit() { + p.upgradePermit = true +} + +func (p *poolManager) PushPex(sender utils.Option[types.NodeID], addrs []NodeAddress) { + // Accept at most 1 address per NodeID from each pex sender. + dedup := map[types.NodeID]NodeAddress{} + for _, addr := range addrs { + dedup[addr.NodeID] = addr } - // Record the failure time. - if peerAddr, ok := p.addrs[addr.NodeID]; ok && peerAddr.addr == addr { - peerAddr.lastFail = utils.Some(time.Now()) + e := p.newPexEntry(maps.Values(dedup)) + if sender, ok := sender.Get(); ok { + p.pex.bySender[sender] = e + } else { + if p.pex.extra.Full() { + p.pex.extra.PopFront() + } + p.pex.extra.PushBack(e) } } -func (p *pool[C]) Evict(id types.NodeID) { - delete(p.addrs, id) - if conn, ok := p.conns[id]; ok { - conn.Close() - } +func (p *poolManager) ClearPex(sender types.NodeID) { + delete(p.pex.bySender, sender) } -func (p *pool[C]) TryStartDial() (NodeAddress, bool) { - // Check the connections limit. - if m, ok := p.maxConns.Get(); ok && len(p.dialing)+len(p.conns) >= m { - return NodeAddress{}, false +func (p *poolManager) DialFailed(id types.NodeID) error { + if _, ok := p.dialing[id]; !ok { + return errUnexpectedPeer } + delete(p.dialing, id) + // Failed dial -> id is available for redialing. + p.pex.ClearSearched() + return nil +} + +var errUnexpectedPeer = errors.New("unexpected peer") +var errTooManyPeers = errors.New("too many peers") +var errNotInPool = errors.New("peer does not belong to the pool") - // Choose peer with the oldest lastFail. - var best utils.Option[*peerAddr] - for id, peerAddrs := range p.addrs { - if _, ok := p.dialing[id]; ok { - continue +// Connect registers a new connection. +// Returns an error if the connection was rejected. +// May disconnect another peer (returned in result) to fit the new one. 
+// In particular result may be equal to id, in which case the old connection +// under the same id needs to be disconnected. +func (p *poolManager) Connect(id connID) (utils.Option[connID], error) { + none := utils.None[connID]() + if id.outbound { + // Make sure that the peer was expected. + if _, ok := p.dialing[id.NodeID]; !ok { + return none, errUnexpectedPeer } - if _, ok := p.conns[id]; ok { - continue + delete(p.dialing, id.NodeID) + // Insert the peer. + priority := p.priority(id.NodeID) + if toUpgrade, ok := p.toUpgrade().Get(); ok { + // This should never happen, because of how the algorithm works: + // we only dial peers that we know that will be accepted. + if id.NodeID == toUpgrade.NodeID { + panic("BUG: dialed a peer with too low priority") + } + // Consume the upgrade permit. + p.upgradePermit = false + delete(p.out, toUpgrade.NodeID) + p.out[id.NodeID] = priority + return utils.Some(connID{NodeID: toUpgrade.NodeID, outbound: true}), nil } - if x, ok := best.Get(); !ok || before(peerAddrs.lastFail, x.lastFail) { - best = utils.Some(peerAddrs) + p.out[id.NodeID] = priority + return none, nil + } else { + // It is fine if new inbound connection overrides the old one. + if _, ok := p.in[id.NodeID]; ok { + return utils.Some(id), nil } - } - x, ok := best.Get() - if !ok { - return NodeAddress{}, false - } - // clear the failed status for the chosen address and mark it as dialing. - p.addrs[x.addr.NodeID].lastFail = utils.None[time.Time]() - p.dialing[x.addr.NodeID] = x.addr - return x.addr, true -} - -func (p *pool[C]) Connected(conn C) (err error) { - defer func() { - if err != nil { - conn.Close() + // Check the inbound limit. 
+ if len(p.in) >= p.cfg.MaxIn { + return none, errTooManyPeers } - }() - info := conn.Info() - if info.ID == p.selfID { - return errors.New("connection to self") - } - if addr, ok := info.DialAddr.Get(); ok && p.dialing[addr.NodeID] == addr { - delete(p.dialing, addr.NodeID) - } - newIsOutbound := info.DialAddr.IsPresent() - if old, ok := p.conns[info.ID]; ok { - // * allow to override connections in the same direction. - // * inbound priority > outbound priority <=> peerID > selfID. - // This resolves the situation when peers try to connect to each other - // at the same time. - oldIsOutbound := old.Info().DialAddr.IsPresent() - if oldIsOutbound != newIsOutbound && (info.ID < p.selfID) != newIsOutbound { - return fmt.Errorf("duplicate connection from peer %q", info.ID) - } - old.Close() - delete(p.conns, info.ID) - if oldIsOutbound { - p.outbound -= 1 + // Check if this is peer from our pool. + if !p.cfg.InPool(id.NodeID) { + return none, errNotInPool } + p.in[id.NodeID] = struct{}{} + return none, nil } - if m, ok := p.maxConns.Get(); ok && len(p.conns) >= m { - return errors.New("too many connections") - } - if newIsOutbound { - p.outbound += 1 - } - p.conns[info.ID] = conn - return nil } -func (p *pool[C]) Disconnected(conn C) { - info := conn.Info() - if old, ok := p.conns[info.ID]; ok && old == conn { - old.Close() - delete(p.conns, info.ID) - if info.DialAddr.IsPresent() { - p.outbound -= 1 +func (p *poolManager) Disconnect(id connID) error { + if id.outbound { + if _, ok := p.out[id.NodeID]; !ok { + return errUnexpectedPeer } + delete(p.out, id.NodeID) + } else { + if _, ok := p.in[id.NodeID]; !ok { + return errUnexpectedPeer + } + delete(p.in, id.NodeID) } + // Peer disconnected -> available for dialing. 
+ p.pex.ClearSearched() + return nil } diff --git a/sei-tendermint/internal/p2p/peermanager_pool_test.go b/sei-tendermint/internal/p2p/peermanager_pool_test.go index 47f91333be..0fb24a559a 100644 --- a/sei-tendermint/internal/p2p/peermanager_pool_test.go +++ b/sei-tendermint/internal/p2p/peermanager_pool_test.go @@ -1,6 +1,10 @@ package p2p import ( + "cmp" + "iter" + "maps" + "slices" "testing" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -8,277 +12,503 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) -func makeIPAddr(id types.NodeID, host string) NodeAddress { - return NodeAddress{NodeID: id, Hostname: host, Port: defaultPort} +func inPoolAll(types.NodeID) bool { return true } + +func opt[T any](v T, ok bool) utils.Option[T] { + if ok { + return utils.Some(v) + } + return utils.None[T]() } -func TestPool_AddAddr_deduplicate(t *testing.T) { - rng := utils.TestRng() - selfID := makeNodeID(rng) - p := newPool[*fakeConn](poolConfig{selfID: selfID}) - require.Error(t, p.AddAddr(makeAddrFor(rng, selfID))) - require.Nil(t, p.addrs[selfID]) - - peer := makeNodeID(rng) - addr := makeAddrFor(rng, peer) - require.NoError(t, p.AddAddr(addr)) - require.Error(t, p.AddAddr(addr)) - require.Equal(t, addr, p.addrs[peer].addr) +func toSet[T comparable](vs ...T) map[T]bool { + m := map[T]bool{} + for _, v := range vs { + m[v] = true + } + return m } -func TestPool_AddAddr_publicReplacesPrivateOnly(t *testing.T) { - rng := utils.TestRng() - selfID := makeNodeID(rng) - peer := makeNodeID(rng) - p := newPool[*fakeConn](poolConfig{selfID: selfID, maxAddrs: utils.Some(10)}) - privateAddr := makeIPAddr(peer, "192.168.0.10") - publicAddr := makeIPAddr(peer, "93.184.216.34") - - require.NoError(t, p.AddAddr(privateAddr)) - require.NoError(t, p.AddAddr(publicAddr)) - require.Equal(t, publicAddr, p.addrs[peer].addr) - require.ErrorIs(t, p.AddAddr(privateAddr), errDuplicate) - require.Equal(t, publicAddr, p.addrs[peer].addr) +func minBy[T any, I 
cmp.Ordered](vals iter.Seq[T], by func(T) I) utils.Option[T] { + var m utils.Option[T] + for v := range vals { + if x, ok := m.Get(); !ok || by(v) < by(x) { + m = utils.Some(v) + } + } + return m } -func TestPool_AddAddr_pruneFailedAllowsAnyAddr(t *testing.T) { - for name, host := range map[string]string{ - "private": "10.0.0.2", - "public": "93.184.216.35", - } { - t.Run(name, func(t *testing.T) { - rng := utils.TestRng() - selfID := makeNodeID(rng) - p := newPool[*fakeConn](poolConfig{selfID: selfID, maxAddrs: utils.Some(1)}) - - // Add an address and fail to dial it. - failedPeer := makeNodeID(rng) - failedAddr := makeIPAddr(failedPeer, "10.0.0.1") - require.NoError(t, p.AddAddr(failedAddr)) - dialAddr, ok := p.TryStartDial() - require.True(t, ok) - require.Equal(t, failedAddr, dialAddr) - p.DialFailed(dialAddr) - - // Add another address which should replace it. - newPeer := makeNodeID(rng) - newAddr := makeIPAddr(newPeer, host) - require.NoError(t, p.AddAddr(newAddr)) - require.Equal(t, newAddr, p.addrs[newPeer].addr) - _, stillPresent := p.addrs[failedPeer] - require.False(t, stillPresent) - }) +// Test checking that TryStartDial respects MaxOut limit. +// - no more dials than len(out)+len(dialing) +// - permit is required for upgrade. +// - only upgrade dial is allowed when MaxOut is saturated. +// - no parallel upgrades allowed even if permit is provided. 
+func TestPoolManager_TryStartDial_MaxOut(t *testing.T) { + rng := utils.TestRng() + const maxIn = 0 + const maxOut = 5 + fixedAddrs := make([]NodeAddress, maxOut) + want := map[types.NodeID][]NodeAddress{} + for i := range fixedAddrs { + addr := makeAddr(rng) + fixedAddrs[i] = addr + want[addr.NodeID] = utils.Slice(addr) } + pool := newPoolManager(&poolConfig{MaxIn: 0, MaxOut: maxOut, FixedAddrs: fixedAddrs, InPool: inPoolAll}) + got := map[types.NodeID][]NodeAddress{} + t.Log("initial maxOut dials should succeed") + for range maxOut { + addrs := opt(pool.TryStartDial()).OrPanic("") + got[addrs[0].NodeID] = addrs + } + require.NoError(t, utils.TestDiff(want, got)) + t.Log("successful connects do not free dialing slots") + for id := range got { + // no successful dial. + require.False(t, opt(pool.TryStartDial()).IsPresent()) + // connects should not cause evictions. + require.False(t, utils.OrPanic1(pool.Connect(connID{id, true})).IsPresent()) + } + + t.Log("Find a peer which should be able to upgrade connected peer.") + lowPriority := minBy(maps.Keys(got), pool.priority).OrPanic("") + newAddr := makeAddr(rng) + for pool.priority(newAddr.NodeID) <= pool.priority(lowPriority) { + newAddr = makeAddr(rng) + } + pool.PushPex(utils.Some(newAddr.NodeID), utils.Slice(newAddr)) + require.False(t, opt(pool.TryStartDial()).IsPresent()) + t.Log("successful dial after upgrade permit.") + pool.PushUpgradePermit() + require.Equal(t, utils.Some(utils.Slice(newAddr)), opt(pool.TryStartDial())) + + t.Log("find even better peer") + betterAddr := makeAddr(rng) + for pool.priority(betterAddr.NodeID) <= pool.priority(newAddr.NodeID) { + betterAddr = makeAddr(rng) + } + pool.PushPex(utils.Some(betterAddr.NodeID), utils.Slice(betterAddr)) + + t.Log("even with upgrade permit, start dial should fail until the first upgrade dial finishes") + pool.PushUpgradePermit() + require.False(t, opt(pool.TryStartDial()).IsPresent()) + + t.Log("finish upgrade successfully, lowest priority peer should 
be evicted") + evicted := utils.OrPanic1(pool.Connect(connID{newAddr.NodeID, true})).OrPanic("expected peer to evict") + require.Equal(t, connID{lowPriority, true}, evicted) + + t.Log("permit should be cleared") + require.False(t, opt(pool.TryStartDial()).IsPresent()) + + t.Log("push the permit again, better peer should be available for dialing") + pool.PushUpgradePermit() + require.Equal(t, utils.Some(utils.Slice(betterAddr)), opt(pool.TryStartDial())) } -func TestPool_AddAddr_prunePrivateOnlyForPublicInsert(t *testing.T) { +// Test checking that pool manager behaves reasonably with MaxOut = 0 +func TestPoolManager_MaxOutZero(t *testing.T) { rng := utils.TestRng() - selfID := makeNodeID(rng) - p := newPool[*fakeConn](poolConfig{selfID: selfID, maxAddrs: utils.Some(2)}) - - // Fill the pool with private and public addresses. - peerA := makeNodeID(rng) - peerB := makeNodeID(rng) - addrA := makeIPAddr(peerA, "93.184.216.1") - addrB := makeIPAddr(peerB, "10.0.0.4") - require.NoError(t, p.AddAddr(addrA)) - require.NoError(t, p.AddAddr(addrB)) - - // Adding a private address should fail. - peerC := makeNodeID(rng) - privateNew := makeIPAddr(peerC, "10.0.0.5") - require.ErrorIs(t, p.AddAddr(privateNew), errTooMany) - require.Equal(t, addrA, p.addrs[peerA].addr) - require.Equal(t, addrB, p.addrs[peerB].addr) - - // Adding a public address should prune the private one. 
- peerD := makeNodeID(rng) - publicNew := makeIPAddr(peerD, "93.184.216.36") - require.NoError(t, p.AddAddr(publicNew)) - require.Equal(t, publicNew, p.addrs[peerD].addr) - _, okPublic := p.addrs[peerA] - _, okPrivate := p.addrs[peerB] - require.True(t, okPublic) - require.False(t, okPrivate) + t.Log("populate pool in various ways") + pool := newPoolManager(&poolConfig{MaxIn: 1, MaxOut: 0, FixedAddrs: utils.Slice(makeAddr(rng)), InPool: inPoolAll}) + pool.PushPex(utils.Some(makeNodeID(rng)), []NodeAddress{makeAddr(rng)}) + pool.PushPex(utils.None[types.NodeID](), []NodeAddress{makeAddr(rng)}) + pool.PushUpgradePermit() + t.Log("dialing should fail anyway") + require.False(t, opt(pool.TryStartDial()).IsPresent()) + t.Log("Connect should work as usual") + _, err := pool.Connect(connID{makeNodeID(rng), true}) + require.ErrorIs(t, err, errUnexpectedPeer) + require.False(t, utils.OrPanic1(pool.Connect(connID{makeNodeID(rng), false})).IsPresent()) } -func TestPool_AddAddr_prune_failed_addrs(t *testing.T) { +// Test checking that DialFailed WAI: +// - only dialed addresses are accepted +func TestPoolManager_DialFailed(t *testing.T) { rng := utils.TestRng() - selfID := makeNodeID(rng) - p := newPool[*fakeConn](poolConfig{ - selfID: selfID, - }) - peer := makeNodeID(rng) - for range 3 { - // Insert a new address which should replace the old one. - addr := makeAddrFor(rng, peer) - require.NoError(t, p.AddAddr(addr)) - - // Dial and fail multiple times. 
- // Only the newest address is expected, since maxAddrsPerPeer == 1 - for range 5 { - dialAddr, ok := p.TryStartDial() - require.Equal(t, utils.Some(addr), opt(dialAddr, ok)) - p.DialFailed(dialAddr) + addr := makeAddr(rng) + pool := newPoolManager(&poolConfig{MaxIn: 1, MaxOut: 1, FixedAddrs: utils.Slice(addr), InPool: inPoolAll}) + t.Log("DialFailed() should error before TryStartDial()") + require.ErrorIs(t, pool.DialFailed(addr.NodeID), errUnexpectedPeer) + require.Equal(t, utils.Some(utils.Slice(addr)), opt(pool.TryStartDial())) + t.Log("DialFailed() should error for peer different than the one returned by TryStartDial()") + require.ErrorIs(t, pool.DialFailed(makeNodeID(rng)), errUnexpectedPeer) + t.Log("DialFailed() should succeed for the expected peer") + require.NoError(t, pool.DialFailed(addr.NodeID)) +} + +// Test checking that Connected behavior WAI: +// - for outbound only dialed addresses are accepted +// - for inbound the MaxIn is respected. +// - for inbound duplicates are accepted. 
+// - for outbound upgrade a low prio peer is disconnected and permit is cleared +func TestPoolManager_ConnectDisconnect(t *testing.T) { + rng := utils.TestRng() + fixedAddrs := utils.Slice(makeAddr(rng), makeAddr(rng)) + pool := newPoolManager(&poolConfig{MaxIn: 1, MaxOut: 1, FixedAddrs: fixedAddrs, InPool: inPoolAll}) + + t.Log("only dialed addresses succeed Connect") + dialed := opt(pool.TryStartDial()).OrPanic("")[0] + require.True(t, slices.Contains(fixedAddrs, dialed)) + for _, addr := range fixedAddrs { + evicted, err := pool.Connect(connID{addr.NodeID, true}) + if addr == dialed { + require.NoError(t, err) + require.False(t, evicted.IsPresent()) + } else { + require.ErrorIs(t, err, errUnexpectedPeer) } } + t.Log("duplicate outbound connections are rejected (since they are not dialed)") + outConn := connID{dialed.NodeID, true} + _, err := pool.Connect(outConn) + require.ErrorIs(t, err, errUnexpectedPeer) + + t.Log("MaxIn is respected") + inConn := connID{makeNodeID(rng), false} + require.False(t, utils.OrPanic1(pool.Connect(inConn)).IsPresent()) + _, err = pool.Connect(connID{makeNodeID(rng), false}) + require.ErrorIs(t, err, errTooManyPeers) + + t.Log("duplicate inbound connection are accepted, replacing the old ones") + require.Equal(t, utils.Some(inConn), utils.OrPanic1(pool.Connect(inConn))) + + t.Log("only connected addresses succeed Disconnect") + for _, outbound := range utils.Slice(true, false) { + require.ErrorIs(t, pool.Disconnect(connID{makeNodeID(rng), outbound}), errUnexpectedPeer) + } + for _, conn := range utils.Slice(inConn, outConn) { + require.NoError(t, pool.Disconnect(conn)) + } } -func TestPool_AddAddr_prune_failed_peers(t *testing.T) { +// Test checking connected/dialing addresses are not dialed. +// Test checking that disconnected/dial failed addresses are immediately available +// for dialing again in case no other addresses are available. 
+func TestPoolManager_DialAvailability(t *testing.T) { rng := utils.TestRng() - selfID := makeNodeID(rng) - p := newPool[*fakeConn](poolConfig{ - selfID: selfID, - maxAddrs: utils.Some(1), - }) - for range 3 { - peer := makeNodeID(rng) - // Insert a new peer which should replace the old one. - addr := makeAddrFor(rng, peer) - require.NoError(t, p.AddAddr(addr)) - - // Dial and fail multiple times. - // Only the newest address is expected, since maxAddrsPerPeer == 1 - for range 5 { - dialAddr, ok := p.TryStartDial() - require.Equal(t, utils.Some(addr), opt(dialAddr, ok)) - p.DialFailed(dialAddr) - } + var fixedAddrs []NodeAddress + for range 4 { + fixedAddrs = append(fixedAddrs, makeAddr(rng)) } + pool := newPoolManager(&poolConfig{MaxIn: 10, MaxOut: 10, FixedAddrs: fixedAddrs, InPool: inPoolAll}) + + t.Log("connect inbound, outbound and dial") + addr0 := fixedAddrs[0] + require.False(t, utils.OrPanic1(pool.Connect(connID{addr0.NodeID, false})).IsPresent()) + addr1 := opt(pool.TryStartDial()).OrPanic("")[0] + addr2 := opt(pool.TryStartDial()).OrPanic("")[0] + + t.Log("none of them should be dialed") + require.False(t, utils.OrPanic1(pool.Connect(connID{addr1.NodeID, true})).IsPresent()) + busy := utils.Slice(addr0, addr1, addr2) + require.False(t, slices.Contains(busy, opt(pool.TryStartDial()).OrPanic("")[0])) + require.False(t, opt(pool.TryStartDial()).IsPresent()) + + t.Log("reuse after dial failure") + require.NoError(t, pool.DialFailed(addr2.NodeID)) + require.Equal(t, utils.Slice(addr2), opt(pool.TryStartDial()).OrPanic("")) + require.False(t, opt(pool.TryStartDial()).IsPresent()) + + t.Log("reuse after inbound disconnect") + require.NoError(t, pool.Disconnect(connID{addr0.NodeID, false})) + require.Equal(t, utils.Slice(addr0), opt(pool.TryStartDial()).OrPanic("")) + require.False(t, opt(pool.TryStartDial()).IsPresent()) + + t.Log("reuse after outbound disconnect") + require.NoError(t, pool.Disconnect(connID{addr1.NodeID, true})) + require.Equal(t, 
utils.Slice(addr1), opt(pool.TryStartDial()).OrPanic("")) + require.False(t, opt(pool.TryStartDial()).IsPresent()) } -func TestPool_TryStartDial_RoundRobin(t *testing.T) { +// Test checking that TryStartDial does round robin in priority order +// - over all NodeIDs if there is 0 { + got := opt(pool.TryStartDial()).OrPanic("")[0] + require.NoError(t, pool.DialFailed(got.NodeID)) + require.True(t, want[got]) + delete(want, got) + } + } + + t.Log("fill the outbound capacity with random conns") + busy := map[NodeAddress]bool{} + minPrio := utils.Max[uint64]() + for range maxOut { + addr := opt(pool.TryStartDial()).OrPanic("")[0] + // decide whether dial was successful at random. + if rng.Intn(10) != 0 { + require.NoError(t, pool.DialFailed(addr.NodeID)) + continue + } + minPrio = min(minPrio, pool.priority(addr.NodeID)) + busy[addr] = true + require.False(t, utils.OrPanic1(pool.Connect(connID{addr.NodeID, true})).IsPresent()) + } + + t.Log("expect high priority addresses to be dialed round robin") + for range 3 { + want := map[NodeAddress]bool{} + for _, addr := range allAddrs { + if busy[addr] || pool.priority(addr.NodeID) <= minPrio { + continue + } + want[addr] = true + } + for len(want) > 0 { + got := opt(pool.TryStartDial()).OrPanic("")[0] + require.NoError(t, pool.DialFailed(got.NodeID)) + require.True(t, want[got]) + delete(want, got) } } } -func TestPool_Connected_deduplicate(t *testing.T) { +// Test checking that interleaving PushPex and TryStartDial works as intended: +// - pushed addresses are immediately available. 
+func TestPoolManager_PushPex(t *testing.T) { rng := utils.TestRng() - p := newPool[*fakeConn](poolConfig{selfID: makeNodeID(rng)}) - peer := makeNodeID(rng) - oldConn := makeConnFor(rng, peer, utils.GenBool(rng)) - require.NoError(t, p.Connected(oldConn)) - for range 100 { - require.False(t, oldConn.Closed()) - newConn := makeConnFor(rng, peer, utils.GenBool(rng)) - if err := p.Connected(newConn); err == nil { - newConn, oldConn = oldConn, newConn + pool := newPoolManager(&poolConfig{MaxIn: 0, MaxOut: 10, InPool: inPoolAll}) + + senders := utils.GenSliceN(rng, 3, makeNodeID) + for i := range 10 { + addrs := utils.GenSliceN(rng, 3, makeAddr) + pool.PushPex(utils.Some(senders[i%len(senders)]), addrs) + want := toSet(addrs...) + for len(want) > 0 { + got := opt(pool.TryStartDial()).OrPanic("")[0] + require.NoError(t, pool.DialFailed(got.NodeID)) + require.True(t, want[got]) + delete(want, got) } - require.True(t, newConn.Closed()) - p.Disconnected(newConn) } } -func opt[T any](v T, ok bool) utils.Option[T] { - if ok { - return utils.Some(v) +// Test checking that inbound and outbound connection for the same NodeID can coexist. +func TestPoolManager_InboundOutboundCoexist(t *testing.T) { + rng := utils.TestRng() + fixedAddrs := utils.GenSliceN(rng, 2, makeAddr) + pool := newPoolManager(&poolConfig{MaxIn: 10, MaxOut: 10, FixedAddrs: fixedAddrs, InPool: inPoolAll}) + + t.Logf("race inbound/outbound connections for the same peer") + addr1 := opt(pool.TryStartDial()).OrPanic("")[0] + addr2 := opt(pool.TryStartDial()).OrPanic("")[0] + utils.OrPanic1(pool.Connect(connID{addr1.NodeID, false})) + utils.OrPanic1(pool.Connect(connID{addr1.NodeID, true})) + utils.OrPanic1(pool.Connect(connID{addr2.NodeID, true})) + utils.OrPanic1(pool.Connect(connID{addr2.NodeID, false})) +} + +// Test checking that InPool filter works as intended: +// - inbound connections not from the pool should be rejected. 
+func TestPoolManager_InPool_Connect(t *testing.T) { + rng := utils.TestRng() + const maxIn = 10 + allowed := toSet(utils.GenSliceN(rng, maxIn, makeNodeID)...) + inPool := func(id types.NodeID) bool { return allowed[id] } + pool := newPoolManager(&poolConfig{MaxIn: maxIn, MaxOut: 10, InPool: inPool}) + for id := range allowed { + _, err := pool.Connect(connID{makeNodeID(rng), false}) + require.ErrorIs(t, err, errNotInPool) + require.False(t, utils.OrPanic1(pool.Connect(connID{id, false})).IsPresent()) + } +} + +// Test checking that InPool filter works as intended: +// - if PushPex/FixedAddrs inserts a mix of addresses from the pool and not from the pool, +// filtered out entries should never be dialed. +// - inbound connections not from the pool should be rejected. +func TestPoolManager_InPool_PushPex(t *testing.T) { + rng := utils.TestRng() + allowed := toSet(utils.GenSliceN(rng, 50, makeNodeID)...) + inPool := func(id types.NodeID) bool { return allowed[id] } + + t.Log("addresses of not-in-pool peers should get filtered out during PushPex") + var allowedAddrs []NodeAddress + for id := range allowed { + allowedAddrs = append(allowedAddrs, makeAddrFor(rng, id)) + } + nextAllowed := 5 + fixedAddrs := utils.GenSliceN(rng, 10, makeAddr) + fixedAddrs = append(fixedAddrs, allowedAddrs[:nextAllowed]...) + want := toSet(allowedAddrs[:nextAllowed]...) + utils.Shuffle(rng, fixedAddrs) + pool := newPoolManager(&poolConfig{MaxIn: 1, MaxOut: 1, FixedAddrs: fixedAddrs, InPool: inPool}) + for nextAllowed < len(allowedAddrs) { + // Push some pex entries with some allowed addresses interleaved. + for range 2 { + addrs := utils.GenSliceN(rng, 10, makeAddr) + n := min(nextAllowed+3, len(allowedAddrs)) + for _, a := range allowedAddrs[nextAllowed:n] { + addrs = append(addrs, a) + want[a] = true + } + nextAllowed = n + utils.Shuffle(rng, addrs) + pool.PushPex(utils.Some(makeNodeID(rng)), addrs) + } + // Expect all of them to get dialled. 
+ for len(want) > 0 { + got := opt(pool.TryStartDial()).OrPanic("")[0] + require.NoError(t, pool.DialFailed(got.NodeID)) + require.True(t, want[got]) + delete(want, got) + } } - return utils.None[T]() } -func TestPool_Connected_race(t *testing.T) { +// Test checking that if PushPex accepts at most 1 addr per NodeID and the remaining ones are discarded. +func TestPoolManager_PushPex_DedupPerNode(t *testing.T) { rng := utils.TestRng() - for range 100 { - // There are 2 peers - p1 := newPool[*fakeConn](poolConfig{selfID: makeNodeID(rng)}) - p2 := newPool[*fakeConn](poolConfig{selfID: makeNodeID(rng)}) - // They know each others addresses. - p1addr := makeAddrFor(rng, p1.selfID) - p2addr := makeAddrFor(rng, p2.selfID) - require.NoError(t, p1.AddAddr(p2addr)) - require.NoError(t, p2.AddAddr(p1addr)) - // They dial each other. - require.Equal(t, utils.Some(p2addr), opt(p1.TryStartDial())) - require.Equal(t, utils.Some(p1addr), opt(p2.TryStartDial())) - p1c1, p2c1 := makeConnFor(rng, p2.selfID, false), makeConnTo(p1addr) - p1c2, p2c2 := makeConnTo(p2addr), makeConnFor(rng, p1.selfID, false) - // Connections are established in random order on each side. - conns := utils.Slice(p1c1, p2c1, p1c2, p2c2) - utils.Shuffle(rng, conns) - for _, c := range conns { - switch c.info.ID { - case p1.selfID: - _ = p2.Connected(c) - case p2.selfID: - _ = p1.Connected(c) + pool := newPoolManager(&poolConfig{MaxIn: 0, MaxOut: 1, InPool: inPoolAll}) + + for iter := range 10 { + t.Logf("iter = %v", iter) + // Multiple PushPex each with multiple addresses for multiple ids. + ids := map[types.NodeID]bool{} + senders := make([][]NodeAddress, 5) + for range 4 { + id := makeNodeID(rng) + ids[id] = true + for i := range senders { + for range 3 { + senders[i] = append(senders[i], makeAddrFor(rng, id)) + } } } - // Connections should be closed consistently on both sides. 
- require.Equal(t, p1c1.Closed(), p2c1.Closed()) - require.Equal(t, p1c2.Closed(), p2c2.Closed()) - // One connection should survice. - require.Equal(t, p1c1.Closed(), !p1c2.Closed()) + for _, s := range senders { + utils.Shuffle(rng, s) + pool.PushPex(utils.Some(makeNodeID(rng)), s) + } + // Dial the whole set of new ids. + for len(ids) > 0 { + addrs, ok := pool.TryStartDial() + require.True(t, ok) + id := addrs[0].NodeID + require.True(t, ids[id]) + delete(ids, id) + require.Equal(t, len(senders), len(addrs)) + require.NoError(t, pool.DialFailed(id)) + } } } -func TestPool_Evict(t *testing.T) { +// Test checking that InPool filter does not apply to PushPex sender. +func TestPoolManager_PushPex_SenderBypassesInPool(t *testing.T) { + rng := utils.TestRng() + allowed := makeNodeID(rng) + inPool := func(id types.NodeID) bool { return id == allowed } + pool := newPoolManager(&poolConfig{MaxIn: 0, MaxOut: 1, InPool: inPool}) + + t.Log("push pex from sender outside the pool, its allowed addresses should still be dialed") + want := makeAddrFor(rng, allowed) + addrs := append(utils.GenSliceN(rng, 5, makeAddr), want) + utils.Shuffle(rng, addrs) + pool.PushPex(utils.Some(makeNodeID(rng)), addrs) + require.Equal(t, utils.Slice(want), opt(pool.TryStartDial()).OrPanic("")) +} + +// Test checking that addresses of the same node are aggregated +// and that TryStartDial() deduplicates addresses. +func TestPoolManager_TryStartDial_AggregatesAddresses(t *testing.T) { rng := utils.TestRng() - p := newPool[*fakeConn](poolConfig{selfID: makeNodeID(rng)}) - // Dial a peer and connect. - peer := makeNodeID(rng) - addr := makeAddrFor(rng, peer) - require.NoError(t, p.AddAddr(addr)) - require.Equal(t, utils.Some(addr), opt(p.TryStartDial())) - conn := makeConnTo(addr) - require.NoError(t, p.Connected(conn)) - - // Evict the peer. Connection should be closed, and the address removed. 
- p.Evict(peer) - require.True(t, conn.Closed()) - require.Equal(t, utils.None[NodeAddress](), opt(p.TryStartDial())) + t.Log("generate bunch of addresses for a bunch of NodeIDs") + var byID [][]NodeAddress + for range 10 { + id := makeNodeID(rng) + var addrs []NodeAddress + for range 3 { + addrs = append(addrs, makeAddrFor(rng, id)) + } + byID = append(byID, addrs) + } + t.Log("sample the addresses into a bunch of PushPex calls") + var senders [][]NodeAddress + want := map[NodeAddress]bool{} + for range 10 { + addrs := make([]NodeAddress, len(byID)) + for i := range addrs { + addrs[i] = byID[i][rng.Intn(len(byID[i]))] + want[addrs[i]] = true + } + utils.Shuffle(rng, addrs) + senders = append(senders, addrs) + } + pool := newPoolManager(&poolConfig{MaxIn: 0, MaxOut: len(byID), FixedAddrs: senders[0], InPool: inPoolAll}) + pool.PushPex(utils.None[types.NodeID](), senders[1]) + for _, addrs := range senders[2:] { + pool.PushPex(utils.Some(makeNodeID(rng)), addrs) + } + + t.Log("expect TryStartDial to aggregate all addresses of that node") + got := map[NodeAddress]bool{} + for range len(byID) { + addrs := opt(pool.TryStartDial()).OrPanic("") + // No duplicates. + require.Equal(t, len(toSet(addrs...)), len(addrs)) + // Consistent id + id := addrs[0].NodeID + for _, addr := range addrs { + require.Equal(t, id, addr.NodeID) + got[addr] = true + } + } + // All provided addresses have been covered. + require.Equal(t, want, got) } -func TestPool_OutboundCount(t *testing.T) { +// Test checking that ClearPex makes addresses not available for dialing. 
+func TestPoolManager_ClearPex(t *testing.T) { rng := utils.TestRng() - p := newPool[*fakeConn](poolConfig{ - selfID: makeNodeID(rng), - maxConns: utils.Some(3), - }) - var ids []types.NodeID + fixedAddrs := utils.GenSliceN(rng, 5, makeAddr) + extraAddrs := utils.GenSliceN(rng, 5, makeAddr) + pool := newPoolManager(&poolConfig{MaxIn: 0, MaxOut: 10, FixedAddrs: fixedAddrs, InPool: inPoolAll}) + pool.PushPex(utils.None[types.NodeID](), extraAddrs) + + t.Log("populate pool with multiple senders and extra entries") + senders := map[types.NodeID][]NodeAddress{} + allAddrs := append(fixedAddrs, extraAddrs...) for range 5 { - ids = append(ids, makeNodeID(rng)) + addrs := utils.GenSliceN(rng, 5, makeAddr) + s := makeNodeID(rng) + senders[s] = addrs + pool.PushPex(utils.Some(s), addrs) + allAddrs = append(allAddrs, addrs...) } - conns := map[types.NodeID]*fakeConn{} - for range 100 { - id := ids[rng.Intn(len(ids))] - if conns[id] != nil && utils.GenBool(rng) { - p.Disconnected(conns[id]) - require.True(t, conns[id].Closed()) - conns[id] = nil - } else { - conn := makeConnFor(rng, id, utils.GenBool(rng)) - if err := p.Connected(conn); err == nil { - conns[id], conn = conn, conns[id] - } - require.True(t, conn == nil || conn.Closed()) - require.True(t, conns[id] == nil || !conns[id].Closed()) + all := toSet(allAddrs...) 
+ for s, addrs := range senders { + for _, addr := range addrs { + delete(all, addr) } - outbound := 0 - for _, c := range conns { - if c != nil && c.info.DialAddr.IsPresent() { - outbound += 1 - } + pool.ClearPex(s) + for want := maps.Clone(all); len(want) > 0; { + got := opt(pool.TryStartDial()).OrPanic("")[0] + require.True(t, want[got]) + delete(want, got) + require.NoError(t, pool.DialFailed(got.NodeID)) } - require.Equal(t, outbound, p.outbound) } } diff --git a/sei-tendermint/internal/p2p/peermanager_test.go b/sei-tendermint/internal/p2p/peermanager_test.go index 03bbcd586f..78727b5a28 100644 --- a/sei-tendermint/internal/p2p/peermanager_test.go +++ b/sei-tendermint/internal/p2p/peermanager_test.go @@ -3,6 +3,7 @@ package p2p import ( "context" "fmt" + "slices" "sync/atomic" "testing" "time" @@ -15,24 +16,44 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/types" ) +func makeKey(rng utils.Rng) NodeSecretKey { + return NodeSecretKey(ed25519.TestSecretKey(utils.GenBytes(rng, 32))) +} + +func makeNodeID(rng utils.Rng) types.NodeID { + return makeKey(rng).Public().NodeID() +} + +func makeAddrFor(rng utils.Rng, id types.NodeID) NodeAddress { + return NodeAddress{ + NodeID: id, + Hostname: fmt.Sprintf("%s.example.com", utils.GenString(rng, 10)), + Port: uint16(rng.Int()), + } +} + +func makeAddr(rng utils.Rng) NodeAddress { + return makeAddrFor(rng, makeNodeID(rng)) +} + type fakeConn struct { - info peerConnInfo + info PeerConnInfo closed atomic.Bool } func makeConnFor(rng utils.Rng, id types.NodeID, dialing bool) *fakeConn { - info := peerConnInfo{ID: id} + info := PeerConnInfo{ID: id} if dialing { - info.DialAddr = utils.Some(makeAddrFor(rng, id)) + info.DialedAddr = utils.Some(makeAddrFor(rng, id)) } return &fakeConn{info: info} } func makeConnTo(addr NodeAddress) *fakeConn { return &fakeConn{ - info: peerConnInfo{ - ID: addr.NodeID, - DialAddr: utils.Some(addr), + info: PeerConnInfo{ + ID: addr.NodeID, + DialedAddr: utils.Some(addr), }, } } @@ 
-45,7 +66,7 @@ func (c *fakeConn) Closed() bool { return c.closed.Load() } -func (c *fakeConn) Info() peerConnInfo { return c.info } +func (c *fakeConn) Info() PeerConnInfo { return c.info } func (c *fakeConn) Close() { c.closed.Store(true) @@ -57,34 +78,21 @@ func makePeerManager(selfID types.NodeID, options *RouterOptions) *peerManager[* var selfID = types.NodeIDFromPubKey(ed25519.TestSecretKey([]byte{12, 43}).Public()) -func makeKey(rng utils.Rng) NodeSecretKey { - return NodeSecretKey(ed25519.TestSecretKey(utils.GenBytes(rng, 32))) -} - -func makeNodeID(rng utils.Rng) types.NodeID { - return makeKey(rng).Public().NodeID() -} - -func makeAddrFor(rng utils.Rng, id types.NodeID) NodeAddress { - return NodeAddress{ - NodeID: id, - Hostname: fmt.Sprintf("%s.example.com", utils.GenString(rng, 10)), - Port: uint16(rng.Int()), - } -} - -func makeAddr(rng utils.Rng) NodeAddress { - return makeAddrFor(rng, makeNodeID(rng)) -} - func justIDs[C peerConn](conns connSet[C]) map[types.NodeID]bool { ids := map[types.NodeID]bool{} for id := range conns.All() { - ids[id] = true + ids[id.NodeID] = true } return ids } +func mustStartDial(t *testing.T, ctx context.Context, m *peerManager[*fakeConn]) []NodeAddress { + addrs, err := m.StartDial(ctx) + require.NoError(t, err) + require.NotEmpty(t, addrs) + return addrs +} + func TestRouterOptions(t *testing.T) { rng := utils.TestRng() @@ -135,174 +143,92 @@ func TestRouterOptions(t *testing.T) { } } -func TestPeerManager_AddAddrs(t *testing.T) { +func TestPeerManager_PushPex(t *testing.T) { + ctx := t.Context() rng := utils.TestRng() - t.Log("Generate some addresses") - var ids []types.NodeID - for range 6 { - ids = append(ids, makeNodeID(rng)) - } - addrs := map[types.NodeID]NodeAddress{} - for _, id := range ids { - addrs[id] = makeAddrFor(rng, id) - } - - t.Log("Collect all persistent peers' addrs.") - var persistentAddrs []NodeAddress - for _, id := range ids[:2] { - persistentAddrs = append(persistentAddrs, addrs[id]) - } - 
t.Log("Collect some other peers' addrs.") - var bootstrapAddrs []NodeAddress - for _, id := range ids[2:] { - bootstrapAddrs = append(bootstrapAddrs, addrs[id]) - } + t.Log("Generate persistent and bootstrap peers") + persistentAddrs := utils.GenSliceN(rng, 2, makeAddr) + bootstrapAddrs := utils.GenSliceN(rng, 4, makeAddr) unconditionalPeer := makeNodeID(rng) - t.Log("Create peer manager.") - maxPeers := 10 m := makePeerManager(selfID, &RouterOptions{ - BootstrapPeers: bootstrapAddrs, - PersistentPeers: persistentAddrs, - // Unconditional peers are just persistent peers that don't need to be dialed. + BootstrapPeers: bootstrapAddrs, + PersistentPeers: persistentAddrs, UnconditionalPeers: utils.Slice(unconditionalPeer), - // Blocksync peers are a subset of persistent peers. - // It is also a valid configuration to add blocksync peer without adding - // an address to persistent peers, but in such a case we expect such a peer to - // connect to us instead. - BlockSyncPeers: utils.Slice(ids[1]), - PrivatePeers: utils.Slice(ids[4]), - MaxPeers: utils.Some(maxPeers), + MaxInbound: utils.Some(50), + MaxOutbound: utils.Some(50), }) - t.Log("Check that all expected addrs are present.") - for _, id := range ids[:2] { - require.Equal(t, utils.Slice(addrs[id]), m.Addresses(id)) - } - for _, id := range ids[2:] { - require.Equal(t, utils.Slice(addrs[id]), m.Addresses(id)) + t.Log("All configured peers should eventually become dialable") + want := map[types.NodeID]bool{} + for _, addr := range append(slices.Clone(persistentAddrs), bootstrapAddrs...) 
{ + want[addr.NodeID] = true } - require.NoError(t, utils.TestDiff(nil, m.Addresses(unconditionalPeer))) - - t.Log("Add all addresses at once.") - var allAddrs []NodeAddress - for _, id := range ids { - allAddrs = append(allAddrs, addrs[id]) + seen := map[types.NodeID]bool{} + for len(seen) < len(want) { + addrs := mustStartDial(t, ctx, m) + id := addrs[0].NodeID + require.True(t, want[id]) + seen[id] = true } - require.NoError(t, m.AddAddrs(allAddrs)) - t.Log("Check that all expected addrs are present.") - for _, id := range ids { - require.Equal(t, utils.Slice(addrs[id]), m.Addresses(id)) - } - require.Equal(t, nil, m.Addresses(unconditionalPeer)) - - t.Log("Check that adding new persistent peer address is ignored.") - require.NoError(t, m.AddAddrs(utils.Slice(makeAddrFor(rng, ids[0])))) - require.Equal(t, utils.Slice(addrs[ids[0]]), m.Addresses(ids[0])) - require.NoError(t, m.AddAddrs(utils.Slice(makeAddrFor(rng, unconditionalPeer)))) - require.Equal(t, nil, m.Addresses(unconditionalPeer)) - - t.Log("Check that options.MaxPeers limit is respected") - var newAddrs []NodeAddress - for range maxPeers { - addr := makeAddr(rng) - ids = append(ids, addr.NodeID) - addrs[addr.NodeID] = addr - newAddrs = append(newAddrs, addr) - } - require.NoError(t, m.AddAddrs(newAddrs)) - expectedIDs := maxPeers + 2 // There are 2 persistent peers. 
- for _, id := range ids[:expectedIDs] { - require.Equal(t, utils.Slice(addrs[id]), m.Addresses(id)) - } - for _, id := range ids[expectedIDs:] { - require.Equal(t, nil, m.Addresses(id)) - } + t.Log("Pushing new addresses via PEX should make them immediately dialable") + newPeer := makeAddr(rng) + require.NoError(t, m.PushPex(utils.Some(makeNodeID(rng)), utils.Slice(newPeer))) + require.Equal(t, utils.Slice(newPeer), mustStartDial(t, ctx, m)) - t.Log("Check that failed addresses are replaceable") - idToFail := ids[2] - m.DialFailed(addrs[idToFail]) // we fail 1 arbitrary address - newAddrs = utils.Slice(makeAddrFor(rng, idToFail)) - require.NoError(t, m.AddAddrs(newAddrs)) // we try to add some addrs - require.ElementsMatch(t, newAddrs, m.Addresses(idToFail)) + t.Log("DialFailed makes the peer eligible for dialing again") + m.DialFailed(newPeer.NodeID) + require.Equal(t, utils.Slice(newPeer), mustStartDial(t, ctx, m)) - t.Log("Check that failed peers are replaceable") - m.DialFailed(addrs[ids[4]]) - newPeer := makeAddr(rng) - require.NoError(t, m.AddAddrs(utils.Slice(newPeer))) - require.ElementsMatch(t, nil, m.Addresses(ids[4])) - require.ElementsMatch(t, utils.Slice(newPeer), m.Addresses(newPeer.NodeID)) + t.Log("Unconditional peers can always connect inbound") + require.NoError(t, m.Connected(makeConnFor(rng, unconditionalPeer, false))) } func TestPeerManager_ConcurrentDials(t *testing.T) { ctx := t.Context() - for _, tc := range []struct{ peers, maxDials, dials int }{ - {peers: 10, maxDials: 3, dials: 20}, // dialing limited by MaxConcurrentDials - {peers: 4, maxDials: 10, dials: 20}, // dialing limited by available peer addrs - } { - t.Run(fmt.Sprintf("peers=%v maxDials=%v", tc.peers, tc.maxDials), func(t *testing.T) { - rng := utils.TestRng() - addrsMap := map[NodeAddress]bool{} - // Generate some persistent and non-persistent peers. 
- var bootstrapAddrs []NodeAddress - var persistentAddrs []NodeAddress - for i := range tc.peers { - addrs := &bootstrapAddrs - if i%2 == 0 { - addrs = &persistentAddrs + rng := utils.TestRng() + addrs := utils.GenSliceN(rng, 4, makeAddr) + addrSet := map[types.NodeID]bool{} + for _, addr := range addrs { + addrSet[addr.NodeID] = true + } + m := makePeerManager(makeNodeID(rng), &RouterOptions{ + BootstrapPeers: addrs, + MaxOutbound: utils.Some(len(addrs)), + }) + dialing := utils.NewMutex(map[types.NodeID]bool{}) + err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + for range 20 { + addrs, err := m.StartDial(ctx) + if err != nil { + return fmt.Errorf("m.StartDial(): %w", err) + } + addr := addrs[0] + if !addrSet[addr.NodeID] { + return fmt.Errorf("unexpected addr %v", addr) + } + for dialing := range dialing.Lock() { + if dialing[addr.NodeID] { + return fmt.Errorf("duplicate concurrent dials for %v", addr.NodeID) } - addr := makeAddr(rng) - *addrs = append(*addrs, addr) - addrsMap[addr] = (addrs == &persistentAddrs) + dialing[addr.NodeID] = true } - m := makePeerManager(makeNodeID(rng), &RouterOptions{ - BootstrapPeers: bootstrapAddrs, - PersistentPeers: persistentAddrs, - MaxConcurrentDials: utils.Some(tc.maxDials), - MaxConnected: utils.Some(tc.dials), - }) - dialing := utils.NewMutex(map[types.NodeID]bool{}) - err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { - for i := range tc.dials { - persistentPeer := i%2 == 0 - addr, err := m.StartDial(ctx, persistentPeer) - if err != nil { - return fmt.Errorf("StartDial(): %w", err) - } - if isPersistent, ok := addrsMap[addr]; !ok { - return fmt.Errorf("unexpected addr %v", addr) - } else if isPersistent != persistentPeer { - return fmt.Errorf("address does not match the requested type (peristent/non-persistent)") - } - for dialing := range dialing.Lock() { - if got := len(dialing); got > tc.maxDials { - return fmt.Errorf("dials limit exceeded: %v", got) - } - if 
dialing[addr.NodeID] { - return fmt.Errorf("duplicate concurrent dials for %v", addr.NodeID) - } - dialing[addr.NodeID] = true - } - s.Spawn(func() error { - if err := utils.Sleep(ctx, 50*time.Millisecond); err != nil { - return err - } - for dialing := range dialing.Lock() { - delete(dialing, addr.NodeID) - } - m.DialFailed(addr) - return nil - }) + s.Spawn(func() error { + if err := utils.Sleep(ctx, 50*time.Millisecond); err != nil { + return err + } + for dialing := range dialing.Lock() { + delete(dialing, addr.NodeID) } + m.DialFailed(addr.NodeID) return nil }) - if err != nil { - t.Fatal(err) - } - }) - } + } + return nil + }) + require.NoError(t, err) } // Test checking that all the provided addresses are eventually dialed. @@ -324,102 +250,79 @@ func TestPeerManager_DialRoundRobin(t *testing.T) { BootstrapPeers: bootstrapAddrs, PersistentPeers: persistentAddrs, }) - for i := 0; len(addrsMap) > 0; i++ { - persistentPeer := i%2 == 0 - addr, err := m.StartDial(ctx, persistentPeer) - require.NoError(t, err) + for len(addrsMap) > 0 { + addr := mustStartDial(t, ctx, m)[0] delete(addrsMap, addr) - m.DialFailed(addr) + m.DialFailed(addr.NodeID) } } // Test checking that MaxConnected limit applies to non-uncondional peers. 
func TestPeerManager_MaxConnected(t *testing.T) { ctx := t.Context() - maxConns := 5 rng := utils.TestRng() + const maxIn = 3 + const maxOut = 4 - // Generate some unconditional peers (persistent peers are also unconditional) - isUnconditional := map[types.NodeID]bool{} - var unconditionalPeers []types.NodeID - var persistentPeers []NodeAddress - for range 20 { - addr := makeAddr(rng) - isUnconditional[addr.NodeID] = true - if utils.GenBool(rng) { - unconditionalPeers = append(unconditionalPeers, addr.NodeID) - } else { - persistentPeers = append(persistentPeers, addr) - } - } - + unconditionalPeers := utils.GenSliceN(rng, 3, makeNodeID) + persistentPeers := utils.GenSliceN(rng, 2, makeAddr) + bootstrapPeers := utils.GenSliceN(rng, maxOut*2, makeAddr) m := makePeerManager(makeNodeID(rng), &RouterOptions{ PersistentPeers: persistentPeers, + BootstrapPeers: bootstrapPeers, UnconditionalPeers: unconditionalPeers, - MaxConnected: utils.Some(maxConns), + MaxInbound: utils.Some(maxIn), + MaxOutbound: utils.Some(maxOut), }) - // Construct connections to all unconditional peers and some regular peers. want := map[types.NodeID]bool{} - for i := range max(len(persistentPeers), len(unconditionalPeers), maxConns+5) { - // One connection to persistent peer. - if i < len(persistentPeers) { - addr, err := m.StartDial(ctx, true) - require.NoError(t, err) - conn := makeConnFor(rng, addr.NodeID, utils.GenBool(rng)) - require.NoError(t, m.Connected(conn)) - want[addr.NodeID] = true - } - - // One connection to unconditional peer. - if i < len(unconditionalPeers) { - id := unconditionalPeers[i] - conn := makeConnFor(rng, id, utils.GenBool(rng)) - require.NoError(t, m.Connected(conn)) - want[id] = true - } + t.Log("outbound connections up to MaxOutbound succeed") + for range maxOut { + addrs := mustStartDial(t, ctx, m) + conn := makeConnTo(addrs[0]) + require.NoError(t, m.Connected(conn)) + want[conn.Info().ID] = true + } - // One connection to regular peer. 
- conn := makeConn(rng, utils.GenBool(rng)) - wantErr := i >= maxConns - if err := m.Connected(conn); (err != nil) != wantErr { - t.Fatalf("m.Connected() = %v, wantErr = %v", err, wantErr) - } - if !wantErr { - want[conn.Info().ID] = true - } + t.Log("inbound regular connections are limited by MaxInbound") + for range maxIn { + conn := makeConn(rng, false) + require.NoError(t, m.Connected(conn)) + want[conn.Info().ID] = true + } + require.ErrorIs(t, m.Connected(makeConn(rng, false)), errTooManyPeers) - // Check if connection sets match. - if err := utils.TestDiff(want, justIDs(m.Conns())); err != nil { - t.Fatal(fmt.Errorf("m.Conns() %w", err)) - } + t.Log("unconditional peers bypass inbound limits") + for _, id := range unconditionalPeers { + conn := makeConnFor(rng, id, false) + require.NoError(t, m.Connected(conn)) + want[id] = true } + + require.NoError(t, utils.TestDiff(want, justIDs(m.Conns()))) } -// Test checking that concurrent dialing is limited by the number of connection slots. -// I.e. dialing + connected <= MaxConnected. +// Test checking that concurrent dialing is limited by the number of outbound slots. +// I.e. dialing + connected <= MaxOutbound. 
func TestPeerManager_MaxConnectedForDial(t *testing.T) { ctx := t.Context() - maxConns := 10 + maxOut := 10 rng := utils.TestRng() - var addrs []NodeAddress - for range maxConns { - addrs = append(addrs, makeAddr(rng)) - } + addrs := utils.GenSliceN(rng, maxOut*2, makeAddr) m := makePeerManager(makeNodeID(rng), &RouterOptions{ - BootstrapPeers: addrs, - MaxConcurrentDials: utils.Some(maxConns), - MaxConnected: utils.Some(maxConns), + BootstrapPeers: addrs, + MaxOutbound: utils.Some(maxOut), }) var conns []*fakeConn - for range maxConns { - conn := makeConn(rng, false) - conns = append(conns, conn) + for range maxOut { + addrs := mustStartDial(t, ctx, m) + conn := makeConnTo(addrs[0]) require.NoError(t, m.Connected(conn)) + conns = append(conns, conn) } var dialsAndConns atomic.Int64 - dialsAndConns.Store(int64(maxConns)) + dialsAndConns.Store(int64(maxOut)) err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { s.Spawn(func() error { // Gradually disconnect existing connections. @@ -432,15 +335,24 @@ func TestPeerManager_MaxConnectedForDial(t *testing.T) { } return nil }) - for range maxConns { + for range 5 * maxOut { // Dial peers as fast as possible. 
- _, err := m.StartDial(ctx, false) + addrs, err := m.StartDial(ctx) if err != nil { return fmt.Errorf("m.StartDial(): %w", err) } - if got := int(dialsAndConns.Add(1)); got > maxConns { - return fmt.Errorf("dials + conns = %v, want <= %v", got, maxConns) + if got := int(dialsAndConns.Add(1)); got > maxOut { + return fmt.Errorf("dials + conns = %v, want <= %v", got, maxOut) } + id := addrs[0].NodeID + s.Spawn(func() error { + if err := utils.Sleep(ctx, 20*time.Millisecond); err != nil { + return err + } + dialsAndConns.Add(-1) + m.DialFailed(id) + return nil + }) } return nil }) @@ -459,24 +371,22 @@ func TestPeerManager_MaxOutboundConnectionsForDialing(t *testing.T) { addrs = append(addrs, makeAddr(rng)) } m := makePeerManager(makeNodeID(rng), &RouterOptions{ - BootstrapPeers: addrs, - MaxPeers: utils.Some(len(addrs)), - MaxConcurrentDials: utils.Some(len(addrs)), - MaxConnected: utils.Some(len(addrs)), - MaxOutboundConnections: utils.Some(maxOutbound), + BootstrapPeers: addrs, + MaxOutbound: utils.Some(maxOutbound), }) var dialsAndConns atomic.Int64 const attempts = 20 err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { for i := range attempts { - addr, err := m.StartDial(ctx, false) + addrs, err := m.StartDial(ctx) if err != nil { return fmt.Errorf("m.StartDial(): %w", err) } if got := int(dialsAndConns.Add(1)); got > maxOutbound { return fmt.Errorf("dialing + outbound = %v, want <= %v", got, maxOutbound) } + addr := addrs[0] s.Spawn(func() error { if i%2 == 0 { if err := utils.Sleep(ctx, 10*time.Millisecond); err != nil { @@ -485,7 +395,7 @@ func TestPeerManager_MaxOutboundConnectionsForDialing(t *testing.T) { // Keep accounting in sync with slot release: decrement before // unblocking StartDial to avoid transient overcount in this test. 
dialsAndConns.Add(-1) - m.DialFailed(addr) + m.DialFailed(addr.NodeID) return nil } conn := makeConnTo(addr) @@ -521,15 +431,14 @@ func TestPeerManager_AcceptsInboundWhenOutboundFull(t *testing.T) { addrs = append(addrs, makeAddr(rng)) } m := makePeerManager(makeNodeID(rng), &RouterOptions{ - BootstrapPeers: addrs, - MaxConcurrentDials: utils.Some(maxConns), - MaxConnected: utils.Some(maxConns), - MaxOutboundConnections: utils.Some(maxOutbound), + BootstrapPeers: addrs, + MaxOutbound: utils.Some(maxOutbound), + MaxInbound: utils.Some(maxConns - maxOutbound), }) // Fill up outbound slots. for range maxOutbound { - addr := utils.OrPanic1(m.StartDial(ctx, false)) - require.NoError(t, m.Connected(makeConnTo(addr))) + addrs := mustStartDial(t, ctx, m) + require.NoError(t, m.Connected(makeConnTo(addrs[0]))) } require.Equal(t, maxOutbound, m.Conns().Len()) // Fill up inbound slots. @@ -543,82 +452,53 @@ func TestPeerManager_AcceptsInboundWhenOutboundFull(t *testing.T) { func TestPeerManager_Wake(t *testing.T) { ctx := t.Context() rng := utils.TestRng() - maxDials := 1 maxConns := 2 persistentAddr := makeAddr(rng) m := makePeerManager(makeNodeID(rng), &RouterOptions{ - PersistentPeers: utils.Slice(persistentAddr), - MaxConcurrentDials: utils.Some(maxDials), - MaxConnected: utils.Some(maxConns), + PersistentPeers: utils.Slice(persistentAddr), + MaxOutbound: utils.Some(maxConns), + MaxInbound: utils.Some(maxConns), }) // Adding an address while none are available should wake. - addrs := utils.Slice(makeAddr(rng)) require.True(t, utils.MonitorWatchUpdates(&m.inner, func() { - require.NoError(t, m.AddAddrs(addrs)) + require.NoError(t, m.PushPex(utils.Some(makeNodeID(rng)), utils.Slice(makeAddr(rng)))) })) - // Adding duplicate address should NOT wake. 
- require.False(t, utils.MonitorWatchUpdates(&m.inner, func() { - require.NoError(t, m.AddAddrs(addrs)) + t.Log("freeing a dial slot via DialFailed should wake") + addrs := mustStartDial(t, ctx, m) + require.True(t, utils.MonitorWatchUpdates(&m.inner, func() { + m.DialFailed(addrs[0].NodeID) })) - conns := map[bool]*fakeConn{} - for _, persistentPeer := range utils.Slice(false, true) { - // Freeing a dial slot via DialFailed should wake. - addr, err := m.StartDial(ctx, persistentPeer) - require.NoError(t, err) - require.True(t, utils.MonitorWatchUpdates(&m.inner, func() { - m.DialFailed(addr) - })) - // Freeing a dial slot via Connected should wake. - addr, err = m.StartDial(ctx, persistentPeer) - require.NoError(t, err) - conns[persistentPeer] = makeConnTo(addr) - require.True(t, utils.MonitorWatchUpdates(&m.inner, func() { - require.NoError(t, m.Connected(conns[persistentPeer])) - })) - } - // Fill all the connection slots. - for m.Conns().Len() < maxConns { - require.NoError(t, m.Connected(makeConn(rng, false))) - } - // Freeing a connection slot via Disconnected should wake (as long as there are addresses to dial), - // since we don't dial if connections are full (for non-persistent connections). + + t.Log("establishing a connection should wake") + addrs = mustStartDial(t, ctx, m) + conn := makeConnTo(addrs[0]) require.True(t, utils.MonitorWatchUpdates(&m.inner, func() { - m.Disconnected(conns[false]) + require.NoError(t, m.Connected(conn)) + })) + + t.Log("disconnecting should wake") + require.True(t, utils.MonitorWatchUpdates(&m.inner, func() { + m.Disconnected(conn) })) } -// Test checking that manager does not allow for duplicate connections, -// and that it closes the duplicates. +// Test checking that manager closes duplicate inbound connection. 
func TestPeerManager_DuplicateConn(t *testing.T) { rng := utils.TestRng() - var addrs []NodeAddress - var persistentAddrs []NodeAddress - for range 10 { - addr := makeAddr(rng) - addrs = append(addrs, addr) - if utils.GenBool(rng) { - persistentAddrs = append(persistentAddrs, addr) - } - } + ids := utils.GenSliceN(rng, 10, makeNodeID) m := makePeerManager(makeNodeID(rng), &RouterOptions{ - PersistentPeers: persistentAddrs, + MaxInbound: utils.Some(len(ids) + 5), }) - for _, addr := range addrs { - var active utils.Option[*fakeConn] + for _, id := range ids { + var active *fakeConn for range 5 { - conn := makeConnFor(rng, addr.NodeID, utils.GenBool(rng)) - toClose := utils.Some(conn) - // Peer manager has internal logic deciding whether a new connection should replace the old one (err == nil) or not. - // However at most 1 connection to each peer should be active at all times. - if err := m.Connected(conn); err == nil { - active, toClose = toClose, active - } - activeConn, ok := active.Get() - require.True(t, ok) - require.False(t, activeConn.Closed()) - if toCloseConn, ok := toClose.Get(); ok { - require.True(t, toCloseConn.Closed()) + conn := makeConnFor(rng, id, false) + require.NoError(t, m.Connected(conn)) + if active != nil { + require.True(t, active.Closed()) } + active = conn + require.False(t, active.Closed()) } } } @@ -628,11 +508,12 @@ func TestPeerManager_Subscribe(t *testing.T) { rng := utils.TestRng() maxConns := 60 m := makePeerManager(makeNodeID(rng), &RouterOptions{ - MaxConnected: utils.Some(maxConns), + MaxInbound: utils.Some(maxConns), + MaxOutbound: utils.Some(maxConns), }) t.Log("initialize with some connections") for range 5 { - require.NoError(t, m.Connected(makeConn(rng, utils.GenBool(rng)))) + require.NoError(t, m.Connected(makeConn(rng, false))) } t.Log("subscribe with preexisting connections") recv := m.Subscribe() @@ -642,7 +523,7 @@ func TestPeerManager_Subscribe(t *testing.T) { for range 10 { conns := m.Conns() if conns.Len() == 0 || 
(conns.Len() < maxConns && utils.GenBool(rng)) { - require.NoError(t, m.Connected(makeConn(rng, utils.GenBool(rng)))) + require.NoError(t, m.Connected(makeConn(rng, false))) } else { for _, conn := range conns.All() { m.Disconnected(conn) diff --git a/sei-tendermint/internal/p2p/peermanager_updates.go b/sei-tendermint/internal/p2p/peermanager_updates.go new file mode 100644 index 0000000000..b301cea4ed --- /dev/null +++ b/sei-tendermint/internal/p2p/peermanager_updates.go @@ -0,0 +1,60 @@ +package p2p + +import ( + "context" + + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" + "github.com/sei-protocol/sei-chain/sei-tendermint/types" +) + +// PeerUpdatesRecv. +// NOT THREAD-SAFE. +type peerUpdatesRecv[C peerConn] struct { + recv utils.AtomicRecv[connSet[C]] + last map[types.NodeID]struct{} +} + +// PeerUpdate is a peer update event sent via PeerUpdates. +type PeerUpdate struct { + NodeID types.NodeID + Status PeerStatus + Channels ChannelIDSet +} + +func (s *peerUpdatesRecv[C]) Recv(ctx context.Context) (PeerUpdate, error) { + var update PeerUpdate + _, err := s.recv.Wait(ctx, func(conns connSet[C]) bool { + // Check for disconnected peers. + for id := range s.last { + if _, ok := GetAny(conns, id); !ok { + delete(s.last, id) + update = PeerUpdate{ + NodeID: id, + Status: PeerStatusDown, + } + return true + } + } + // Check for connected peers. 
+ for id, conn := range conns.All() { + if _, ok := s.last[id.NodeID]; !ok { + s.last[id.NodeID] = struct{}{} + update = PeerUpdate{ + NodeID: id.NodeID, + Status: PeerStatusUp, + Channels: conn.Info().Channels, + } + return true + } + } + return false + }) + return update, err +} + +func (m *peerManager[C]) Subscribe() *peerUpdatesRecv[C] { + return &peerUpdatesRecv[C]{ + recv: m.conns, + last: map[types.NodeID]struct{}{}, + } +} diff --git a/sei-tendermint/internal/p2p/pex/reactor.go b/sei-tendermint/internal/p2p/pex/reactor.go index 17e8a03cc9..7e75e6d816 100644 --- a/sei-tendermint/internal/p2p/pex/reactor.go +++ b/sei-tendermint/internal/p2p/pex/reactor.go @@ -203,7 +203,7 @@ func (r *Reactor) handlePexMessage(m p2p.RecvMsg[*pb.PexMessage]) error { } addrs = append(addrs, addr) } - if err := r.router.AddAddrs(addrs); err != nil { + if err := r.router.AddAddrs(m.From, addrs); err != nil { return fmt.Errorf("failed adding addresses from PEX response: %w", err) } return nil diff --git a/sei-tendermint/internal/p2p/pex/reactor_test.go b/sei-tendermint/internal/p2p/pex/reactor_test.go index ad909ed1f4..73ec46a596 100644 --- a/sei-tendermint/internal/p2p/pex/reactor_test.go +++ b/sei-tendermint/internal/p2p/pex/reactor_test.go @@ -16,7 +16,6 @@ import ( const ( testSendInterval = 500 * time.Millisecond - checkFrequency = 500 * time.Millisecond shortWait = 5 * time.Second ) @@ -96,29 +95,6 @@ func TestReactorSendsResponseWithoutRequest(t *testing.T) { testNet.listenForPeerDown(t, 1, 0) } -func TestReactorNeverSendsTooManyPeers(t *testing.T) { - t.Skip("This test needs updated https://github.com/tendermint/tendermint/issue/7634") - ctx := t.Context() - - testNet := setupNetwork(t, testOptions{ - MockNodes: 1, - TotalNodes: 2, - }) - testNet.connectAll(t) - testNet.start(ctx, t) - - testNet.addNodes(t, 110) - nodes := make([]int, 110) - for i := range nodes { - nodes[i] = i + 2 - } - testNet.addAddresses(t, 1, nodes) - - // first we check that even although we have 110 
peers, honest pex reactors - // only send 100 (test if secondNode sends firstNode 100 addresses) - testNet.pingAndlistenForNAddresses(ctx, t, 1, 0, shortWait, 100) -} - func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) { ctx := t.Context() testNet := setupNetwork(t, testOptions{ @@ -157,77 +133,6 @@ func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) { testNet.listenForPeerDown(t, 1, 0) } -func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) { - ctx := t.Context() - - testNet := setupNetwork(t, testOptions{ - TotalNodes: 8, - MaxPeers: utils.Some(7), // total-1, because PeerManager doesn't count self - MaxConnected: utils.Some(2), // enough capacity to establish a connected graph - }) - testNet.network.ConnectCycle(ctx, t) // Saturate capacity by connecting nodes in a cycle. - testNet.start(ctx, t) - - t.Logf("test that peers are gossiped even if connection cap is reached") - for _, nodeID := range testNet.nodes { - node := testNet.network.Node(nodeID) - require.Eventually(t, func() bool { - return node.Router.PeerRatio() >= 0.9 - }, time.Minute, checkFrequency, - "peer ratio is: %f", node.Router.PeerRatio()) - } -} - -func TestReactorLargePeerStoreInASmallNetwork(t *testing.T) { - ctx := t.Context() - - testNet := setupNetwork(t, testOptions{ - TotalNodes: 3, - MaxPeers: utils.Some(25), - MaxConnected: utils.Some(25), - }) - testNet.seedAddrs(t) - testNet.start(ctx, t) - - // assert that all nodes add each other in the network - for idx := 0; idx < len(testNet.nodes); idx++ { - testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1) - } -} - -func TestReactorWithNetworkGrowth(t *testing.T) { - t.Skip("This test needs updated https://github.com/tendermint/tendermint/issue/7634") - ctx := t.Context() - - testNet := setupNetwork(t, testOptions{ - TotalNodes: 5, - }) - testNet.connectAll(t) - testNet.start(ctx, t) - - // assert that all nodes add each other in the network - for idx := 0; idx < len(testNet.nodes); idx++ { - 
testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1) - } - - // now we inject 10 more nodes - testNet.addNodes(t, 10) - for i := 5; i < testNet.total; i++ { - node := testNet.nodes[i] - require.NoError(t, testNet.reactors[node].Start(ctx)) - require.True(t, testNet.reactors[node].IsRunning()) - // we connect all new nodes to a single entry point and check that the - // node can distribute the addresses to all the others - testNet.connectPeers(ctx, t, 0, i) - } - require.Len(t, testNet.reactors, 15) - - // assert that all nodes add each other in the network - for idx := 0; idx < len(testNet.nodes); idx++ { - testNet.requireNumberOfPeers(t, idx, len(testNet.nodes)-1) - } -} - type reactorTestSuite struct { network *p2p.TestNetwork @@ -243,7 +148,6 @@ type reactorTestSuite struct { type testOptions struct { MockNodes int TotalNodes int - MaxPeers utils.Option[int] MaxConnected utils.Option[int] } @@ -256,7 +160,6 @@ func setupNetwork(t *testing.T, opts testOptions) *reactorTestSuite { networkOpts := p2p.TestNetworkOptions{ NumNodes: opts.TotalNodes, NodeOpts: p2p.TestNodeOptions{ - MaxPeers: opts.MaxPeers, MaxConnected: opts.MaxConnected, }, } @@ -297,26 +200,6 @@ func setupNetwork(t *testing.T, opts testOptions) *reactorTestSuite { return rts } -// connects node1 to node2 -func (r *reactorTestSuite) connectPeers(ctx context.Context, t *testing.T, sourceNode, targetNode int) { - t.Helper() - node1, node2 := r.checkNodePair(t, sourceNode, targetNode) - - n1 := r.network.Node(node1) - if n1 == nil { - require.Fail(t, "connectPeers: source node %v is not part of the testnet", node1) - return - } - - n2 := r.network.Node(node2) - if n2 == nil { - require.Fail(t, "connectPeers: target node %v is not part of the testnet", node2) - return - } - - n1.Connect(ctx, n2) -} - // starts up the pex reactors for each node func (r *reactorTestSuite) start(ctx context.Context, t *testing.T) { t.Helper() @@ -336,28 +219,6 @@ func (r *reactorTestSuite) start(ctx context.Context, t 
*testing.T) { }) } -func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) { - t.Helper() - - for range nodes { - node := r.network.MakeNode(t, p2p.TestNodeOptions{ - MaxPeers: r.opts.MaxPeers, - MaxConnected: r.opts.MaxConnected, - }) - nodeID := node.NodeID - reactor, err := NewReactor( - node.Router, - testSendInterval, - ) - if err != nil { - t.Fatalf("NewReactor(): %v", err) - } - r.reactors[nodeID] = reactor - r.nodes = append(r.nodes, nodeID) - r.total++ - } -} - func (r *reactorTestSuite) listenFor( ctx context.Context, t *testing.T, @@ -397,40 +258,6 @@ func (r *reactorTestSuite) listenForRequest(ctx context.Context, t *testing.T, f r.listenFor(ctx, t, to, conditional, assertion, waitPeriod) } -func (r *reactorTestSuite) pingAndlistenForNAddresses( - ctx context.Context, - t *testing.T, - fromNode, toNode int, - waitPeriod time.Duration, - addresses int, -) { - t.Helper() - - to, from := r.checkNodePair(t, toNode, fromNode) - conditional := func(msg p2p.RecvMsg[*pb.PexMessage]) bool { - _, ok := msg.Message.Sum.(*pb.PexMessage_PexResponse) - return ok && msg.From == from - } - assertion := func(t *testing.T, msg p2p.RecvMsg[*pb.PexMessage]) bool { - m, ok := msg.Message.Sum.(*pb.PexMessage_PexResponse) - if !ok { - require.Fail(t, "expected pex response v2") - return true - } - // assert the same amount of addresses - if len(m.PexResponse.Addresses) == addresses { - return true - } - // if we didn't get the right length, we wait and send the - // request again - time.Sleep(300 * time.Millisecond) - r.sendRequest(t, toNode, fromNode) - return false - } - r.sendRequest(t, toNode, fromNode) - r.listenFor(ctx, t, to, conditional, assertion, waitPeriod) -} - func (r *reactorTestSuite) listenForResponse( ctx context.Context, t *testing.T, @@ -499,7 +326,7 @@ func (r *reactorTestSuite) seedAddrs(t *testing.T) { for i := range r.total - 1 { n1 := r.network.Node(r.nodes[i]) n2 := r.network.Node(r.nodes[i+1]) - require.NoError(t, 
n1.Router.AddAddrs(utils.Slice(n2.NodeAddress))) + require.NoError(t, n1.Router.AddAddrs(n2.NodeID, utils.Slice(n2.NodeAddress))) } } @@ -510,14 +337,6 @@ func (r *reactorTestSuite) checkNodePair(t *testing.T, first, second int) (types return r.nodes[first], r.nodes[second] } -func (r *reactorTestSuite) addAddresses(t *testing.T, node int, addrIDs []int) { - var addrs []p2p.NodeAddress - for _, i := range addrIDs { - addrs = append(addrs, r.network.Node(r.nodes[i]).NodeAddress) - } - require.NoError(t, r.network.Node(r.nodes[node]).Router.AddAddrs(addrs)) -} - func randomNodeID() types.NodeID { return types.NodeIDFromPubKey(ed25519.GenerateSecretKey().Public()) } diff --git a/sei-tendermint/internal/p2p/router.go b/sei-tendermint/internal/p2p/router.go index acc56b81e4..1ba13adda0 100644 --- a/sei-tendermint/internal/p2p/router.go +++ b/sei-tendermint/internal/p2p/router.go @@ -70,16 +70,26 @@ func NewRouter( if err := options.Validate(); err != nil { return nil, err } - selfID := privKey.Public().NodeID() - peerManager := newPeerManager[*ConnV2](selfID, options) - peerDB, err := newPeerDB(db, options.maxPeers()) + // 100 is arbitrary - we need some bound, otherwise peerDB will + // maintain the whole connection history without pruning. + // 100 is more or less an upper bound on how many concurrent + // connections sei-v2 can effectively handle currently. + peerDB, err := newPeerDB(db, min(options.maxOutbound(), 100)) if err != nil { return nil, fmt.Errorf("newPeerDB(): %w", err) } + var initialAddrs []NodeAddress for addr := range peerDB.All() { - if err := peerManager.AddAddrs(utils.Slice(addr)); err != nil { + if err := addr.Validate(); err != nil { logger.Error("peerDB: bad address", "addr", addr.String(), "err", err) } + initialAddrs = append(initialAddrs, addr) + } + selfID := privKey.Public().NodeID() + peerManager := newPeerManager[*ConnV2](selfID, options) + // initialAddrs will stay around until the pex table fills the whole "extra" cache. 
+ if err := peerManager.PushPex(utils.None[types.NodeID](), initialAddrs); err != nil { + return nil, fmt.Errorf("peerManager.PushPex(initialAddrs): %w", err) } router := &Router{ metrics: metrics, @@ -99,15 +109,6 @@ func NewRouter( return router, nil } -// PeerRatio returns the ratio of peer addresses stored to the maximum size. -func (r *Router) PeerRatio() float64 { - m, ok := r.options.MaxConnected.Get() - if !ok || m == 0 { - return 0 - } - return float64(r.peerManager.Conns().Len()) / float64(m) -} - func (r *Router) Endpoint() Endpoint { return r.options.Endpoint } @@ -117,8 +118,8 @@ func (r *Router) WaitForStart(ctx context.Context) error { return err } -func (r *Router) AddAddrs(addrs []NodeAddress) error { - return r.peerManager.AddAddrs(addrs) +func (r *Router) AddAddrs(sender types.NodeID, addrs []NodeAddress) error { + return r.peerManager.PushPex(utils.Some(sender), addrs) } func (r *Router) Subscribe() *PeerUpdatesRecv { @@ -126,27 +127,18 @@ func (r *Router) Subscribe() *PeerUpdatesRecv { } func (r *Router) Connected(id types.NodeID) bool { - _, ok := r.peerManager.Conns().Get(id) + _, ok := GetAny(r.peerManager.Conns(), id) return ok } -func (r *Router) State(id types.NodeID) string { - return r.peerManager.State(id) -} - -func (r *Router) Peers() []types.NodeID { - return r.peerManager.Peers() -} - -func (r *Router) Addresses(id types.NodeID) []NodeAddress { - return r.peerManager.Addresses(id) -} - func (r *Router) Advertise(maxAddrs int) []NodeAddress { addrs := r.peerManager.Advertise() return addrs[:min(len(addrs), maxAddrs)] } +func (r *Router) ConnInfos() []PeerConnInfo { return r.peerManager.ConnInfos() } +func (r *Router) AllAddrs() []NodeAddress { return r.peerManager.AllAddrs() } + // OpenChannel opens a new channel for the given message type. 
func OpenChannel[T gogoproto.Message](r *Router, chDesc ChannelDescriptor[T]) (*Channel[T], error) { for channels := range r.channels.Lock() { @@ -258,67 +250,79 @@ func (r *Router) acceptPeersRoutine(ctx context.Context) error { func (r *Router) dialPeersRoutine(ctx context.Context) error { return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { - limiter := rate.NewLimiter(r.options.maxDialRate(), r.options.maxDials()) - // Separate routine for dialing persistent/regular peers. - for _, persistentPeer := range utils.Slice(true, false) { + // Task feeding the upgrade permit to peer manager. + s.Spawn(func() error { + const upgradeInterval = time.Minute + for { + r.peerManager.PushUpgradePermit() + if err := utils.Sleep(ctx, upgradeInterval); err != nil { + return err + } + } + }) + const dialBurst = 10 + limiter := rate.NewLimiter(r.options.maxDialRate(), dialBurst) + for { + if err := limiter.Wait(ctx); err != nil { + return err + } + addrs, err := r.peerManager.StartDial(ctx) + if err != nil { + return err + } + id := addrs[0].NodeID s.Spawn(func() error { - for { - if err := limiter.Wait(ctx); err != nil { - return err - } - addr, err := r.peerManager.StartDial(ctx, persistentPeer) + err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + tcpConn, err := r.dial(ctx, addrs) if err != nil { - return err + r.peerManager.DialFailed(id) + return fmt.Errorf("r.dial(): %w", err) } - s.Spawn(func() error { - err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { - tcpConn, err := r.dial(ctx, addr) - if err != nil { - r.peerManager.DialFailed(addr) - return fmt.Errorf("r.dial(): %w", err) - } - s.SpawnBg(func() error { return tcpConn.Run(ctx) }) - var hConn *handshakedConn - var info types.NodeInfo - err = utils.WithOptTimeout(ctx, r.options.HandshakeTimeout, func(ctx context.Context) error { - var err error - hConn, err = handshake(ctx, tcpConn, r.privKey, handshakeSpec{ - SelfAddr: r.options.SelfAddress, - 
SeiGigaConnection: false, - }) - if err != nil { - return fmt.Errorf("handshake(): %w", err) - } - if got := hConn.msg.NodeAuth.Key().NodeID(); got != addr.NodeID { - return fmt.Errorf("peer NodeID = %v, want %v", got, addr.NodeID) - } - if r.options.PexOnHandshake { - if err := r.AddAddrs(hConn.msg.PexAddrs); err != nil { - return fmt.Errorf("r.AddAddrs(): %w", err) - } - } - info, err = exchangeNodeInfo(ctx, hConn, *r.nodeInfoProducer()) - if err != nil { - return fmt.Errorf("exchangeNodeInfo(): %w", err) - } - return nil - }) - if err != nil { - r.peerManager.DialFailed(addr) - return err - } - if err := r.runConn(ctx, hConn, info, utils.Some(addr)); err != nil { - return fmt.Errorf("r.runConn(): %w", err) - } - return nil + s.SpawnBg(func() error { return tcpConn.Run(ctx) }) + var hConn *handshakedConn + var info types.NodeInfo + err = utils.WithOptTimeout(ctx, r.options.HandshakeTimeout, func(ctx context.Context) error { + var err error + hConn, err = handshake(ctx, tcpConn, r.privKey, handshakeSpec{ + SelfAddr: r.options.SelfAddress, + SeiGigaConnection: false, }) - logger.Error("r.runConn(outbound)", "addr", addr, "err", err) + if err != nil { + return fmt.Errorf("handshake(): %w", err) + } + if got := hConn.msg.NodeAuth.Key().NodeID(); got != id { + return fmt.Errorf("peer NodeID = %v, want %v", got, id) + } + if r.options.PexOnHandshake { + // Since the connection is not established yet, the handshake pex data + // will end up in a bounded cache, rather than main index. That's fine because + // we use the handshake pex data only for a local search, + // which is not supposed to be exhaustive. 
+ if err := r.AddAddrs(id, hConn.msg.PexAddrs); err != nil { + return fmt.Errorf("r.AddAddrs(): %w", err) + } + } + info, err = exchangeNodeInfo(ctx, hConn, *r.nodeInfoProducer()) + if err != nil { + return fmt.Errorf("exchangeNodeInfo(): %w", err) + } return nil }) - } + if err != nil { + r.peerManager.DialFailed(id) + return err + } + dialAddrRaw := hConn.conn.RemoteAddr() + dialAddr := NodeAddress{NodeID: id, Hostname: dialAddrRaw.Addr().String(), Port: dialAddrRaw.Port()} + if err := r.runConn(ctx, hConn, info, utils.Some(dialAddr)); err != nil { + return fmt.Errorf("r.runConn(): %w", err) + } + return nil + }) + logger.Error("r.runConn(outbound)", "id", id, "err", err) + return nil }) } - return nil }) } @@ -335,7 +339,7 @@ func (r *Router) storePeersRoutine(ctx context.Context) error { ctrl.Updated() } for _, conn := range conns.All() { - if addr, ok := conn.dialAddr.Get(); ok { + if addr, ok := conn.DialedAddr.Get(); ok { if err := db.Insert(addr, now); err != nil { return fmt.Errorf("db.Insert(): %w", err) } @@ -368,8 +372,8 @@ func (r *Router) IsBlockSyncPeer(id types.NodeID) bool { return r.peerManager.IsBlockSyncPeer(id) } -// dialPeer connects to a peer by dialing it. -func (r *Router) dial(ctx context.Context, addr NodeAddress) (_ tcp.Conn, err error) { +// dial connects to a peer by dialing it. 
+func (r *Router) dial(ctx context.Context, addrs []NodeAddress) (_ tcp.Conn, err error) { defer func() { success := "true" if err != nil { @@ -384,31 +388,35 @@ func (r *Router) dial(ctx context.Context, addr NodeAddress) (_ tcp.Conn, err er defer cancel() } - logger.Debug("dialing peer address", "peer", addr) - endpoints, err := addr.Resolve(resolveCtx) - if err != nil { - return tcp.Conn{}, fmt.Errorf("address.Resolve(): %w", err) - } - if len(endpoints) == 0 { - return tcp.Conn{}, fmt.Errorf("address %q did not resolve to any endpoints", addr) - } - - for _, endpoint := range endpoints { - dialCtx := ctx - if d, ok := r.options.DialTimeout.Get(); ok { - var cancel context.CancelFunc - dialCtx, cancel = context.WithTimeout(dialCtx, d) - defer cancel() - } - if err := endpoint.Validate(); err != nil { - return tcp.Conn{}, err + endpointSet := map[Endpoint]struct{}{} + // Resolve addresses in parallel. No errors expected, + // just resolve as many addresses as possible within timeout. + utils.OrPanic(scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + endpointSet := utils.NewMutex(endpointSet) + for _, addr := range addrs { + s.Spawn(func() error { + endpoints, err := addr.Resolve(resolveCtx) + if err != nil { + logger.Info("address.Resolve() failed", "addr", addr, "err", err) + return nil + } + if len(endpoints) > 0 { + for endpointSet := range endpointSet.Lock() { + endpointSet[endpoints[0]] = struct{}{} + } + } + return nil + }) } - c, err := tcp.Dial(dialCtx, endpoint.AddrPort) + return nil + })) + for endpoint := range endpointSet { + c, err := utils.WithOptTimeout1(ctx, r.options.DialTimeout, func(ctx context.Context) (tcp.Conn, error) { + return tcp.Dial(ctx, endpoint.AddrPort) + }) if err != nil { - logger.Debug("failed to dial endpoint", "peer", addr.NodeID, "endpoint", endpoint, "err", err) continue } - logger.Debug("dialed peer", "peer", addr.NodeID, "endpoint", endpoint) return c, nil } return tcp.Conn{}, errors.New("all endpoints 
failed") diff --git a/sei-tendermint/internal/p2p/router_test.go b/sei-tendermint/internal/p2p/router_test.go index 91e483dc5c..0c4b929dd5 100644 --- a/sei-tendermint/internal/p2p/router_test.go +++ b/sei-tendermint/internal/p2p/router_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/netip" + "slices" "strings" "sync/atomic" "testing" @@ -96,7 +97,7 @@ func TestRouter_Network(t *testing.T) { RequireReceiveUnordered(t, channel, want) t.Logf("We report a fatal error and expect the peer to get disconnected") - conn, ok := local.Router.peerManager.Conns().Get(peers[0].NodeID) + conn, ok := GetAny(local.Router.peerManager.Conns(), peers[0].NodeID) require.True(t, ok) local.Router.Evict(peers[0].NodeID, errors.New("boom")) local.WaitForDisconnect(ctx, conn) @@ -217,7 +218,7 @@ func TestRouter_SendError(t *testing.T) { t.Logf("Erroring b should cause it to be disconnected.") nodes := network.Nodes() - conn, ok := nodes[0].Router.peerManager.Conns().Get(nodes[1].NodeID) + conn, ok := GetAny(nodes[0].Router.peerManager.Conns(), nodes[1].NodeID) require.True(t, ok) nodes[0].Router.Evict(nodes[1].NodeID, errors.New("boom")) nodes[0].WaitForDisconnect(ctx, conn) @@ -235,15 +236,18 @@ func TestRouter_PexOnHandshake_DialerDisabled(t *testing.T) { newNode := network.MakeNode(t, TestNodeOptions{PexOnHandshake: false}) newNode.Connect(ctx, nodes[0]) - // newNode should not learn about nodes[1] during handshake. - require.Empty(t, newNode.Router.Addresses(nodes[1].NodeID)) + // newNode should NOT learn about nodes[1] during handshake. 
+ require.True(t, slices.Index( + newNode.Router.peerManager.AllAddrs(), + nodes[1].NodeAddress, + ) == -1) } func TestRouter_PexOnHandshake_ListenerPeersPropagated(t *testing.T) { ctx := t.Context() t.Log("Create a network with 3 nodes.") - network := MakeTestNetwork(t, TestNetworkOptions{NumNodes: 3, NodeOpts: TestNodeOptions{PexOnHandshake: true}}) + network := MakeTestNetwork(t, TestNetworkOptions{NumNodes: 3, NodeOpts: TestNodeOptions{PexOnHandshake: true, SelfAddress: true}}) nodes := network.Nodes() t.Log("Connect nodes 1,2 to 0.") @@ -272,11 +276,10 @@ func makeRouterOptions() *RouterOptions { c := conn.DefaultMConnConfig() c.PongTimeout = time.Hour return &RouterOptions{ - MaxAcceptRate: utils.Some(rate.Inf), - MaxDialRate: utils.Some(rate.Inf), - MaxConcurrentDials: utils.Some(100), - Endpoint: Endpoint{tcp.TestReserveAddr()}, - Connection: c, + MaxAcceptRate: utils.Some(rate.Inf), + MaxDialRate: utils.Some(rate.Inf), + Endpoint: Endpoint{tcp.TestReserveAddr()}, + Connection: c, // 0 to allow immediate retries from peers. 
IncomingConnectionWindow: utils.Some(time.Duration(0)), // Large timeouts to avoid flaky happy path tests @@ -331,7 +334,7 @@ func TestRouter_FilterByIP(t *testing.T) { addr := TestAddress(r) if err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { - tcpConn, err := r2.dial(ctx, addr) + tcpConn, err := r2.dial(ctx, utils.Slice(addr)) if err != nil { return fmt.Errorf("peerTransport.dial(): %w", err) } @@ -353,7 +356,7 @@ func TestRouter_FilterByIP(t *testing.T) { t.Logf("Connection should fail during handshake.") r2 = makeRouter(rng) return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { - tcpConn, err := r2.dial(ctx, addr) + tcpConn, err := r2.dial(ctx, utils.Slice(addr)) if err != nil { return fmt.Errorf("peerTransport.dial(): %w", err) } @@ -474,7 +477,7 @@ func TestRouter_AcceptPeers_Parallel(t *testing.T) { for range 10 { x := makeRouter(rng) peers = append(peers, x) - conn, err := x.dial(ctx, addr) + conn, err := x.dial(ctx, utils.Slice(addr)) if err != nil { return fmt.Errorf("x.dial(): %w", err) } @@ -518,7 +521,8 @@ func TestRouter_dialPeer_Retry(t *testing.T) { defer listener.Close() t.Log("Populate peer manager.") - if err := r.AddAddrs(utils.Slice(TestAddress(x))); err != nil { + addr := TestAddress(x) + if err := r.AddAddrs(addr.NodeID, utils.Slice(addr)); err != nil { return fmt.Errorf("r.AddAddrs(): %w", err) } @@ -579,7 +583,7 @@ func TestRouter_dialPeer_Reject(t *testing.T) { return fmt.Errorf("tcp.Listen(): %w", err) } defer listener.Close() - if err := r.AddAddrs(utils.Slice(Endpoint{addr}.NodeAddress(tc.dialID))); err != nil { + if err := r.AddAddrs(tc.dialID, utils.Slice(Endpoint{addr}.NodeAddress(tc.dialID))); err != nil { return fmt.Errorf("r.AddAddrs(): %w", err) } tcpConn, err := listener.AcceptOrClose(ctx) @@ -600,6 +604,48 @@ func TestRouter_dialPeer_Reject(t *testing.T) { } } +func TestRouter_dial_TriesAllAddresses(t *testing.T) { + rng := utils.TestRng() + ctx := t.Context() + + // Address dialing order 
is not deterministic, so we run the test multiple times to + // minimize the false-positive probability (situation where the correct address is attempted first). + for range 10 { + err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error { + // Prepare addresses + addr := tcp.TestReserveAddr() + id := makeNodeID(rng) + addrs := utils.Slice(Endpoint{addr}.NodeAddress(id)) + for range 10 { + addrs = append(addrs, makeAddrFor(rng, id)) + } + utils.Shuffle(rng, addrs) + + // Create the dialing router. + listener := utils.OrPanic1(tcp.Listen(addr)) + s.Spawn(func() error { + conn, err := listener.AcceptOrClose(ctx) + if err != nil { + return err + } + conn.Close() + return nil + }) + r := makeRouter(rng) + s.SpawnBg(func() error { return utils.IgnoreCancel(r.Run(ctx)) }) + conn, err := r.dial(ctx, addrs) + if err != nil { + return fmt.Errorf("r.dial(): %w", err) + } + conn.Close() + return nil + }) + if err != nil { + t.Fatal(err) + } + } +} + func TestRouter_dialPeers_Parallel(t *testing.T) { ctx := t.Context() rng := utils.TestRng() @@ -625,7 +671,7 @@ func TestRouter_dialPeers_Parallel(t *testing.T) { return fmt.Errorf("tcp.Listen(): %w", err) } defer listener.Close() - if err := r.AddAddrs(utils.Slice(TestAddress(peer))); err != nil { + if err := r.AddAddrs(TestAddress(peer).NodeID, utils.Slice(TestAddress(peer))); err != nil { return fmt.Errorf("r.AddAddrs(): %w", err) } conn, err := listener.AcceptOrClose(ctx) @@ -721,10 +767,10 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { } addr := TestAddress(r) - tcpConn, err := x.dial(ctx, addr) - if err != nil { - return fmt.Errorf("dial(): %w", err) - } + utils.OrPanic(x.AddAddrs(addr.NodeID, utils.Slice(addr))) + addrs := utils.OrPanic1(x.peerManager.StartDial(ctx)) + utils.OrPanic(utils.TestDiff(utils.Slice(addr), addrs)) + tcpConn := utils.OrPanic1(x.dial(ctx, addrs)) s.SpawnBg(func() error { return utils.IgnoreAfterCancel(ctx, tcpConn.Run(ctx)) }) hConn, info, err := x.handshakeV2(ctx, tcpConn, 
utils.Some(addr)) if err != nil { @@ -791,7 +837,7 @@ func TestRouter_PeerDB(t *testing.T) { s.SpawnBg(func() error { return utils.IgnoreCancel(r2.Run(ctx)) }) t.Logf("wait for the second node to connect to first node and store its address in the peerdb") - utils.OrPanic(r2.AddAddrs(utils.Slice(addr))) + utils.OrPanic(r2.AddAddrs(info.NodeID, utils.Slice(addr))) for db, ctrl := range r2.peerDB.Lock() { if err := ctrl.WaitUntil(ctx, func() bool { for got := range db.All() { @@ -822,7 +868,7 @@ func TestRouter_PeerDB(t *testing.T) { t.Logf("wait for the second node to retrieve address of the first node from peerdb and connect to the first node") s.SpawnBg(func() error { return utils.IgnoreCancel(r2.Run(ctx)) }) if _, err := r2.peerManager.conns.Wait(ctx, func(conns ConnSet) bool { - _, ok := conns.Get(addr.NodeID) + _, ok := GetAny(conns, addr.NodeID) return ok }); err != nil { return err diff --git a/sei-tendermint/internal/p2p/routeroptions.go b/sei-tendermint/internal/p2p/routeroptions.go index ff168351e9..ad5dc0aa68 100644 --- a/sei-tendermint/internal/p2p/routeroptions.go +++ b/sei-tendermint/internal/p2p/routeroptions.go @@ -113,25 +113,15 @@ type RouterOptions struct { // consider private and never gossip. PrivatePeers []types.NodeID - // MaxPeers is the maximum number of peers to track address information about. - // When exceeded, unreachable peers will be deleted. - // Defaults to 128. - MaxPeers utils.Option[int] - - // MaxConnected is the maximum number of connected peers (inbound and outbound). + // MaxInbound is the maximum number of inbound connections. // Persistent and unconditional connections are not counted towards this limit. - // Defaults to 64. - MaxConnected utils.Option[int] - - // MaxOutboundConnections is the maximum number of outbound connections. - // Note that MaxConnected is still respected and the actual number of outbound connections - // is bounded by min(MaxConnected,MaxOutboundConnections) - // Defaults to 10. 
- MaxOutboundConnections utils.Option[int] + // Defaults to 40. + MaxInbound utils.Option[int] - // MaxConcurrentDials limits the number of concurrent outbound connection handshakes. - // Defaults to 10. - MaxConcurrentDials utils.Option[int] + // MaxOutbound is the maximum number of outbound connections. + // Persistent and unconditional connections are not counted towards this limit. + // Defaults to 20. + MaxOutbound utils.Option[int] // MaxConncurrentAccepts limites the number of concurrent inbound connection handshakes. // Defaults to 10. @@ -145,16 +135,9 @@ type RouterOptions struct { PeerStoreInterval utils.Option[time.Duration] } -func (o *RouterOptions) maxDials() int { return o.MaxConcurrentDials.Or(10) } -func (o *RouterOptions) maxAccepts() int { return o.MaxConcurrentAccepts.Or(10) } -func (o *RouterOptions) maxConns() int { return o.MaxConnected.Or(64) } -func (o *RouterOptions) maxOutboundConns() int { - return min(o.maxConns(), o.MaxOutboundConnections.Or(10)) -} - -func (o *RouterOptions) maxPeers() int { - return o.MaxPeers.Or(128) -} +func (o *RouterOptions) maxAccepts() int { return o.MaxConcurrentAccepts.Or(10) } +func (o *RouterOptions) maxOutbound() int { return o.MaxOutbound.Or(20) } +func (o *RouterOptions) maxInbound() int { return o.MaxInbound.Or(40) } func (o *RouterOptions) peerStoreInterval() time.Duration { return o.PeerStoreInterval.Or(10 * time.Second) diff --git a/sei-tendermint/internal/p2p/testonly.go b/sei-tendermint/internal/p2p/testonly.go index 9450ab3ccb..6d03cdf0ef 100644 --- a/sei-tendermint/internal/p2p/testonly.go +++ b/sei-tendermint/internal/p2p/testonly.go @@ -48,9 +48,9 @@ type TestNetworkOptions struct { } type TestNodeOptions struct { - MaxPeers utils.Option[int] MaxConnected utils.Option[int] PexOnHandshake bool + SelfAddress bool } func TestAddress(r *Router) NodeAddress { @@ -93,7 +93,7 @@ func (n *TestNetwork) ConnectCycle(ctx context.Context, t *testing.T) { nodes := n.Nodes() N := len(nodes) for i := range 
nodes { - err := nodes[i].Router.peerManager.AddAddrs(utils.Slice(nodes[(i+1)%len(nodes)].NodeAddress)) + err := nodes[i].Router.peerManager.PushPex(utils.Some(nodes[i].NodeID), utils.Slice(nodes[(i+1)%len(nodes)].NodeAddress)) require.NoError(t, err) } for i := range n.Nodes() { @@ -108,7 +108,7 @@ func (n *TestNetwork) Start(t *testing.T) { // Populate peer managers. for i, source := range nodes { for _, target := range nodes[i+1:] { // nodes 0: + // MaxConnections defaults to 64 + maxConns := 64 + if cfg.P2P.MaxConnections > 0 { maxConns = utils.Clamp[int](cfg.P2P.MaxConnections) - default: - maxConns = 64 } - options.MaxConcurrentAccepts = utils.Some(maxConns) - options.MaxConnected = utils.Some(maxConns) - options.MaxPeers = utils.Some(2 * maxConns) + // MaxOutbound defaults to 20, unless MaxConnections<40, + // then it defaults to half of the maxConnections. + maxOutbound := min(20, (maxConns+1)/2) + if m := cfg.P2P.MaxOutboundConnections; m != nil { + maxOutbound = min(maxConns, utils.Clamp[int](*m)) + } + // MaxInbound is simply MaxConnections - MaxOutbound, + // because now we have totally separate inbound and outbound connection pools. + // TODO(gprusak): eventually we should migrate configs to specify + // MaxInbound and MaxOutbound explicitly, rather than doing the computation above. + maxInbound := maxConns - maxOutbound + options.MaxOutbound = utils.Some(maxOutbound) + options.MaxConcurrentAccepts = utils.Some(maxInbound) + options.MaxInbound = utils.Some(maxInbound) options.PrivatePeers = privatePeerIDs for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") { diff --git a/sei-tendermint/types/validation.go b/sei-tendermint/types/validation.go index 8c728c2369..c76627fe9e 100644 --- a/sei-tendermint/types/validation.go +++ b/sei-tendermint/types/validation.go @@ -240,7 +240,7 @@ func verifyCommitBatch( // attempt to verify the batch. 
if err := bv.Verify(); err != nil { - err := utils.ErrorAs[crypto.ErrBadSig](err).OrPanic() + err := utils.ErrorAs[crypto.ErrBadSig](err).OrPanic("unexpected error type") // go back from the batch index to the commit.Signatures index idx := batchSigIdxs[err.Idx] sig := commit.Signatures[idx]