diff --git a/libs/strings/string.go b/libs/strings/string.go index 466759233..b09c00063 100644 --- a/libs/strings/string.go +++ b/libs/strings/string.go @@ -5,6 +5,29 @@ import ( "strings" ) +// SplitAndTrimEmpty slices s into all subslices separated by sep and returns a +// slice of the string s with all leading and trailing Unicode code points +// contained in cutset removed. If sep is empty, SplitAndTrim splits after each +// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of +// -1. also filter out empty strings, only return non-empty strings. +func SplitAndTrimEmpty(s, sep, cutset string) []string { + if s == "" { + return []string{} + } + + spl := strings.Split(s, sep) + nonEmptyStrings := make([]string, 0, len(spl)) + + for i := 0; i < len(spl); i++ { + element := strings.Trim(spl[i], cutset) + if element != "" { + nonEmptyStrings = append(nonEmptyStrings, element) + } + } + + return nonEmptyStrings +} + // StringInSlice returns true if a is found the list. func StringInSlice(a string, list []string) bool { for _, b := range list { diff --git a/libs/strings/string_test.go b/libs/strings/string_test.go index 1ec7b0d56..c56116393 100644 --- a/libs/strings/string_test.go +++ b/libs/strings/string_test.go @@ -4,15 +4,32 @@ import ( "testing" "github.com/stretchr/testify/require" - - "github.com/stretchr/testify/assert" ) +func TestSplitAndTrimEmpty(t *testing.T) { + testCases := []struct { + s string + sep string + cutset string + expected []string + }{ + {"a,b,c", ",", " ", []string{"a", "b", "c"}}, + {" a , b , c ", ",", " ", []string{"a", "b", "c"}}, + {" a, b, c ", ",", " ", []string{"a", "b", "c"}}, + {" a, ", ",", " ", []string{"a"}}, + {" ", ",", " ", []string{}}, + } + + for _, tc := range testCases { + require.Equal(t, tc.expected, SplitAndTrimEmpty(tc.s, tc.sep, tc.cutset), "%s", tc.s) + } +} + func TestStringInSlice(t *testing.T) { - assert.True(t, StringInSlice("a", []string{"a", "b", "c"})) - assert.False(t, StringInSlice("d", []string{"a", "b", "c"})) - assert.True(t, StringInSlice("", []string{""})) - assert.False(t, StringInSlice("", []string{})) + require.True(t, StringInSlice("a", []string{"a", "b", "c"})) + require.False(t, StringInSlice("d", []string{"a", "b", "c"})) + require.True(t, StringInSlice("", []string{""})) + require.False(t, StringInSlice("", []string{})) } func TestIsASCIIText(t *testing.T) { @@ -20,22 +37,22 @@ func TestIsASCIIText(t *testing.T) { "", "\xC2", "\xC2\xA2", "\xFF", "\x80", "\xF0", "\n", "\t", } for _, v := range notASCIIText { - assert.False(t, IsASCIIText(v), "%q is not ascii-text", v) + require.False(t, IsASCIIText(v), "%q is not ascii-text", v) } asciiText := []string{ " ", ".", "x", "$", "_", "abcdefg;", "-", "0x00", "0", "123", } for _, v := range asciiText { - assert.True(t, IsASCIIText(v), "%q is ascii-text", v) + require.True(t, IsASCIIText(v), "%q is ascii-text", v) } } func TestASCIITrim(t *testing.T) { - assert.Equal(t, ASCIITrim(" "), "") - assert.Equal(t, ASCIITrim(" a"), "a") - assert.Equal(t, ASCIITrim("a "), "a") - assert.Equal(t, ASCIITrim(" a "), "a") - assert.Panics(t, func() { ASCIITrim("\xC2\xA2") }) + require.Equal(t, ASCIITrim(" "), "") + require.Equal(t, ASCIITrim(" a"), "a") + require.Equal(t, ASCIITrim("a "), "a") + require.Equal(t, ASCIITrim(" a "), "a") + require.Panics(t, func() { ASCIITrim("\xC2\xA2") }) } func TestStringSliceEqual(t *testing.T) { diff --git a/node/node.go b/node/node.go index 57416ca18..605578d67 100644 --- a/node/node.go +++ b/node/node.go @@ -29,6 +29,7 @@ import ( tmnet "github.com/tendermint/tendermint/libs/net" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/libs/strings" "github.com/tendermint/tendermint/light" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" @@ -574,7 +575,7 @@ func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, p2p.MConnTransportOptions{ MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + - len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), + len(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), ), }, ) @@ -584,8 +585,11 @@ func createPeerManager( config *cfg.Config, dbProvider DBProvider, p2pLogger log.Logger, - nodeID p2p.NodeID) (*p2p.PeerManager, error) { + nodeID p2p.NodeID, +) (*p2p.PeerManager, error) { + var maxConns uint16 + switch { case config.P2P.MaxConnections > 0: maxConns = config.P2P.MaxConnections @@ -607,6 +611,11 @@ func createPeerManager( maxConns = 64 } + privatePeerIDs := make(map[p2p.NodeID]struct{}) + for _, id := range strings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") { + privatePeerIDs[p2p.NodeID(id)] = struct{}{} + } + options := p2p.PeerManagerOptions{ MaxConnected: maxConns, MaxConnectedUpgrade: 4, @@ -615,10 +624,11 @@ func createPeerManager( MaxRetryTime: 8 * time.Hour, MaxRetryTimePersistent: 5 * time.Minute, RetryTimeJitter: 3 * time.Second, + PrivatePeers: privatePeerIDs, } peers := []p2p.NodeAddress{} - for _, p := range splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { + for _, p := range strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { address, err := p2p.ParseNodeAddress(p) if err != nil { return nil, fmt.Errorf("invalid peer address %q: %w", p, err) @@ -628,7 +638,7 @@ func createPeerManager( options.PersistentPeers = append(options.PersistentPeers, address.NodeID) } - for _, p := range splitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") { + for _, p := range strings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") { address, err := p2p.ParseNodeAddress(p) if err != nil { return nil, fmt.Errorf("invalid peer address %q: %w", p, err) @@ -640,6 +650,7 @@ func createPeerManager( if err != nil { return nil, err } + peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options) if err != nil { return nil, fmt.Errorf("failed to create peer manager: %w", err) @@ -797,7 +808,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, sw *p2p.Switch, logger log.Logger) *pex.Reactor { reactorConfig := &pex.ReactorConfig{ - Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), + Seeds: strings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "), SeedMode: config.Mode == cfg.ModeSeed, // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 // blocks assuming 10s blocks ~ 28 hours. @@ -918,12 +929,12 @@ func NewSeedNode(config *cfg.Config, nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, ) - err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) if err != nil { return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) } - err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) if err != nil { return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) } @@ -1204,12 +1215,12 @@ func NewNode(config *cfg.Config, stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, ) - err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) if err != nil { return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) } - err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) if err != nil { return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) } @@ -1305,7 +1316,7 @@ func (n *Node) OnStart() error { } // Add private IDs to addrbook to block those peers being added - n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) + n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) // Start the RPC server before the P2P server // so we can eg. receive txs for the first block @@ -1380,7 +1391,7 @@ func (n *Node) OnStart() error { } // Always connect to persistent peers - err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) + err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) if err != nil { return fmt.Errorf("could not dial peers from persistent-peers field: %w", err) } @@ -1530,7 +1541,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { return nil, err } - listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") + listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") if n.config.RPC.Unsafe { rpccore.AddUnsafeRoutes() diff --git a/node/node_test.go b/node/node_test.go index 21830804d..43bba0099 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -77,25 +77,6 @@ func TestNodeStartStop(t *testing.T) { } } -func TestSplitAndTrimEmpty(t *testing.T) { - testCases := []struct { - s string - sep string - cutset string - expected []string - }{ - {"a,b,c", ",", " ", []string{"a", "b", "c"}}, - {" a , b , c ", ",", " ", []string{"a", "b", "c"}}, - {" a, b, c ", ",", " ", []string{"a", "b", "c"}}, - {" a, ", ",", " ", []string{"a"}}, - {" ", ",", " ", []string{}}, - } - - for _, tc := range testCases { - assert.Equal(t, tc.expected, splitAndTrimEmpty(tc.s, tc.sep, tc.cutset), "%s", tc.s) - } -} - func TestNodeDelayedStart(t *testing.T) { config := cfg.ResetTestRoot("node_delayed_start_test") defer os.RemoveAll(config.RootDir) diff --git a/node/utils.go b/node/utils.go deleted file mode 100644 index a96dd457f..000000000 --- a/node/utils.go +++ /dev/null @@ -1,26 +0,0 @@ -package node - -import ( - "strings" -) - -// splitAndTrimEmpty slices s into all subslices separated by sep and returns a -// slice of the string s with all leading and trailing Unicode code points -// contained in cutset removed. If sep is empty, SplitAndTrim splits after each -// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of -// -1. also filter out empty strings, only return non-empty strings. -func splitAndTrimEmpty(s, sep, cutset string) []string { - if s == "" { - return []string{} - } - - spl := strings.Split(s, sep) - nonEmptyStrings := make([]string, 0, len(spl)) - for i := 0; i < len(spl); i++ { - element := strings.Trim(spl[i], cutset) - if element != "" { - nonEmptyStrings = append(nonEmptyStrings, element) - } - } - return nonEmptyStrings -} diff --git a/p2p/peermanager.go b/p2p/peermanager.go index cb6d9272a..613f23f93 100644 --- a/p2p/peermanager.go +++ b/p2p/peermanager.go @@ -149,6 +149,10 @@ type PeerManagerOptions struct { // for testing. A score of 0 is ignored. PeerScores map[NodeID]PeerScore + // PrivatePeerIDs defines a set of NodeID objects which the PEX reactor will + // consider private and never gossip. + PrivatePeers map[NodeID]struct{} + // persistentPeers provides fast PersistentPeers lookups. It is built // by optimize(). persistentPeers map[NodeID]bool @@ -161,6 +165,13 @@ func (o *PeerManagerOptions) Validate() error { return fmt.Errorf("invalid PersistentPeer ID %q: %w", id, err) } } + + for id := range o.PrivatePeers { + if err := id.Validate(); err != nil { + return fmt.Errorf("invalid private peer ID %q: %w", id, err) + } + } + if o.MaxConnected > 0 && len(o.PersistentPeers) > int(o.MaxConnected) { return fmt.Errorf("number of persistent peers %v can't exceed MaxConnected %v", len(o.PersistentPeers), o.MaxConnected) @@ -182,6 +193,7 @@ func (o *PeerManagerOptions) Validate() error { o.MinRetryTime, o.MaxRetryTime) } } + if o.MaxRetryTimePersistent > 0 { if o.MinRetryTime == 0 { return errors.New("can't set MaxRetryTimePersistent without MinRetryTime") @@ -285,6 +297,7 @@ func NewPeerManager(selfID NodeID, peerDB dbm.DB, options PeerManagerOptions) (* if err := options.Validate(); err != nil { return nil, err } + options.optimize() store, err := newPeerStore(peerDB) @@ -795,13 +808,19 @@ func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []NodeAddress { if peer.ID == peerID { continue } - for _, addressInfo := range peer.AddressInfo { + + for nodeAddr, addressInfo := range peer.AddressInfo { if len(addresses) >= int(limit) { return addresses } - addresses = append(addresses, addressInfo.Address) + + // only add non-private NodeIDs + if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok { + addresses = append(addresses, addressInfo.Address) + } } } + return addresses } diff --git a/p2p/pex/reactor.go b/p2p/pex/reactor.go index bcfe3b7f3..ef8fd2ec4 100644 --- a/p2p/pex/reactor.go +++ b/p2p/pex/reactor.go @@ -65,6 +65,7 @@ type ReactorV2 struct { availablePeers *clist.CList mtx sync.RWMutex + // requestsSent keeps track of which peers the PEX reactor has sent requests // to. This prevents the sending of spurious responses. // NOTE: If a node never responds, they will remain in this map until a @@ -83,6 +84,7 @@ type ReactorV2 struct { // extrapolate the size of the network newPeers uint32 totalPeers uint32 + // discoveryRatio is the inverse ratio of new peers to old peers squared. // This is multiplied by the minimum duration to calculate how long to wait // between each request. @@ -96,6 +98,7 @@ func NewReactorV2( pexCh *p2p.Channel, peerUpdates *p2p.PeerUpdates, ) *ReactorV2 { + r := &ReactorV2{ peerManager: peerManager, pexCh: pexCh, @@ -188,8 +191,8 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { switch msg := envelope.Message.(type) { case *protop2p.PexRequest: - // check if the peer hasn't sent a prior request too close to this one - // in time + // Check if the peer hasn't sent a prior request too close to this one + // in time. if err := r.markPeerRequest(envelope.From); err != nil { return err } @@ -304,15 +307,18 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { func (r *ReactorV2) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress { limit := len(addresses) pexAddresses := make([]protop2p.PexAddress, 0, limit) + for _, address := range addresses { ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout) endpoints, err := address.Resolve(ctx) r.Logger.Debug("resolved node address", "endpoints", endpoints) cancel() + if err != nil { r.Logger.Debug("failed to resolve address", "address", address, "err", err) continue } + for _, endpoint := range endpoints { r.Logger.Debug("checking endpint", "IP", endpoint.IP, "Port", endpoint.Port) if len(pexAddresses) >= limit { @@ -334,6 +340,7 @@ func (r *ReactorV2) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress { } } } + return pexAddresses }