From 5511bd8e85d7a817a836d1fe40d432f6342bed1d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 11 Dec 2017 13:41:09 -0500 Subject: [PATCH] p2p: exponential backoff on reconnect. closes #939 --- p2p/switch.go | 107 ++++++++++++++++++++++++++++++++------------- p2p/switch_test.go | 4 +- 2 files changed, 78 insertions(+), 33 deletions(-) diff --git a/p2p/switch.go b/p2p/switch.go index 4fdaec6ec..33ad28ea7 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -2,6 +2,7 @@ package p2p import ( "fmt" + "math" "math/rand" "net" "time" @@ -14,8 +15,19 @@ import ( ) const ( - reconnectAttempts = 30 - reconnectInterval = 3 * time.Second + // wait a random amount of time from this interval + // before dialing seeds or reconnecting to help prevent DoS + dialRandomizerIntervalMilliseconds = 3000 + + // repeatedly try to reconnect for a few minutes + // ie. 5 * 20 = 100s + reconnectAttempts = 20 + reconnectInterval = 5 * time.Second + + // then move into exponential backoff mode for ~8hrs + // ie. 3**0 + 3**1 + ... + 3**9 = 29524s ~= 8.2hrs + reconnectBackOffAttempts = 10 + reconnectBackOffBaseSeconds = 3 ) type Reactor interface { @@ -74,6 +86,8 @@ type Switch struct { filterConnByAddr func(net.Addr) error filterConnByPubKey func(crypto.PubKeyEd25519) error + + rng *rand.Rand // PRNG used to randomize dial times and orders } var ( @@ -92,6 +106,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { nodeInfo: nil, } + // Ensure we have a completely nondeterministic PRNG. cmn.RandInt64() draws + // from a seed that's initialized with OS entropy on process start. + sw.rng = rand.New(rand.NewSource(cmn.RandInt64())) + // TODO: collapse the peerConfig into the config ? sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond sw.peerConfig.MConfig.SendRate = config.SendRate @@ -317,15 +335,11 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { addrBook.Save() } - // Ensure we have a completely undeterministic PRNG. 
cmd.RandInt64() draws - // from a seed that's initialized with OS entropy on process start. - rng := rand.New(rand.NewSource(cmn.RandInt64())) - // permute the list, dial them in random order. - perm := rng.Perm(len(netAddrs)) + perm := sw.rng.Perm(len(netAddrs)) for i := 0; i < len(perm); i++ { go func(i int) { - time.Sleep(time.Duration(rng.Int63n(3000)) * time.Millisecond) + sw.randomSleep(0) j := perm[i] sw.dialSeed(netAddrs[j]) }(i) @@ -333,6 +347,12 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { return nil } +// randomSleep sleeps for interval plus a random number of ms on [0, dialRandomizerIntervalMilliseconds) +func (sw *Switch) randomSleep(interval time.Duration) { + r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond + time.Sleep(r + interval) +} + func (sw *Switch) dialSeed(addr *NetAddress) { peer, err := sw.DialPeerWithAddress(addr, true) if err != nil { @@ -413,34 +433,59 @@ func (sw *Switch) Peers() IPeerSet { // If the peer is persistent, it will attempt to reconnect. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { - addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { - go func() { - sw.Logger.Info("Reconnecting to peer", "peer", peer) - for i := 1; i < reconnectAttempts; i++ { - if !sw.IsRunning() { - return - } - - peer, err := sw.DialPeerWithAddress(addr, true) - if err != nil { - if i == reconnectAttempts { - sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "err", err) - return - } - sw.Logger.Info("Error reconnecting to peer. 
Trying again", "tries", i, "err", err) - time.Sleep(reconnectInterval) - continue - } - - sw.Logger.Info("Reconnected to peer", "peer", peer) - return - } - }() + go sw.reconnectToPeer(peer) + } +} + +// reconnectToPeer tries to reconnect to the peer, first repeatedly +// with a fixed interval, then with exponential backoff. +// If no success after all that, it stops trying, and leaves it +// to the PEX/Addrbook to find the peer again +func (sw *Switch) reconnectToPeer(peer Peer) { + addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) + start := time.Now() + sw.Logger.Info("Reconnecting to peer", "peer", peer) + for i := 0; i < reconnectAttempts; i++ { + if !sw.IsRunning() { + return + } + + peer, err := sw.DialPeerWithAddress(addr, true) + if err != nil { + sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + // sleep a set amount + sw.randomSleep(reconnectInterval) + continue + } else { + sw.Logger.Info("Reconnected to peer", "peer", peer) + return + } + } + + sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", + "peer", peer, "elapsed", time.Since(start)) + for i := 0; i < reconnectBackOffAttempts; i++ { + if !sw.IsRunning() { + return + } + + // sleep an exponentially increasing amount + sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) + sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) + peer, err := sw.DialPeerWithAddress(addr, true) + if err != nil { + sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + continue + } else { + sw.Logger.Info("Reconnected to peer", "peer", peer) + return + } } + sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) } // StopPeerGracefully disconnects from a peer gracefully. 
diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 3ce24d082..72807d36a 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -272,10 +272,10 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { // simulate failure by closing connection peer.CloseConn() - // TODO: actually detect the disconnection and wait for reconnect + // TODO: remove sleep, detect the disconnection, wait for reconnect npeers := sw.Peers().Size() for i := 0; i < 20; i++ { - time.Sleep(100 * time.Millisecond) + time.Sleep(250 * time.Millisecond) npeers = sw.Peers().Size() if npeers > 0 { break