diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index 95ad70fe7..3a3920486 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -139,6 +139,10 @@ func (a *addrBook) Wait() { a.wg.Wait() } +func (a *addrBook) FilePath() string { + return a.filePath +} + //------------------------------------------------------- // AddOurAddress one of our addresses. diff --git a/p2p/pex/addrbook_test.go b/p2p/pex/addrbook_test.go index 166d31847..4a8df7166 100644 --- a/p2p/pex/addrbook_test.go +++ b/p2p/pex/addrbook_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "math/rand" + "os" "testing" "github.com/stretchr/testify/assert" @@ -26,17 +27,24 @@ func createTempFileName(prefix string) string { return fname } +func deleteTempFile(fname string) { + err := os.Remove(fname) + if err != nil { + panic(err) + } +} + func TestAddrBookPickAddress(t *testing.T) { - assert := assert.New(t) fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) // 0 addresses book := NewAddrBook(fname, true) book.SetLogger(log.TestingLogger()) - assert.Zero(book.Size()) + assert.Zero(t, book.Size()) addr := book.PickAddress(50) - assert.Nil(addr, "expected no address") + assert.Nil(t, addr, "expected no address") randAddrs := randNetAddressPairs(t, 1) addrSrc := randAddrs[0] @@ -44,26 +52,27 @@ func TestAddrBookPickAddress(t *testing.T) { // pick an address when we only have new address addr = book.PickAddress(0) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") addr = book.PickAddress(50) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") addr = book.PickAddress(100) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") // pick an address when we only have old address book.MarkGood(addrSrc.addr) addr = book.PickAddress(0) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") addr = book.PickAddress(50) - assert.NotNil(addr, "expected an address") + assert.NotNil(t, addr, "expected an address") // in this case, nNew==0 but we biased 100% to new, so we return nil addr = book.PickAddress(100) - assert.Nil(addr, "did not expected an address") + assert.Nil(t, addr, "did not expected an address") } func TestAddrBookSaveLoad(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) // 0 addresses book := NewAddrBook(fname, true) @@ -95,6 +104,7 @@ func TestAddrBookSaveLoad(t *testing.T) { func TestAddrBookLookup(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) randAddrs := randNetAddressPairs(t, 100) @@ -115,8 +125,8 @@ func TestAddrBookLookup(t *testing.T) { } func TestAddrBookPromoteToOld(t *testing.T) { - assert := assert.New(t) fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) randAddrs := randNetAddressPairs(t, 100) @@ -147,11 +157,12 @@ func TestAddrBookPromoteToOld(t *testing.T) { t.Errorf("selection could not be bigger than the book") } - assert.Equal(book.Size(), 100, "expecting book size to be 100") + assert.Equal(t, book.Size(), 100, "expecting book size to be 100") } func TestAddrBookHandlesDuplicates(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) book := NewAddrBook(fname, true) book.SetLogger(log.TestingLogger()) @@ -202,6 +213,8 @@ func randIPv4Address(t *testing.T) *p2p.NetAddress { func TestAddrBookRemoveAddress(t *testing.T) { fname := createTempFileName("addrbook_test") + defer deleteTempFile(fname) + book := NewAddrBook(fname, true) book.SetLogger(log.TestingLogger()) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 746fb3b39..193efc88d 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -6,6 +6,7 @@ import ( "math/rand" "reflect" "sort" + "sync" "time" "github.com/pkg/errors" @@ -38,6 +39,8 @@ const ( defaultSeedDisconnectWaitPeriod = 2 * time.Minute // disconnect after this defaultCrawlPeerInterval = 2 * time.Minute // dont redial for this. TODO: back-off defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this + + maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h) ) // PEXReactor handles PEX (peer exchange) and ensures that an @@ -59,6 +62,8 @@ type PEXReactor struct { // maps to prevent abuse requestsSent *cmn.CMap // ID->struct{}: unanswered send requests lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us + + attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)} } // PEXReactorConfig holds reactor specific configuration data. @@ -71,6 +76,11 @@ type PEXReactorConfig struct { Seeds []string } +type _attemptsToDial struct { + number int + lastDialed time.Time +} + // NewPEXReactor creates new PEX reactor. func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { r := &PEXReactor{ @@ -339,19 +349,8 @@ func (r *PEXReactor) ensurePeers() { } // Dial picked addresses - for _, item := range toDial { - go func(picked *p2p.NetAddress) { - err := r.Switch.DialPeerWithAddress(picked, false) - if err != nil { - r.Logger.Error("Dialing failed", "err", err) - // TODO: detect more "bad peer" scenarios - if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { - r.book.MarkBad(picked) - } else { - r.book.MarkAttempt(picked) - } - } - }(item) + for _, addr := range toDial { + go r.dialPeer(addr) } // If we need more addresses, pick a random peer and ask for more. @@ -372,6 +371,48 @@ func (r *PEXReactor) ensurePeers() { } } +func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) { + var attempts int + var lastDialed time.Time + if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted { + attempts = lAttempts.(_attemptsToDial).number + lastDialed = lAttempts.(_attemptsToDial).lastDialed + } + + if attempts > maxAttemptsToDial { + r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts) + r.book.MarkBad(addr) + return + } + + // exponential backoff if it's not our first attempt to dial given address + if attempts > 0 { + jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) + backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) + sinceLastDialed := time.Since(lastDialed) + if sinceLastDialed < backoffDuration { + r.Logger.Debug("Too early to dial", "addr", addr, "backoff_duration", backoffDuration, "last_dialed", lastDialed, "time_since", sinceLastDialed) + return + } + } + + err := r.Switch.DialPeerWithAddress(addr, false) + if err != nil { + r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts) + // TODO: detect more "bad peer" scenarios + if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { + r.book.MarkBad(addr) + } else { + r.book.MarkAttempt(addr) + } + // record attempt + r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()}) + } else { + // cleanup any history + r.attemptsToDial.Delete(addr.DialString()) + } +} + // check seed addresses are well formed func (r *PEXReactor) checkSeeds() error { lSeeds := len(r.config.Seeds) @@ -409,6 +450,17 @@ func (r *PEXReactor) dialSeeds() { r.Switch.Logger.Error("Couldn't connect to any seeds") } +// AttemptsToDial returns the number of attempts to dial specific address. It +// returns 0 if never attempted or successfully connected. +func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int { + lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()) + if attempted { + return lAttempts.(_attemptsToDial).number + } else { + return 0 + } +} + //---------------------------------------------------------- // Explores the network searching for more peers. (continuous) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 41da867ad..f5d815037 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "testing" "time" @@ -12,12 +13,11 @@ import ( crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" - cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/conn" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" ) var ( @@ -30,49 +30,33 @@ func init() { } func TestPEXReactorBasic(t *testing.T) { - assert, require := assert.New(t), require.New(t) - - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) - - assert.NotNil(r) - assert.NotEmpty(r.GetChannels()) + assert.NotNil(t, r) + assert.NotEmpty(t, r.GetChannels()) } func TestPEXReactorAddRemovePeer(t *testing.T) { - assert, require := assert.New(t), require.New(t) - - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) size := book.Size() peer := p2p.CreateRandomPeer(false) r.AddPeer(peer) - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) r.RemovePeer(peer, "peer not available") - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) outboundPeer := p2p.CreateRandomPeer(true) r.AddPeer(outboundPeer) - assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book") + assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book") r.RemovePeer(outboundPeer, "peer not available") - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) } func TestPEXReactorRunning(t *testing.T) { @@ -82,7 +66,7 @@ func TestPEXReactorRunning(t *testing.T) { dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) + book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false) book.SetLogger(log.TestingLogger()) // create switches @@ -120,16 +104,8 @@ func TestPEXReactorRunning(t *testing.T) { } func TestPEXReactorReceive(t *testing.T) { - assert, require := assert.New(t), require.New(t) - - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - r.SetLogger(log.TestingLogger()) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) peer := p2p.CreateRandomPeer(false) @@ -140,98 +116,77 @@ func TestPEXReactorReceive(t *testing.T) { addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) r.Receive(PexChannel, peer, msg) - assert.Equal(size+1, book.Size()) + assert.Equal(t, size+1, book.Size()) msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) r.Receive(PexChannel, peer, msg) } func TestPEXReactorRequestMessageAbuse(t *testing.T) { - assert, require := assert.New(t), require.New(t) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) - sw.SetLogger(log.TestingLogger()) - sw.AddReactor("PEX", r) - r.SetSwitch(sw) - r.SetLogger(log.TestingLogger()) + sw := createSwitchAndAddReactors(r) peer := newMockPeer() p2p.AddPeerToSwitch(sw, peer) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, sw.Peers().Has(peer.ID())) id := string(peer.ID()) msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) // first time creates the entry r.Receive(PexChannel, peer, msg) - assert.True(r.lastReceivedRequests.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, r.lastReceivedRequests.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) // next time sets the last time value r.Receive(PexChannel, peer, msg) - assert.True(r.lastReceivedRequests.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, r.lastReceivedRequests.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) // third time is too many too soon - peer is removed r.Receive(PexChannel, peer, msg) - assert.False(r.lastReceivedRequests.Has(id)) - assert.False(sw.Peers().Has(peer.ID())) + assert.False(t, r.lastReceivedRequests.Has(id)) + assert.False(t, sw.Peers().Has(peer.ID())) } func TestPEXReactorAddrsMessageAbuse(t *testing.T) { - assert, require := assert.New(t), require.New(t) + r, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", true) - book.SetLogger(log.TestingLogger()) - - r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) - sw.SetLogger(log.TestingLogger()) - sw.AddReactor("PEX", r) - r.SetSwitch(sw) - r.SetLogger(log.TestingLogger()) + sw := createSwitchAndAddReactors(r) peer := newMockPeer() p2p.AddPeerToSwitch(sw, peer) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, sw.Peers().Has(peer.ID())) id := string(peer.ID()) // request addrs from the peer r.RequestAddrs(peer) - assert.True(r.requestsSent.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.True(t, r.requestsSent.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) // receive some addrs. should clear the request r.Receive(PexChannel, peer, msg) - assert.False(r.requestsSent.Has(id)) - assert.True(sw.Peers().Has(peer.ID())) + assert.False(t, r.requestsSent.Has(id)) + assert.True(t, sw.Peers().Has(peer.ID())) // receiving more addrs causes a disconnect r.Receive(PexChannel, peer, msg) - assert.False(sw.Peers().Has(peer.ID())) + assert.False(t, sw.Peers().Has(peer.ID())) } func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { - dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) + book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false) book.SetLogger(log.TestingLogger()) // 1. create seed @@ -288,22 +243,11 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { } func TestPEXReactorCrawlStatus(t *testing.T) { - assert, require := assert.New(t), require.New(t) + pexR, book := createReactor(&PEXReactorConfig{SeedMode: true}) + defer teardownReactor(book) - dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(err) - defer os.RemoveAll(dir) // nolint: errcheck - book := NewAddrBook(dir+"addrbook.json", false) - book.SetLogger(log.TestingLogger()) - - pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true}) // Seed/Crawler mode uses data from the Switch - p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { - pexR.SetLogger(log.TestingLogger()) - sw.SetLogger(log.TestingLogger().With("switch", i)) - sw.AddReactor("pex", pexR) - return sw - }) + _ = createSwitchAndAddReactors(pexR) // Create a peer, add it to the peer set and the addrbook. peer := p2p.CreateRandomPeer(false) @@ -319,11 +263,34 @@ func TestPEXReactorCrawlStatus(t *testing.T) { peerInfos := pexR.getPeersToCrawl() // Make sure it has the proper number of elements - assert.Equal(2, len(peerInfos)) + assert.Equal(t, 2, len(peerInfos)) // TODO: test } +func TestPEXReactorDialPeer(t *testing.T) { + pexR, book := createReactor(&PEXReactorConfig{}) + defer teardownReactor(book) + + _ = createSwitchAndAddReactors(pexR) + + peer := newMockPeer() + addr := peer.NodeInfo().NetAddress() + + assert.Equal(t, 0, pexR.AttemptsToDial(addr)) + + // 1st unsuccessful attempt + pexR.dialPeer(addr) + + assert.Equal(t, 1, pexR.AttemptsToDial(addr)) + + // 2nd unsuccessful attempt + pexR.dialPeer(addr) + + // must be skipped because it is too early + assert.Equal(t, 1, pexR.AttemptsToDial(addr)) +} + type mockPeer struct { *cmn.BaseService pubKey crypto.PubKey @@ -400,3 +367,33 @@ func assertPeersWithTimeout( } } } + +func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) { + dir, err := ioutil.TempDir("", "pex_reactor") + if err != nil { + panic(err) + } + book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true) + book.SetLogger(log.TestingLogger()) + + r = NewPEXReactor(book, &PEXReactorConfig{}) + r.SetLogger(log.TestingLogger()) + return +} + +func teardownReactor(book *addrBook) { + err := os.RemoveAll(filepath.Dir(book.FilePath())) + if err != nil { + panic(err) + } +} + +func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch { + sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + sw.SetLogger(log.TestingLogger()) + for _, r := range reactors { + sw.AddReactor(r.String(), r) + r.SetSwitch(sw) + } + return sw +} diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index ca75ad561..fe15cda21 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -254,10 +254,8 @@ func (c *WSClient) reconnect() error { c.mtx.Unlock() }() - // 1s == (1e9 ns) == (1 Billion ns) - billionNs := float64(time.Second.Nanoseconds()) for { - jitterSeconds := time.Duration(rand.Float64() * billionNs) + jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second) c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)