diff --git a/pex_reactor.go b/pex_reactor.go index 8d49de992..56a1e3233 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -13,7 +13,7 @@ import ( const ( PexChannel = byte(0x00) - ensurePeersPeriodSeconds = 30 + defaultEnsurePeersPeriod = 30 * time.Second minNumOutboundPeers = 10 maxPexMessageSize = 1048576 // 1MB ) @@ -23,13 +23,15 @@ const ( type PEXReactor struct { BaseReactor - sw *Switch - book *AddrBook + sw *Switch + book *AddrBook + ensurePeersPeriod time.Duration } func NewPEXReactor(b *AddrBook) *PEXReactor { r := &PEXReactor{ - book: b, + book: b, + ensurePeersPeriod: defaultEnsurePeersPeriod, } r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) return r @@ -125,16 +127,22 @@ func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) { p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) } +// SetEnsurePeersPeriod sets period to ensure peers connected. +func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) { + r.ensurePeersPeriod = d +} + // Ensures that sufficient peers are connected. (continuous) func (r *PEXReactor) ensurePeersRoutine() { // Randomize when routine starts - time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond) + ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6 + time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond) // fire once immediately. r.ensurePeers() // fire periodically - timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second) + timer := NewRepeatTimer("pex", r.ensurePeersPeriod) FOR_LOOP: for { select { @@ -149,7 +157,7 @@ FOR_LOOP: timer.Stop() } -// Ensures that sufficient peers are connected. (once) +// ensurePeers ensures that sufficient peers are connected. (once) func (r *PEXReactor) ensurePeers() { numOutPeers, _, numDialing := r.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) diff --git a/pex_reactor_test.go b/pex_reactor_test.go new file mode 100644 index 000000000..3674d3f31 --- /dev/null +++ b/pex_reactor_test.go @@ -0,0 +1,97 @@ +package p2p + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + . "github.com/tendermint/go-common" +) + +func TestPEXReactorBasic(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + + assert.NotNil(t, r) + assert.NotEmpty(t, r.GetChannels()) +} + +func TestPEXReactorAddRemovePeer(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + + size := book.Size() + peer := createRandomPeer(false) + + r.AddPeer(peer) + assert.Equal(t, size+1, book.Size()) + + r.RemovePeer(peer, "peer not available") + assert.Equal(t, size, book.Size()) + + outboundPeer := createRandomPeer(true) + + r.AddPeer(outboundPeer) + assert.Equal(t, size, book.Size(), "size must not change") + + r.RemovePeer(outboundPeer, "peer not available") + assert.Equal(t, size, book.Size(), "size must not change") +} + +func TestPEXReactorRunning(t *testing.T) { + N := 3 + switches := make([]*Switch, N) + + book := NewAddrBook(createTempFileName("addrbook"), false) + + // create switches + for i := 0; i < N; i++ { + switches[i] = makeSwitch(i, "172.17.0.2", "123.123.123", func(i int, sw *Switch) *Switch { + r := NewPEXReactor(book) + r.SetEnsurePeersPeriod(250 * time.Millisecond) + sw.AddReactor("pex", r) + return sw + }) + } + + // fill the address book and add listeners + for _, s := range switches { + addr := NewNetAddressString(s.NodeInfo().ListenAddr) + book.AddAddress(addr, addr) + s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true)) + } + + // start switches + for _, s := range switches { + _, err := s.Start() // start switch and reactors + require.Nil(t, err) + } + + time.Sleep(1 * time.Second) + + // check peers are connected after some time + for _, s := range switches { + outbound, inbound, _ := s.NumPeers() + if outbound+inbound == 0 { + t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr) + } + } + + // stop them + for _, s := range switches { + s.Stop() + } +} + +func createRandomPeer(outbound bool) *Peer { + return &Peer{ + Key: RandStr(12), + NodeInfo: &NodeInfo{ + RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + }, + outbound: outbound, + } +} diff --git a/pex_reactor_tests.go b/pex_reactor_tests.go deleted file mode 100644 index 5825495f7..000000000 --- a/pex_reactor_tests.go +++ /dev/null @@ -1,50 +0,0 @@ -package p2p - -import ( - "math/rand" - "testing" - - "github.com/stretchr/testify/assert" - . "github.com/tendermint/go-common" -) - -func TestBasic(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) - r := NewPEXReactor(book) - - assert.NotNil(t, r) - assert.NotEmpty(t, r.GetChannels()) -} - -func TestAddRemovePeer(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) - r := NewPEXReactor(book) - - size := book.Size() - peer := createRandomPeer(false) - - r.AddPeer(peer) - assert.Equal(t, size+1, book.Size()) - - r.RemovePeer(peer, "peer not available") - assert.Equal(t, size, book.Size()) - - outboundPeer := createRandomPeer(true) - - r.AddPeer(outboundPeer) - assert.Equal(t, size, book.Size(), "size must not change") - - r.RemovePeer(outboundPeer, "peer not available") - assert.Equal(t, size, book.Size(), "size must not change") -} - -func createRandomPeer(outbound bool) *Peer { - return &Peer{ - Key: RandStr(12), - NodeInfo: &NodeInfo{ - RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), - ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), - }, - outbound: outbound, - } -} diff --git a/switch.go b/switch.go index 8ceb1ab70..6835e0a4b 100644 --- a/switch.go +++ b/switch.go @@ -531,10 +531,12 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S // TODO: let the config be passed in? s := initSwitch(i, NewSwitch(cfg.NewMapConfig(nil))) s.SetNodeInfo(&NodeInfo{ - PubKey: privKey.PubKey().(crypto.PubKeyEd25519), - Moniker: Fmt("switch%d", i), - Network: network, - Version: version, + PubKey: privKey.PubKey().(crypto.PubKeyEd25519), + Moniker: Fmt("switch%d", i), + Network: network, + Version: version, + RemoteAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023), + ListenAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023), }) s.SetNodePrivKey(privKey) return s