From 8711af608fff1cfe8d1de1ba8e91d6b0a25a9211 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 3 May 2019 17:21:56 +0400 Subject: [PATCH] p2p: make persistent prop independent of conn direction (#3593) ## Description Previously only outbound peers can be persistent. Now, even if the peer is inbound, if it's marked as persistent, when/if conn is lost, Tendermint will try to reconnect. This part is actually optional and can be reverted. Plus, seed won't disconnect from inbound peer if it's marked as persistent. Fixes #3362 ## Commits * make persistent prop independent of conn direction Previously only outbound peers can be persistent. Now, even if the peer is inbound, if it's marked as persistent, when/if conn is lost, Tendermint will try to reconnect. Plus, seed won't disconnect from inbound peer if it's marked as persistent. Fixes #3362 * fix TestPEXReactorDialPeer test * add a changelog entry * update changelog * add two tests * reformat code * test UnsafeDialPeers and UnsafeDialSeeds * add TestSwitchDialPeersAsync * spec: update p2p/config spec * fixes after Ismail's review * Apply suggestions from code review Co-Authored-By: melekes * fix merge conflict * remove sleep from TestPEXReactorDoesNotDisconnectFromPersistentPeerInSeedMode We don't need it actually. --- CHANGELOG_PENDING.md | 5 ++ docs/spec/p2p/config.md | 12 ++-- node/node.go | 13 ++-- p2p/pex/pex_reactor.go | 5 +- p2p/pex/pex_reactor_test.go | 42 ++++++++++++- p2p/switch.go | 121 ++++++++++++++++++++++++++---------- p2p/switch_test.go | 111 +++++++++++++++++++++------------ p2p/transport.go | 28 +++++++-- rpc/core/net.go | 14 ++--- rpc/core/net_test.go | 73 ++++++++++++++++++++++ rpc/core/pipe.go | 3 +- 11 files changed, 323 insertions(+), 104 deletions(-) create mode 100644 rpc/core/net_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 8f162e072..635d0dd1a 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -23,11 +23,16 @@ - [cli] \#3585 Add option to not clear address book with unsafe reset (@climber73) - [cli] [\#3160](https://github.com/tendermint/tendermint/issues/3160) Add `-config=` option to `testnet` cmd (@gregdhill) - [cs/replay] \#3460 check appHash for each block +- [rpc] \#3362 `/dial_seeds` & `/dial_peers` return errors if addresses are incorrect (except when IP lookup fails) +- [node] \#3362 returns an error if `persistent_peers` list is invalid (except when IP lookup fails) - [p2p] \#3531 Terminate session on nonce wrapping (@climber73) ### BUG FIXES: - [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode to 16 (as a result, the node will stop retrying after a 35 hours time window) - [consensus] \#2723, \#3451 and \#3317 Fix non-deterministic tests +- [p2p] \#3362 make persistent prop independent of conn direction + * `Switch#DialPeersAsync` now only takes a list of peers + * `Switch#DialPeerWithAddress` now only takes an address - [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state (@james-ray) - [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie) diff --git a/docs/spec/p2p/config.md b/docs/spec/p2p/config.md index b31a36736..7ff2b5e8d 100644 --- a/docs/spec/p2p/config.md +++ b/docs/spec/p2p/config.md @@ -12,14 +12,14 @@ and upon incoming connection shares some peers and disconnects. ## Seeds -`--p2p.seeds “1.2.3.4:26656,2.3.4.5:4444”` +`--p2p.seeds “id100000000000000000000000000000000@1.2.3.4:26656,id200000000000000000000000000000000@2.3.4.5:4444”` Dials these seeds when we need more peers. They should return a list of peers and then disconnect. If we already have enough peers in the address book, we may never need to dial them. ## Persistent Peers -`--p2p.persistent_peers “1.2.3.4:26656,2.3.4.5:26656”` +`--p2p.persistent_peers “id100000000000000000000000000000000@1.2.3.4:26656,id200000000000000000000000000000000@2.3.4.5:26656”` Dial these peers and auto-redial them if the connection fails. These are intended to be trusted persistent peers that can help @@ -30,9 +30,9 @@ backoff and will give up after a day of trying to connect. the user will be warned that seeds may auto-close connections and that the node may not be able to keep the connection persistent. -## Private Persistent Peers +## Private Peers -`--p2p.private_persistent_peers “1.2.3.4:26656,2.3.4.5:26656”` +`--p2p.private_peer_ids “id100000000000000000000000000000000,id200000000000000000000000000000000”` -These are persistent peers that we do not add to the address book or -gossip to other peers. They stay private to us. +These are IDs of the peers that we do not add to the address book or gossip to +other peers. They stay private to us. diff --git a/node/node.go b/node/node.go index 36bdb9d57..5af080ad1 100644 --- a/node/node.go +++ b/node/node.go @@ -579,6 +579,11 @@ func NewNode(config *cfg.Config, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, ) + err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return nil, errors.Wrap(err, "could not add peers from persistent_peers field") + } + addrBook := createAddrBookAndSetOnSwitch(config, sw, p2pLogger) // Optionally, start the pex reactor @@ -675,12 +680,8 @@ func (n *Node) OnStart() error { } // Always connect to persistent peers - if n.config.P2P.PersistentPeers != "" { - err = n.sw.DialPeersAsync(n.addrBook, splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "), true) - if err != nil { - return err - } - } + // parsing errors are handled above by AddPersistentPeers + _ = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " ")) return nil } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index f24f44dd5..957dbf802 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -531,8 +531,7 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error { } } - err := r.Switch.DialPeerWithAddress(addr, false) - + err := r.Switch.DialPeerWithAddress(addr) if err != nil { if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok { return err @@ -584,7 +583,7 @@ func (r *PEXReactor) dialSeeds() { for _, i := range perm { // dial a random seed seedAddr := r.seedAddrs[i] - err := r.Switch.DialPeerWithAddress(seedAddr, false) + err := r.Switch.DialPeerWithAddress(seedAddr) if err == nil { return } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 6572a5f61..8c52a25ee 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -291,7 +291,8 @@ func TestPEXReactorSeedMode(t *testing.T) { require.Nil(t, err) defer os.RemoveAll(dir) // nolint: errcheck - pexR, book := createReactor(&PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 10 * time.Millisecond}) + pexRConfig := &PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 10 * time.Millisecond} + pexR, book := createReactor(pexRConfig) defer teardownReactor(book) sw := createSwitchAndAddReactors(pexR) @@ -315,13 +316,48 @@ func TestPEXReactorSeedMode(t *testing.T) { pexR.attemptDisconnects() assert.Equal(t, 1, sw.Peers().Size()) - time.Sleep(100 * time.Millisecond) + // sleep for SeedDisconnectWaitPeriod + time.Sleep(pexRConfig.SeedDisconnectWaitPeriod + 1*time.Millisecond) // 3. attemptDisconnects should disconnect after wait period pexR.attemptDisconnects() assert.Equal(t, 0, sw.Peers().Size()) } +func TestPEXReactorDoesNotDisconnectFromPersistentPeerInSeedMode(t *testing.T) { + // directory to store address books + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) // nolint: errcheck + + pexR, book := createReactor(&PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 1 * time.Millisecond}) + defer teardownReactor(book) + + sw := createSwitchAndAddReactors(pexR) + sw.SetAddrBook(book) + err = sw.Start() + require.NoError(t, err) + defer sw.Stop() + + assert.Zero(t, sw.Peers().Size()) + + peerSwitch := testCreateDefaultPeer(dir, 1) + require.NoError(t, peerSwitch.Start()) + defer peerSwitch.Stop() + + err = sw.AddPersistentPeers([]string{peerSwitch.NetAddress().String()}) + require.NoError(t, err) + + // 1. Test crawlPeers dials the peer + pexR.crawlPeers([]*p2p.NetAddress{peerSwitch.NetAddress()}) + assert.Equal(t, 1, sw.Peers().Size()) + assert.True(t, sw.Peers().Has(peerSwitch.NodeInfo().ID())) + + // 2. attemptDisconnects should not disconnect because the peer is persistent + pexR.attemptDisconnects() + assert.Equal(t, 1, sw.Peers().Size()) +} + func TestPEXReactorDialsPeerUpToMaxAttemptsInSeedMode(t *testing.T) { // directory to store address books dir, err := ioutil.TempDir("", "pex_reactor") @@ -398,7 +434,7 @@ func TestPEXReactorSeedModeFlushStop(t *testing.T) { reactor := switches[0].Reactors()["pex"].(*PEXReactor) peerID := switches[1].NodeInfo().ID() - err = switches[1].DialPeerWithAddress(switches[0].NetAddress(), false) + err = switches[1].DialPeerWithAddress(switches[0].NetAddress()) assert.NoError(t, err) // sleep up to a second while waiting for the peer to send us a message. diff --git a/p2p/switch.go b/p2p/switch.go index a566e6029..ccb7119d2 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -77,6 +77,8 @@ type Switch struct { nodeInfo NodeInfo // our node info nodeKey *NodeKey // our node privkey addrBook AddrBook + // peers addresses with whom we'll maintain constant connection + persistentPeersAddrs []*NetAddress transport Transport @@ -104,16 +106,17 @@ func NewSwitch( options ...SwitchOption, ) *Switch { sw := &Switch{ - config: cfg, - reactors: make(map[string]Reactor), - chDescs: make([]*conn.ChannelDescriptor, 0), - reactorsByCh: make(map[byte]Reactor), - peers: NewPeerSet(), - dialing: cmn.NewCMap(), - reconnecting: cmn.NewCMap(), - metrics: NopMetrics(), - transport: transport, - filterTimeout: defaultFilterTimeout, + config: cfg, + reactors: make(map[string]Reactor), + chDescs: make([]*conn.ChannelDescriptor, 0), + reactorsByCh: make(map[byte]Reactor), + peers: NewPeerSet(), + dialing: cmn.NewCMap(), + reconnecting: cmn.NewCMap(), + metrics: NopMetrics(), + transport: transport, + filterTimeout: defaultFilterTimeout, + persistentPeersAddrs: make([]*NetAddress, 0), } // Ensure we have a completely undeterministic PRNG. @@ -297,7 +300,19 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { - go sw.reconnectToPeer(peer.SocketAddr()) + var addr *NetAddress + if peer.IsOutbound() { // socket address for outbound peers + addr = peer.SocketAddr() + } else { // self-reported address for inbound peers + var err error + addr, err = peer.NodeInfo().NetAddress() + if err != nil { + sw.Logger.Error("Wanted to reconnect to inbound peer, but self-reported address is wrong", + "peer", peer, "addr", addr, "err", err) + return + } + } + go sw.reconnectToPeer(addr) } } @@ -341,7 +356,7 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) { return } - err := sw.DialPeerWithAddress(addr, true) + err := sw.DialPeerWithAddress(addr) if err == nil { return // success } else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok { @@ -365,7 +380,7 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) { sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) - err := sw.DialPeerWithAddress(addr, true) + err := sw.DialPeerWithAddress(addr) if err == nil { return // success } else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok { @@ -401,28 +416,41 @@ func isPrivateAddr(err error) bool { return ok && te.PrivateAddr() } -// DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent). +// DialPeersAsync dials a list of peers asynchronously in random order. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources). -// TODO: remove addrBook arg since it's now set on the switch -func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error { +// It ignores ErrNetAddressLookup. However, if there are other errors, first +// encounter is returned. +// Nop if there are no peers. +func (sw *Switch) DialPeersAsync(peers []string) error { netAddrs, errs := NewNetAddressStrings(peers) - // only log errors, dial correct addresses + // report all the errors for _, err := range errs { sw.Logger.Error("Error in peer's address", "err", err) } + // return first non-ErrNetAddressLookup error + for _, err := range errs { + if _, ok := err.(ErrNetAddressLookup); ok { + continue + } + return err + } + sw.dialPeersAsync(netAddrs) + return nil +} +func (sw *Switch) dialPeersAsync(netAddrs []*NetAddress) { ourAddr := sw.NetAddress() // TODO: this code feels like it's in the wrong place. // The integration tests depend on the addrBook being saved // right away but maybe we can change that. Recall that // the addrBook is only written to disk every 2min - if addrBook != nil { + if sw.addrBook != nil { // add peers to `addrBook` for _, netAddr := range netAddrs { // do not add our address or ID if !netAddr.Same(ourAddr) { - if err := addrBook.AddAddress(netAddr, ourAddr); err != nil { + if err := sw.addrBook.AddAddress(netAddr, ourAddr); err != nil { if isPrivateAddr(err) { sw.Logger.Debug("Won't add peer's address to addrbook", "err", err) } else { @@ -433,7 +461,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b } // Persist some peers to disk right away. // NOTE: integration tests depend on this - addrBook.Save() + sw.addrBook.Save() } // permute the list, dial them in random order. @@ -450,7 +478,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b sw.randomSleep(0) - err := sw.DialPeerWithAddress(addr, persistent) + err := sw.DialPeerWithAddress(addr) if err != nil { switch err.(type) { case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID, ErrCurrentlyDialingOrExistingAddress: @@ -461,16 +489,13 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b } }(i) } - return nil } // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects // and authenticates successfully. -// If `persistent == true`, the switch will always try to reconnect to this -// peer if the connection ever fails. // If we're currently dialing this address or it belongs to an existing peer, // ErrCurrentlyDialingOrExistingAddress is returned. -func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error { +func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { if sw.IsDialingOrExistingAddress(addr) { return ErrCurrentlyDialingOrExistingAddress{addr.String()} } @@ -478,7 +503,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error { sw.dialing.Set(string(addr.ID), addr) defer sw.dialing.Delete(string(addr.ID)) - return sw.addOutboundPeerWithConfig(addr, sw.config, persistent) + return sw.addOutboundPeerWithConfig(addr, sw.config) } // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] @@ -495,6 +520,38 @@ func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool { (!sw.config.AllowDuplicateIP && sw.peers.HasIP(addr.IP)) } +// AddPersistentPeers allows you to set persistent peers. It ignores +// ErrNetAddressLookup. However, if there are other errors, first encounter is +// returned. +func (sw *Switch) AddPersistentPeers(addrs []string) error { + sw.Logger.Info("Adding persistent peers", "addrs", addrs) + netAddrs, errs := NewNetAddressStrings(addrs) + // report all the errors + for _, err := range errs { + sw.Logger.Error("Error in peer's address", "err", err) + } + // return first non-ErrNetAddressLookup error + for _, err := range errs { + if _, ok := err.(ErrNetAddressLookup); ok { + continue + } + return err + } + sw.persistentPeersAddrs = netAddrs + return nil +} + +func (sw *Switch) isPeerPersistentFn() func(*NetAddress) bool { + return func(na *NetAddress) bool { + for _, pa := range sw.persistentPeersAddrs { + if pa.Equals(na) { + return true + } + } + return false + } +} + func (sw *Switch) acceptRoutine() { for { p, err := sw.transport.Accept(peerConfig{ @@ -502,6 +559,7 @@ func (sw *Switch) acceptRoutine() { onPeerError: sw.StopPeerForError, reactorsByCh: sw.reactorsByCh, metrics: sw.metrics, + isPersistent: sw.isPeerPersistentFn(), }) if err != nil { switch err := err.(type) { @@ -581,13 +639,12 @@ func (sw *Switch) acceptRoutine() { // dial the peer; make secret connection; authenticate against the dialed ID; // add the peer. -// if dialing fails, start the reconnect loop. If handhsake fails, its over. -// If peer is started succesffuly, reconnectLoop will start when -// StopPeerForError is called +// if dialing fails, start the reconnect loop. If handshake fails, it's over. +// If peer is started successfully, reconnectLoop will start when +// StopPeerForError is called. func (sw *Switch) addOutboundPeerWithConfig( addr *NetAddress, cfg *config.P2PConfig, - persistent bool, ) error { sw.Logger.Info("Dialing peer", "address", addr) @@ -600,7 +657,7 @@ func (sw *Switch) addOutboundPeerWithConfig( p, err := sw.transport.Dial(*addr, peerConfig{ chDescs: sw.chDescs, onPeerError: sw.StopPeerForError, - persistent: persistent, + isPersistent: sw.isPeerPersistentFn(), reactorsByCh: sw.reactorsByCh, metrics: sw.metrics, }) @@ -619,7 +676,7 @@ func (sw *Switch) addOutboundPeerWithConfig( // retry persistent peers after // any dial error besides IsSelf() - if persistent { + if sw.isPeerPersistentFn()(addr) { go sw.reconnectToPeer(addr) } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index bf105e0fa..6c7538b51 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -167,7 +167,7 @@ func TestSwitchFiltersOutItself(t *testing.T) { rp.Start() // addr should be rejected in addPeer based on the same ID - err := s1.DialPeerWithAddress(rp.Addr(), false) + err := s1.DialPeerWithAddress(rp.Addr()) if assert.Error(t, err) { if err, ok := err.(ErrRejected); ok { if !err.IsSelf() { @@ -212,6 +212,7 @@ func TestSwitchPeerFilter(t *testing.T) { p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ chDescs: sw.chDescs, onPeerError: sw.StopPeerForError, + isPersistent: sw.isPeerPersistentFn(), reactorsByCh: sw.reactorsByCh, }) if err != nil { @@ -256,6 +257,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ chDescs: sw.chDescs, onPeerError: sw.StopPeerForError, + isPersistent: sw.isPeerPersistentFn(), reactorsByCh: sw.reactorsByCh, }) if err != nil { @@ -281,6 +283,7 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ chDescs: sw.chDescs, onPeerError: sw.StopPeerForError, + isPersistent: sw.isPeerPersistentFn(), reactorsByCh: sw.reactorsByCh, }) if err != nil { @@ -326,6 +329,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ chDescs: sw.chDescs, onPeerError: sw.StopPeerForError, + isPersistent: sw.isPeerPersistentFn(), reactorsByCh: sw.reactorsByCh, }) require.Nil(err) @@ -390,49 +394,33 @@ func TestSwitchStopPeerForError(t *testing.T) { assert.EqualValues(t, 0, peersMetricValue()) } -func TestSwitchReconnectsToPersistentPeer(t *testing.T) { - assert, require := assert.New(t), require.New(t) - +func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) { sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() - if err != nil { - t.Error(err) - } + require.NoError(t, err) defer sw.Stop() - // simulate remote peer + // 1. simulate failure by closing connection rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} rp.Start() defer rp.Stop() - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - persistent: true, - reactorsByCh: sw.reactorsByCh, - }) - require.Nil(err) - - require.Nil(sw.addPeer(p)) + err = sw.AddPersistentPeers([]string{rp.Addr().String()}) + require.NoError(t, err) - require.NotNil(sw.Peers().Get(rp.ID())) + err = sw.DialPeerWithAddress(rp.Addr()) + require.Nil(t, err) + time.Sleep(50 * time.Millisecond) + require.NotNil(t, sw.Peers().Get(rp.ID())) - // simulate failure by closing connection + p := sw.Peers().List()[0] p.(*peer).CloseConn() - // TODO: remove sleep, detect the disconnection, wait for reconnect - npeers := sw.Peers().Size() - for i := 0; i < 20; i++ { - time.Sleep(250 * time.Millisecond) - npeers = sw.Peers().Size() - if npeers > 0 { - break - } - } - assert.NotZero(npeers) - assert.False(p.IsRunning()) + waitUntilSwitchHasAtLeastNPeers(sw, 1) + assert.False(t, p.IsRunning()) // old peer instance + assert.Equal(t, 1, sw.Peers().Size()) // new peer instance - // simulate another remote peer + // 2. simulate first time dial failure rp = &remotePeer{ PrivKey: ed25519.GenPrivKey(), Config: cfg, @@ -443,23 +431,68 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { rp.Start() defer rp.Stop() - // simulate first time dial failure conf := config.DefaultP2PConfig() conf.TestDialFail = true - err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true) - require.NotNil(err) - + err = sw.addOutboundPeerWithConfig(rp.Addr(), conf) + require.NotNil(t, err) // DialPeerWithAddres - sw.peerConfig resets the dialer + waitUntilSwitchHasAtLeastNPeers(sw, 2) + assert.Equal(t, 2, sw.Peers().Size()) +} + +func TestSwitchReconnectsToInboundPersistentPeer(t *testing.T) { + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + err := sw.Start() + require.NoError(t, err) + defer sw.Stop() + + // 1. simulate failure by closing the connection + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + defer rp.Stop() + + err = sw.AddPersistentPeers([]string{rp.Addr().String()}) + require.NoError(t, err) + + conn, err := rp.Dial(sw.NetAddress()) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) + require.NotNil(t, sw.Peers().Get(rp.ID())) + + conn.Close() + + waitUntilSwitchHasAtLeastNPeers(sw, 1) + assert.Equal(t, 1, sw.Peers().Size()) +} + +func TestSwitchDialPeersAsync(t *testing.T) { + if testing.Short() { + return + } + + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + err := sw.Start() + require.NoError(t, err) + defer sw.Stop() + + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + defer rp.Stop() + + err = sw.DialPeersAsync([]string{rp.Addr().String()}) + require.NoError(t, err) + time.Sleep(dialRandomizerIntervalMilliseconds * time.Millisecond) + require.NotNil(t, sw.Peers().Get(rp.ID())) +} - // TODO: same as above +func waitUntilSwitchHasAtLeastNPeers(sw *Switch, n int) { for i := 0; i < 20; i++ { time.Sleep(250 * time.Millisecond) - npeers = sw.Peers().Size() - if npeers > 1 { + has := sw.Peers().Size() + if has >= n { break } } - assert.EqualValues(2, npeers) } func TestSwitchFullConnectivity(t *testing.T) { diff --git a/p2p/transport.go b/p2p/transport.go index ebf77c9f4..8d6ea236e 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -37,11 +37,15 @@ type accept struct { // events. // TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour. type peerConfig struct { - chDescs []*conn.ChannelDescriptor - onPeerError func(Peer, interface{}) - outbound, persistent bool - reactorsByCh map[byte]Reactor - metrics *Metrics + chDescs []*conn.ChannelDescriptor + onPeerError func(Peer, interface{}) + outbound bool + // isPersistent allows you to set a function, which, given socket address + // (for outbound peers) OR self-reported address (for inbound peers), tells + // if the peer is persistent or not. + isPersistent func(*NetAddress) bool + reactorsByCh map[byte]Reactor + metrics *Metrics } // Transport emits and connects to Peers. The implementation of Peer is left to @@ -446,9 +450,21 @@ func (mt *MultiplexTransport) wrapPeer( socketAddr *NetAddress, ) Peer { + persistent := false + if cfg.isPersistent != nil { + if cfg.outbound { + persistent = cfg.isPersistent(socketAddr) + } else { + selfReportedAddr, err := ni.NetAddress() + if err == nil { + persistent = cfg.isPersistent(selfReportedAddr) + } + } + } + peerConn := newPeerConn( cfg.outbound, - cfg.persistent, + persistent, c, socketAddr, ) diff --git a/rpc/core/net.go b/rpc/core/net.go index 23bc40e88..48bf576a8 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -184,10 +184,8 @@ func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialS if len(seeds) == 0 { return &ctypes.ResultDialSeeds{}, errors.New("No seeds provided") } - // starts go routines to dial each peer after random delays - logger.Info("DialSeeds", "addrBook", addrBook, "seeds", seeds) - err := p2pPeers.DialPeersAsync(addrBook, seeds, false) - if err != nil { + logger.Info("DialSeeds", "seeds", seeds) + if err := p2pPeers.DialPeersAsync(seeds); err != nil { return &ctypes.ResultDialSeeds{}, err } return &ctypes.ResultDialSeeds{Log: "Dialing seeds in progress. See /net_info for details"}, nil @@ -197,12 +195,12 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*c if len(peers) == 0 { return &ctypes.ResultDialPeers{}, errors.New("No peers provided") } - // starts go routines to dial each peer after random delays - logger.Info("DialPeers", "addrBook", addrBook, "peers", peers, "persistent", persistent) - err := p2pPeers.DialPeersAsync(addrBook, peers, persistent) - if err != nil { + logger.Info("DialPeers", "peers", peers, "persistent", persistent) + if err := p2pPeers.AddPersistentPeers(peers); err != nil { return &ctypes.ResultDialPeers{}, err } + // parsing errors are handled above by AddPersistentPeers + _ = p2pPeers.DialPeersAsync(peers) return &ctypes.ResultDialPeers{Log: "Dialing peers in progress. See /net_info for details"}, nil } diff --git a/rpc/core/net_test.go b/rpc/core/net_test.go new file mode 100644 index 000000000..651e1f69d --- /dev/null +++ b/rpc/core/net_test.go @@ -0,0 +1,73 @@ +package core + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" +) + +func TestUnsafeDialSeeds(t *testing.T) { + sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", + func(n int, sw *p2p.Switch) *p2p.Switch { return sw }) + err := sw.Start() + require.NoError(t, err) + defer sw.Stop() + + logger = log.TestingLogger() + p2pPeers = sw + + testCases := []struct { + seeds []string + isErr bool + }{ + {[]string{}, true}, + {[]string{"d51fb70907db1c6c2d5237e78379b25cf1a37ab4@127.0.0.1:41198"}, false}, + {[]string{"127.0.0.1:41198"}, true}, + } + + for _, tc := range testCases { + res, err := UnsafeDialSeeds(&rpctypes.Context{}, tc.seeds) + if tc.isErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, res) + } + } +} + +func TestUnsafeDialPeers(t *testing.T) { + sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", + func(n int, sw *p2p.Switch) *p2p.Switch { return sw }) + err := sw.Start() + require.NoError(t, err) + defer sw.Stop() + + logger = log.TestingLogger() + p2pPeers = sw + + testCases := []struct { + peers []string + isErr bool + }{ + {[]string{}, true}, + {[]string{"d51fb70907db1c6c2d5237e78379b25cf1a37ab4@127.0.0.1:41198"}, false}, + {[]string{"127.0.0.1:41198"}, true}, + } + + for _, tc := range testCases { + res, err := UnsafeDialPeers(&rpctypes.Context{}, tc.peers, false) + if tc.isErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, res) + } + } +} diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index ad8afdefc..cefb0e371 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -44,7 +44,8 @@ type transport interface { } type peers interface { - DialPeersAsync(p2p.AddrBook, []string, bool) error + AddPersistentPeers([]string) error + DialPeersAsync([]string) error NumPeers() (outbound, inbound, dialig int) Peers() p2p.IPeerSet }