|
|
@ -2,6 +2,7 @@ package p2p |
|
|
|
|
|
|
|
import ( |
|
|
|
"fmt" |
|
|
|
"math" |
|
|
|
"math/rand" |
|
|
|
"net" |
|
|
|
"time" |
|
|
@ -14,8 +15,19 @@ import ( |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
reconnectAttempts = 30 |
|
|
|
reconnectInterval = 3 * time.Second |
|
|
|
// wait a random amount of time from this interval
|
|
|
|
// before dialing seeds or reconnecting to help prevent DoS
|
|
|
|
dialRandomizerIntervalMilliseconds = 3000 |
|
|
|
|
|
|
|
// repeatedly try to reconnect for a few minutes
|
|
|
|
// ie. 5 * 20 = 100s
|
|
|
|
reconnectAttempts = 20 |
|
|
|
reconnectInterval = 5 * time.Second |
|
|
|
|
|
|
|
// then move into exponential backoff mode for ~1day
|
|
|
|
// ie. 3**10 = 16hrs
|
|
|
|
reconnectBackOffAttempts = 10 |
|
|
|
reconnectBackOffBaseSeconds = 3 |
|
|
|
) |
|
|
|
|
|
|
|
type Reactor interface { |
|
|
@ -74,6 +86,8 @@ type Switch struct { |
|
|
|
|
|
|
|
filterConnByAddr func(net.Addr) error |
|
|
|
filterConnByPubKey func(crypto.PubKeyEd25519) error |
|
|
|
|
|
|
|
rng *rand.Rand // seed for randomizing dial times and orders
|
|
|
|
} |
|
|
|
|
|
|
|
var ( |
|
|
@ -92,6 +106,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { |
|
|
|
nodeInfo: nil, |
|
|
|
} |
|
|
|
|
|
|
|
// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
|
|
|
|
// from a seed that's initialized with OS entropy on process start.
|
|
|
|
sw.rng = rand.New(rand.NewSource(cmn.RandInt64())) |
|
|
|
|
|
|
|
// TODO: collapse the peerConfig into the config ?
|
|
|
|
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond |
|
|
|
sw.peerConfig.MConfig.SendRate = config.SendRate |
|
|
@ -317,15 +335,11 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { |
|
|
|
addrBook.Save() |
|
|
|
} |
|
|
|
|
|
|
|
// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
|
|
|
|
// from a seed that's initialized with OS entropy on process start.
|
|
|
|
rng := rand.New(rand.NewSource(cmn.RandInt64())) |
|
|
|
|
|
|
|
// permute the list, dial them in random order.
|
|
|
|
perm := rng.Perm(len(netAddrs)) |
|
|
|
perm := sw.rng.Perm(len(netAddrs)) |
|
|
|
for i := 0; i < len(perm); i++ { |
|
|
|
go func(i int) { |
|
|
|
time.Sleep(time.Duration(rng.Int63n(3000)) * time.Millisecond) |
|
|
|
sw.randomSleep(0) |
|
|
|
j := perm[i] |
|
|
|
sw.dialSeed(netAddrs[j]) |
|
|
|
}(i) |
|
|
@ -333,6 +347,12 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
|
|
|
|
func (sw *Switch) randomSleep(interval time.Duration) { |
|
|
|
r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond |
|
|
|
time.Sleep(r + interval) |
|
|
|
} |
|
|
|
|
|
|
|
func (sw *Switch) dialSeed(addr *NetAddress) { |
|
|
|
peer, err := sw.DialPeerWithAddress(addr, true) |
|
|
|
if err != nil { |
|
|
@ -413,34 +433,59 @@ func (sw *Switch) Peers() IPeerSet { |
|
|
|
// If the peer is persistent, it will attempt to reconnect.
|
|
|
|
// TODO: make record depending on reason.
|
|
|
|
func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { |
|
|
|
addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) |
|
|
|
sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) |
|
|
|
sw.stopAndRemovePeer(peer, reason) |
|
|
|
|
|
|
|
if peer.IsPersistent() { |
|
|
|
go func() { |
|
|
|
sw.Logger.Info("Reconnecting to peer", "peer", peer) |
|
|
|
for i := 1; i < reconnectAttempts; i++ { |
|
|
|
if !sw.IsRunning() { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
peer, err := sw.DialPeerWithAddress(addr, true) |
|
|
|
if err != nil { |
|
|
|
if i == reconnectAttempts { |
|
|
|
sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "err", err) |
|
|
|
return |
|
|
|
} |
|
|
|
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err) |
|
|
|
time.Sleep(reconnectInterval) |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
sw.Logger.Info("Reconnected to peer", "peer", peer) |
|
|
|
return |
|
|
|
} |
|
|
|
}() |
|
|
|
go sw.reconnectToPeer(peer) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// reconnectToPeer tries to reconnect to the peer, first repeatedly
|
|
|
|
// with a fixed interval, then with exponential backoff.
|
|
|
|
// If no success after all that, it stops trying, and leaves it
|
|
|
|
// to the PEX/Addrbook to find the peer again
|
|
|
|
func (sw *Switch) reconnectToPeer(peer Peer) { |
|
|
|
addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) |
|
|
|
start := time.Now() |
|
|
|
sw.Logger.Info("Reconnecting to peer", "peer", peer) |
|
|
|
for i := 0; i < reconnectAttempts; i++ { |
|
|
|
if !sw.IsRunning() { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
peer, err := sw.DialPeerWithAddress(addr, true) |
|
|
|
if err != nil { |
|
|
|
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) |
|
|
|
// sleep a set amount
|
|
|
|
sw.randomSleep(reconnectInterval) |
|
|
|
continue |
|
|
|
} else { |
|
|
|
sw.Logger.Info("Reconnected to peer", "peer", peer) |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", |
|
|
|
"peer", peer, "elapsed", time.Since(start)) |
|
|
|
for i := 0; i < reconnectBackOffAttempts; i++ { |
|
|
|
if !sw.IsRunning() { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// sleep an exponentially increasing amount
|
|
|
|
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) |
|
|
|
sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) |
|
|
|
peer, err := sw.DialPeerWithAddress(addr, true) |
|
|
|
if err != nil { |
|
|
|
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) |
|
|
|
continue |
|
|
|
} else { |
|
|
|
sw.Logger.Info("Reconnected to peer", "peer", peer) |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) |
|
|
|
} |
|
|
|
|
|
|
|
// StopPeerGracefully disconnects from a peer gracefully.
|
|
|
|