From 5ef639fcbe6786fdeb99fd7754fa38a45655c84d Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Mon, 2 Apr 2018 10:15:02 +0530 Subject: [PATCH] p2p: persistent - redial if first dial fails Fixes #1401 --- CHANGELOG.md | 1 + p2p/CHANGELOG.md | 1 + p2p/peer.go | 5 ++++- p2p/switch.go | 35 +++++++++++++++++++++-------------- p2p/switch_test.go | 29 +++++++++++++++++++++++++++++ 5 files changed, 56 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aea9b5450..bad58654c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ IMPROVEMENTS: BUG FIXES: - Graceful handling/recovery for apps that have non-determinism or fail to halt - Graceful handling/recovery for violations of safety, or liveness +- Fix reconnect to persistent peer when first dial fails ## 0.17.1 (March 27th, 2018) diff --git a/p2p/CHANGELOG.md b/p2p/CHANGELOG.md index cae2f4c9f..260924a08 100644 --- a/p2p/CHANGELOG.md +++ b/p2p/CHANGELOG.md @@ -7,6 +7,7 @@ BREAKING CHANGES: - Remove or unexport methods from FuzzedConnection: Active, Mode, ProbDropRW, ProbDropConn, ProbSleep, MaxDelayMilliseconds, Fuzz - switch.AddPeerWithConnection is unexported and replaced by switch.AddPeer - switch.DialPeerWithAddress takes a bool, setting the peer as persistent or not +- PeerConfig requires a Dial function FEATURES: diff --git a/p2p/peer.go b/p2p/peer.go index 4af6eeaae..9e228efde 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -87,6 +87,8 @@ func newPeer(pc peerConn, nodeInfo NodeInfo, type PeerConfig struct { AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption + Dial func(addr *NetAddress, config *PeerConfig) (net.Conn, error) + // times are in seconds HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` DialTimeout time.Duration `mapstructure:"dial_timeout"` @@ -101,6 +103,7 @@ type PeerConfig struct { func DefaultPeerConfig() *PeerConfig { return &PeerConfig{ AuthEnc: true, + Dial: dial, HandshakeTimeout: 20, // * time.Second, DialTimeout: 3, // * 
time.Second, MConfig: tmconn.DefaultMConnConfig(), @@ -112,7 +115,7 @@ func DefaultPeerConfig() *PeerConfig { func newOutboundPeerConn(addr *NetAddress, config *PeerConfig, persistent bool, ourNodePrivKey crypto.PrivKey) (peerConn, error) { var pc peerConn - conn, err := dial(addr, config) + conn, err := config.Dial(addr, config) if err != nil { return pc, errors.Wrap(err, "Error creating peer") } diff --git a/p2p/switch.go b/p2p/switch.go index fa037a9b8..1a80d6435 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -56,6 +56,7 @@ type Switch struct { reactorsByCh map[byte]Reactor peers *PeerSet dialing *cmn.CMap + reconnecting *cmn.CMap nodeInfo NodeInfo // our node info nodeKey *NodeKey // our node privkey addrBook AddrBook @@ -75,6 +76,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: cmn.NewCMap(), + reconnecting: cmn.NewCMap(), } // Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws @@ -255,7 +257,7 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { - go sw.reconnectToPeer(peer) + go sw.reconnectToPeer(peer.NodeInfo().NetAddress()) } } @@ -274,24 +276,28 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { } } -// reconnectToPeer tries to reconnect to the peer, first repeatedly +// reconnectToPeer tries to reconnect to the addr, first repeatedly // with a fixed interval, then with exponential backoff. 
// If no success after all that, it stops trying, and leaves it -// to the PEX/Addrbook to find the peer again -func (sw *Switch) reconnectToPeer(peer Peer) { - // NOTE this will connect to the self reported address, - // not necessarily the original we dialed - netAddr := peer.NodeInfo().NetAddress() +// to the PEX/Addrbook to find the peer with the addr again +func (sw *Switch) reconnectToPeer(addr *NetAddress) { + if sw.reconnecting.Has(string(addr.ID)) { + return + } + + sw.reconnecting.Set(string(addr.ID), addr) + defer sw.reconnecting.Delete(string(addr.ID)) + start := time.Now() - sw.Logger.Info("Reconnecting to peer", "peer", peer) + sw.Logger.Info("Reconnecting to peer", "addr", addr) for i := 0; i < reconnectAttempts; i++ { if !sw.IsRunning() { return } - err := sw.DialPeerWithAddress(netAddr, true) + err := sw.DialPeerWithAddress(addr, true) if err != nil { - sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr) // sleep a set amount sw.randomSleep(reconnectInterval) continue @@ -301,7 +307,7 @@ func (sw *Switch) reconnectToPeer(peer Peer) { } sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", - "peer", peer, "elapsed", time.Since(start)) + "addr", addr, "elapsed", time.Since(start)) for i := 0; i < reconnectBackOffAttempts; i++ { if !sw.IsRunning() { return @@ -310,13 +316,13 @@ func (sw *Switch) reconnectToPeer(peer Peer) { // sleep an exponentially increasing amount sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) - err := sw.DialPeerWithAddress(netAddr, true) + err := sw.DialPeerWithAddress(addr, true) if err == nil { return // success } - sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + sw.Logger.Info("Error reconnecting to peer. 
Trying again", "tries", i, "err", err, "addr", addr) } - sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) + sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start)) } // SetAddrBook allows to set address book on Switch. @@ -470,6 +476,7 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig peerConn, err := newOutboundPeerConn(addr, config, persistent, sw.nodeKey.PrivKey) if err != nil { sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) + go sw.reconnectToPeer(addr) return err } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 06e8b642e..5663ccea7 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,6 +24,11 @@ var ( config *cfg.P2PConfig ) +// badDial returns an error for testing dial errors +func badDial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { + return nil, errors.New("dial err") +} + func init() { config = cfg.DefaultP2PConfig() config.PexReactor = true @@ -295,6 +301,29 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { } assert.NotZero(npeers) assert.False(peer.IsRunning()) + + // simulate another remote peer + rp = &remotePeer{PrivKey: crypto.GenPrivKeyEd25519().Wrap(), Config: DefaultPeerConfig()} + rp.Start() + defer rp.Stop() + + // simulate first time dial failure + peerConfig := DefaultPeerConfig() + peerConfig.Dial = badDial + err = sw.addOutboundPeerWithConfig(rp.Addr(), peerConfig, true) + require.NotNil(err) + + // DialPeerWithAddress - sw.peerConfig resets the dialer + + // TODO: same as above + for i := 0; i < 20; i++ { + time.Sleep(250 * time.Millisecond) + npeers = sw.Peers().Size() + if npeers > 1 { + break + } + } + assert.EqualValues(2, npeers) } func TestSwitchFullConnectivity(t *testing.T) {