Browse Source

test ensurePeers goroutine

pull/456/head
Anton Kalyaev 8 years ago
committed by Anton Kaliaev
parent
commit
0109f1e524
No known key found for this signature in database GPG Key ID: 7B6881D965918214
4 changed files with 118 additions and 61 deletions
  1. +15
    -7
      pex_reactor.go
  2. +97
    -0
      pex_reactor_test.go
  3. +0
    -50
      pex_reactor_tests.go
  4. +6
    -4
      switch.go

+ 15
- 7
pex_reactor.go View File

@ -13,7 +13,7 @@ import (
const (
PexChannel = byte(0x00)
ensurePeersPeriodSeconds = 30
defaultEnsurePeersPeriod = 30 * time.Second
minNumOutboundPeers = 10
maxPexMessageSize = 1048576 // 1MB
)
@ -23,13 +23,15 @@ const (
type PEXReactor struct {
BaseReactor
sw *Switch
book *AddrBook
sw *Switch
book *AddrBook
ensurePeersPeriod time.Duration
}
func NewPEXReactor(b *AddrBook) *PEXReactor {
r := &PEXReactor{
book: b,
book: b,
ensurePeersPeriod: defaultEnsurePeersPeriod,
}
r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r)
return r
@ -125,16 +127,22 @@ func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
}
// SetEnsurePeersPeriod sets period to ensure peers connected.
func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
r.ensurePeersPeriod = d
}
// Ensures that sufficient peers are connected. (continuous)
func (r *PEXReactor) ensurePeersRoutine() {
// Randomize when routine starts
time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond)
ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
// fire once immediately.
r.ensurePeers()
// fire periodically
timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second)
timer := NewRepeatTimer("pex", r.ensurePeersPeriod)
FOR_LOOP:
for {
select {
@ -149,7 +157,7 @@ FOR_LOOP:
timer.Stop()
}
// Ensures that sufficient peers are connected. (once)
// ensurePeers ensures that sufficient peers are connected. (once)
func (r *PEXReactor) ensurePeers() {
numOutPeers, _, numDialing := r.Switch.NumPeers()
numToDial := minNumOutboundPeers - (numOutPeers + numDialing)


+ 97
- 0
pex_reactor_test.go View File

@ -0,0 +1,97 @@
package p2p
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
. "github.com/tendermint/go-common"
)
func TestPEXReactorBasic(t *testing.T) {
book := NewAddrBook(createTempFileName("addrbook"), true)
r := NewPEXReactor(book)
assert.NotNil(t, r)
assert.NotEmpty(t, r.GetChannels())
}
func TestPEXReactorAddRemovePeer(t *testing.T) {
book := NewAddrBook(createTempFileName("addrbook"), true)
r := NewPEXReactor(book)
size := book.Size()
peer := createRandomPeer(false)
r.AddPeer(peer)
assert.Equal(t, size+1, book.Size())
r.RemovePeer(peer, "peer not available")
assert.Equal(t, size, book.Size())
outboundPeer := createRandomPeer(true)
r.AddPeer(outboundPeer)
assert.Equal(t, size, book.Size(), "size must not change")
r.RemovePeer(outboundPeer, "peer not available")
assert.Equal(t, size, book.Size(), "size must not change")
}
func TestPEXReactorRunning(t *testing.T) {
N := 3
switches := make([]*Switch, N)
book := NewAddrBook(createTempFileName("addrbook"), false)
// create switches
for i := 0; i < N; i++ {
switches[i] = makeSwitch(i, "172.17.0.2", "123.123.123", func(i int, sw *Switch) *Switch {
r := NewPEXReactor(book)
r.SetEnsurePeersPeriod(250 * time.Millisecond)
sw.AddReactor("pex", r)
return sw
})
}
// fill the address book and add listeners
for _, s := range switches {
addr := NewNetAddressString(s.NodeInfo().ListenAddr)
book.AddAddress(addr, addr)
s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true))
}
// start switches
for _, s := range switches {
_, err := s.Start() // start switch and reactors
require.Nil(t, err)
}
time.Sleep(1 * time.Second)
// check peers are connected after some time
for _, s := range switches {
outbound, inbound, _ := s.NumPeers()
if outbound+inbound == 0 {
t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr)
}
}
// stop them
for _, s := range switches {
s.Stop()
}
}
func createRandomPeer(outbound bool) *Peer {
return &Peer{
Key: RandStr(12),
NodeInfo: &NodeInfo{
RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
},
outbound: outbound,
}
}

+ 0
- 50
pex_reactor_tests.go View File

@ -1,50 +0,0 @@
package p2p
import (
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
. "github.com/tendermint/go-common"
)
func TestBasic(t *testing.T) {
book := NewAddrBook(createTempFileName("addrbook"), true)
r := NewPEXReactor(book)
assert.NotNil(t, r)
assert.NotEmpty(t, r.GetChannels())
}
func TestAddRemovePeer(t *testing.T) {
book := NewAddrBook(createTempFileName("addrbook"), true)
r := NewPEXReactor(book)
size := book.Size()
peer := createRandomPeer(false)
r.AddPeer(peer)
assert.Equal(t, size+1, book.Size())
r.RemovePeer(peer, "peer not available")
assert.Equal(t, size, book.Size())
outboundPeer := createRandomPeer(true)
r.AddPeer(outboundPeer)
assert.Equal(t, size, book.Size(), "size must not change")
r.RemovePeer(outboundPeer, "peer not available")
assert.Equal(t, size, book.Size(), "size must not change")
}
func createRandomPeer(outbound bool) *Peer {
return &Peer{
Key: RandStr(12),
NodeInfo: &NodeInfo{
RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
},
outbound: outbound,
}
}

+ 6
- 4
switch.go View File

@ -531,10 +531,12 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S
// TODO: let the config be passed in?
s := initSwitch(i, NewSwitch(cfg.NewMapConfig(nil)))
s.SetNodeInfo(&NodeInfo{
PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
Moniker: Fmt("switch%d", i),
Network: network,
Version: version,
PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
Moniker: Fmt("switch%d", i),
Network: network,
Version: version,
RemoteAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023),
ListenAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023),
})
s.SetNodePrivKey(privKey)
return s


Loading…
Cancel
Save