From 057cfb30f12137cfe65851be71beb50ae8ed617f Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Wed, 11 Jan 2017 15:03:14 +0400 Subject: [PATCH 01/22] remove unused error --- pex_reactor.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index 4ac9306cf..2cffb529c 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -2,7 +2,6 @@ package p2p import ( "bytes" - "errors" "fmt" "math/rand" "reflect" @@ -12,8 +11,6 @@ import ( wire "github.com/tendermint/go-wire" ) -var pexErrInvalidMessage = errors.New("Invalid PEX message") - const ( PexChannel = byte(0x00) ensurePeersPeriodSeconds = 30 From 26f661a5dd0f60c140e6169d7ab90ecefaf51629 Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Wed, 11 Jan 2017 15:03:29 +0400 Subject: [PATCH 02/22] prefer short names --- pex_reactor.go | 112 ++++++++++++++++++++++++------------------------- 1 file changed, 54 insertions(+), 58 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index 2cffb529c..01d0cbd18 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -18,10 +18,8 @@ const ( maxPexMessageSize = 1048576 // 1MB ) -/* -PEXReactor handles PEX (peer exchange) and ensures that an -adequate number of peers are connected to the switch. -*/ +// PEXReactor handles PEX (peer exchange) and ensures that an +// adequate number of peers are connected to the switch. type PEXReactor struct { BaseReactor @@ -29,28 +27,28 @@ type PEXReactor struct { book *AddrBook } -func NewPEXReactor(book *AddrBook) *PEXReactor { - pexR := &PEXReactor{ - book: book, +func NewPEXReactor(b *AddrBook) *PEXReactor { + r := &PEXReactor{ + book: b, } - pexR.BaseReactor = *NewBaseReactor(log, "PEXReactor", pexR) - return pexR + r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) + return r } -func (pexR *PEXReactor) OnStart() error { - pexR.BaseReactor.OnStart() - pexR.book.Start() - go pexR.ensurePeersRoutine() +func (r *PEXReactor) OnStart() error { + r.BaseReactor.OnStart() + r.book.Start() + go r.ensurePeersRoutine() return nil } -func (pexR *PEXReactor) OnStop() { - pexR.BaseReactor.OnStop() - pexR.book.Stop() +func (r *PEXReactor) OnStop() { + r.BaseReactor.OnStop() + r.book.Stop() } -// Implements Reactor -func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { +// GetChannels implements Reactor +func (r *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ ID: PexChannel, @@ -60,49 +58,45 @@ func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { } } -// Implements Reactor -func (pexR *PEXReactor) AddPeer(peer *Peer) { - // Add the peer to the address book - netAddr, err := NewNetAddressString(peer.ListenAddr) +// AddPeer implements Reactor by adding peer to the address book (if inbound) +// or by requesting more addresses (if outbound). +func (r *PEXReactor) AddPeer(p *Peer) { + netAddr, err := NewNetAddressString(p.ListenAddr) if err != nil { // this should never happen - log.Error("Error in AddPeer: invalid peer address", "addr", peer.ListenAddr, "error", err) + log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) return } - if peer.IsOutbound() { - if pexR.book.NeedMoreAddrs() { - pexR.RequestPEX(peer) + if p.IsOutbound() { // For outbound peers, the address is already in the books + if r.book.NeedMoreAddrs() { + r.RequestPEX(p) } - } else { - // For inbound connections, the peer is its own source - // (For outbound peers, the address is already in the books) - pexR.book.AddAddress(netAddr, netAddr) + } else { // For inbound connections, the peer is its own source + r.book.AddAddress(netAddr, netAddr) } } -// Implements Reactor -func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) { +// RemovePeer implements Reactor +func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { // TODO } -// Implements Reactor -// Handles incoming PEX messages. -func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { - - // decode message +// Receive implements Reactor by handling incoming PEX messages. +func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "error", err) return } + log.Notice("Received message", "msg", msg) switch msg := msg.(type) { case *pexRequestMessage: // src requested some peers. // TODO: prevent abuse. - pexR.SendAddrs(src, pexR.book.GetSelection()) + r.SendAddrs(src, r.book.GetSelection()) case *pexAddrsMessage: // We received some peer addresses from src. // TODO: prevent abuse. @@ -110,7 +104,7 @@ func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { srcAddr := src.Connection().RemoteAddress for _, addr := range msg.Addrs { if addr != nil { - pexR.book.AddAddress(addr, srcAddr) + r.book.AddAddress(addr, srcAddr) } } default: @@ -118,30 +112,32 @@ func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { } } -// Asks peer for more addresses. -func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}) +// RequestPEX asks peer for more addresses. +func (r *PEXReactor) RequestPEX(p *Peer) { + p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}) } -func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { - peer.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) +// SendAddrs sends addrs to the peer. +func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) { + p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) } // Ensures that sufficient peers are connected. (continuous) -func (pexR *PEXReactor) ensurePeersRoutine() { +func (r *PEXReactor) ensurePeersRoutine() { // Randomize when routine starts time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond) // fire once immediately. - pexR.ensurePeers() + r.ensurePeers() + // fire periodically timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second) FOR_LOOP: for { select { case <-timer.Ch: - pexR.ensurePeers() - case <-pexR.Quit: + r.ensurePeers() + case <-r.Quit: break FOR_LOOP } } @@ -151,8 +147,8 @@ FOR_LOOP: } // Ensures that sufficient peers are connected. (once) -func (pexR *PEXReactor) ensurePeers() { - numOutPeers, _, numDialing := pexR.Switch.NumPeers() +func (r *PEXReactor) ensurePeers() { + numOutPeers, _, numDialing := r.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) if numToDial <= 0 { @@ -168,13 +164,13 @@ func (pexR *PEXReactor) ensurePeers() { // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. for j := 0; j < 3; j++ { - try := pexR.book.PickAddress(newBias) + try := r.book.PickAddress(newBias) if try == nil { break } alreadySelected := toDial.Has(try.IP.String()) - alreadyDialing := pexR.Switch.IsDialing(try) - alreadyConnected := pexR.Switch.Peers().Has(try.IP.String()) + alreadyDialing := r.Switch.IsDialing(try) + alreadyConnected := r.Switch.Peers().Has(try.IP.String()) if alreadySelected || alreadyDialing || alreadyConnected { /* log.Info("Cannot dial address", "addr", try, @@ -198,20 +194,20 @@ func (pexR *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial.Values() { go func(picked *NetAddress) { - _, err := pexR.Switch.DialPeerWithAddress(picked, false) + _, err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { - pexR.book.MarkAttempt(picked) + r.book.MarkAttempt(picked) } }(item.(*NetAddress)) } // If we need more addresses, pick a random peer and ask for more. - if pexR.book.NeedMoreAddrs() { - if peers := pexR.Switch.Peers().List(); len(peers) > 0 { + if r.book.NeedMoreAddrs() { + if peers := r.Switch.Peers().List(); len(peers) > 0 { i := rand.Int() % len(peers) peer := peers[i] log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer) - pexR.RequestPEX(peer) + r.RequestPEX(peer) } } } From 3af7c677576f07e5610686e5e6a4a38e54448d6c Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Wed, 11 Jan 2017 17:17:09 +0400 Subject: [PATCH 03/22] add Dockerfile --- Dockerfile | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..3716185f2 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:latest + +RUN curl https://glide.sh/get | sh + +RUN mkdir -p /go/src/github.com/tendermint/go-p2p +WORKDIR /go/src/github.com/tendermint/go-p2p + +COPY glide.yaml /go/src/github.com/tendermint/go-p2p/ +COPY glide.lock /go/src/github.com/tendermint/go-p2p/ + +RUN glide install + +COPY . /go/src/github.com/tendermint/go-p2p From 37d5a2cf3ed9f1562bbab946e960bf54eb41198d Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Wed, 11 Jan 2017 19:23:31 +0400 Subject: [PATCH 04/22] implement RemovePeer for PEXReactor --- pex_reactor.go | 7 +++++-- pex_reactor_tests.go | 50 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 pex_reactor_tests.go diff --git a/pex_reactor.go b/pex_reactor.go index 01d0cbd18..8d49de992 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -73,13 +73,16 @@ func (r *PEXReactor) AddPeer(p *Peer) { r.RequestPEX(p) } } else { // For inbound connections, the peer is its own source - r.book.AddAddress(netAddr, netAddr) + addr := NewNetAddressString(p.ListenAddr) + r.book.AddAddress(addr, addr) } } // RemovePeer implements Reactor func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { - // TODO + addr := NewNetAddressString(p.ListenAddr) + // addr will be ejected from the book + r.book.MarkBad(addr) } // Receive implements Reactor by handling incoming PEX messages. diff --git a/pex_reactor_tests.go b/pex_reactor_tests.go new file mode 100644 index 000000000..5825495f7 --- /dev/null +++ b/pex_reactor_tests.go @@ -0,0 +1,50 @@ +package p2p + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + . "github.com/tendermint/go-common" +) + +func TestBasic(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + + assert.NotNil(t, r) + assert.NotEmpty(t, r.GetChannels()) +} + +func TestAddRemovePeer(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + + size := book.Size() + peer := createRandomPeer(false) + + r.AddPeer(peer) + assert.Equal(t, size+1, book.Size()) + + r.RemovePeer(peer, "peer not available") + assert.Equal(t, size, book.Size()) + + outboundPeer := createRandomPeer(true) + + r.AddPeer(outboundPeer) + assert.Equal(t, size, book.Size(), "size must not change") + + r.RemovePeer(outboundPeer, "peer not available") + assert.Equal(t, size, book.Size(), "size must not change") +} + +func createRandomPeer(outbound bool) *Peer { + return &Peer{ + Key: RandStr(12), + NodeInfo: &NodeInfo{ + RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + }, + outbound: outbound, + } +} From 0109f1e5245c0fc0f9121fde1d6b4aa8ccd4b079 Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Thu, 12 Jan 2017 00:17:15 +0400 Subject: [PATCH 05/22] test ensurePeers goroutine --- pex_reactor.go | 22 ++++++---- pex_reactor_test.go | 97 ++++++++++++++++++++++++++++++++++++++++++++ pex_reactor_tests.go | 50 ----------------------- switch.go | 10 +++-- 4 files changed, 118 insertions(+), 61 deletions(-) create mode 100644 pex_reactor_test.go delete mode 100644 pex_reactor_tests.go diff --git a/pex_reactor.go b/pex_reactor.go index 8d49de992..56a1e3233 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -13,7 +13,7 @@ import ( const ( PexChannel = byte(0x00) - ensurePeersPeriodSeconds = 30 + defaultEnsurePeersPeriod = 30 * time.Second minNumOutboundPeers = 10 maxPexMessageSize = 1048576 // 1MB ) @@ -23,13 +23,15 @@ const ( type PEXReactor struct { BaseReactor - sw *Switch - book *AddrBook + sw *Switch + book *AddrBook + ensurePeersPeriod time.Duration } func NewPEXReactor(b *AddrBook) *PEXReactor { r := &PEXReactor{ - book: b, + book: b, + ensurePeersPeriod: defaultEnsurePeersPeriod, } r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) return r @@ -125,16 +127,22 @@ func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) { p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) } +// SetEnsurePeersPeriod sets period to ensure peers connected. +func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) { + r.ensurePeersPeriod = d +} + // Ensures that sufficient peers are connected. (continuous) func (r *PEXReactor) ensurePeersRoutine() { // Randomize when routine starts - time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond) + ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6 + time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond) // fire once immediately. r.ensurePeers() // fire periodically - timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second) + timer := NewRepeatTimer("pex", r.ensurePeersPeriod) FOR_LOOP: for { select { @@ -149,7 +157,7 @@ FOR_LOOP: timer.Stop() } -// Ensures that sufficient peers are connected. (once) +// ensurePeers ensures that sufficient peers are connected. (once) func (r *PEXReactor) ensurePeers() { numOutPeers, _, numDialing := r.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) diff --git a/pex_reactor_test.go b/pex_reactor_test.go new file mode 100644 index 000000000..3674d3f31 --- /dev/null +++ b/pex_reactor_test.go @@ -0,0 +1,97 @@ +package p2p + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + . "github.com/tendermint/go-common" +) + +func TestPEXReactorBasic(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + + assert.NotNil(t, r) + assert.NotEmpty(t, r.GetChannels()) +} + +func TestPEXReactorAddRemovePeer(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + + size := book.Size() + peer := createRandomPeer(false) + + r.AddPeer(peer) + assert.Equal(t, size+1, book.Size()) + + r.RemovePeer(peer, "peer not available") + assert.Equal(t, size, book.Size()) + + outboundPeer := createRandomPeer(true) + + r.AddPeer(outboundPeer) + assert.Equal(t, size, book.Size(), "size must not change") + + r.RemovePeer(outboundPeer, "peer not available") + assert.Equal(t, size, book.Size(), "size must not change") +} + +func TestPEXReactorRunning(t *testing.T) { + N := 3 + switches := make([]*Switch, N) + + book := NewAddrBook(createTempFileName("addrbook"), false) + + // create switches + for i := 0; i < N; i++ { + switches[i] = makeSwitch(i, "172.17.0.2", "123.123.123", func(i int, sw *Switch) *Switch { + r := NewPEXReactor(book) + r.SetEnsurePeersPeriod(250 * time.Millisecond) + sw.AddReactor("pex", r) + return sw + }) + } + + // fill the address book and add listeners + for _, s := range switches { + addr := NewNetAddressString(s.NodeInfo().ListenAddr) + book.AddAddress(addr, addr) + s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true)) + } + + // start switches + for _, s := range switches { + _, err := s.Start() // start switch and reactors + require.Nil(t, err) + } + + time.Sleep(1 * time.Second) + + // check peers are connected after some time + for _, s := range switches { + outbound, inbound, _ := s.NumPeers() + if outbound+inbound == 0 { + t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr) + } + } + + // stop them + for _, s := range switches { + s.Stop() + } +} + +func createRandomPeer(outbound bool) *Peer { + return &Peer{ + Key: RandStr(12), + NodeInfo: &NodeInfo{ + RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + }, + outbound: outbound, + } +} diff --git a/pex_reactor_tests.go b/pex_reactor_tests.go deleted file mode 100644 index 5825495f7..000000000 --- a/pex_reactor_tests.go +++ /dev/null @@ -1,50 +0,0 @@ -package p2p - -import ( - "math/rand" - "testing" - - "github.com/stretchr/testify/assert" - . "github.com/tendermint/go-common" -) - -func TestBasic(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) - r := NewPEXReactor(book) - - assert.NotNil(t, r) - assert.NotEmpty(t, r.GetChannels()) -} - -func TestAddRemovePeer(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) - r := NewPEXReactor(book) - - size := book.Size() - peer := createRandomPeer(false) - - r.AddPeer(peer) - assert.Equal(t, size+1, book.Size()) - - r.RemovePeer(peer, "peer not available") - assert.Equal(t, size, book.Size()) - - outboundPeer := createRandomPeer(true) - - r.AddPeer(outboundPeer) - assert.Equal(t, size, book.Size(), "size must not change") - - r.RemovePeer(outboundPeer, "peer not available") - assert.Equal(t, size, book.Size(), "size must not change") -} - -func createRandomPeer(outbound bool) *Peer { - return &Peer{ - Key: RandStr(12), - NodeInfo: &NodeInfo{ - RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), - ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), - }, - outbound: outbound, - } -} diff --git a/switch.go b/switch.go index 8ceb1ab70..6835e0a4b 100644 --- a/switch.go +++ b/switch.go @@ -531,10 +531,12 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S // TODO: let the config be passed in? s := initSwitch(i, NewSwitch(cfg.NewMapConfig(nil))) s.SetNodeInfo(&NodeInfo{ - PubKey: privKey.PubKey().(crypto.PubKeyEd25519), - Moniker: Fmt("switch%d", i), - Network: network, - Version: version, + PubKey: privKey.PubKey().(crypto.PubKeyEd25519), + Moniker: Fmt("switch%d", i), + Network: network, + Version: version, + RemoteAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023), + ListenAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023), }) s.SetNodePrivKey(privKey) return s From 1a59b6a3b4e318fe059cfcafa1b7b5f6a029e090 Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Thu, 12 Jan 2017 13:26:29 +0400 Subject: [PATCH 06/22] replace repeate timer with simple ticker no need for repeate timer here (no need for goroutine safety) --- pex_reactor.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index 56a1e3233..5cea1cb3c 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -142,19 +142,18 @@ func (r *PEXReactor) ensurePeersRoutine() { r.ensurePeers() // fire periodically - timer := NewRepeatTimer("pex", r.ensurePeersPeriod) + ticker := time.NewTicker(r.ensurePeersPeriod) FOR_LOOP: for { select { - case <-timer.Ch: + case <-ticker.C: r.ensurePeers() case <-r.Quit: break FOR_LOOP } } - // Cleanup - timer.Stop() + ticker.Stop() } // ensurePeers ensures that sufficient peers are connected. (once) From 47df1fb7d4646a5a69399839317d08d2bf18f086 Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Thu, 12 Jan 2017 15:16:51 +0400 Subject: [PATCH 07/22] test PEXReactor#Receive --- pex_reactor_test.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/pex_reactor_test.go b/pex_reactor_test.go index 3674d3f31..762d96810 100644 --- a/pex_reactor_test.go +++ b/pex_reactor_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" . "github.com/tendermint/go-common" + wire "github.com/tendermint/go-wire" ) func TestPEXReactorBasic(t *testing.T) { @@ -85,13 +86,30 @@ func TestPEXReactorRunning(t *testing.T) { } } +func TestPEXReactorReceive(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + + peer := createRandomPeer(false) + + size := book.Size() + addrs := []*NetAddress{NewNetAddressString(peer.ListenAddr)} + msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) + r.Receive(PexChannel, peer, msg) + assert.Equal(t, size+1, book.Size()) + + msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) + r.Receive(PexChannel, peer, msg) +} + func createRandomPeer(outbound bool) *Peer { + addr := Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) return &Peer{ Key: RandStr(12), NodeInfo: &NodeInfo{ - RemoteAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), - ListenAddr: Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + ListenAddr: addr, }, outbound: outbound, + mconn: &MConnection{RemoteAddress: NewNetAddressString(addr)}, } } From 873d34157d51d09116b0c0a99557367aafbd1897 Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Thu, 12 Jan 2017 17:56:40 +0400 Subject: [PATCH 08/22] prevent abuse from peers --- pex_reactor.go | 59 ++++++++++++++++++++++++++++++++++++++------- pex_reactor_test.go | 15 ++++++++++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index 5cea1cb3c..3e03138c1 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -12,10 +12,15 @@ import ( ) const ( - PexChannel = byte(0x00) + PexChannel = byte(0x00) + // period to ensure peers connected defaultEnsurePeersPeriod = 30 * time.Second minNumOutboundPeers = 10 maxPexMessageSize = 1048576 // 1MB + + // maximum messages one peer can send to us during `msgCountByPeerFlushInterval` + defaultMaxMsgCountByPeer = 1000 + msgCountByPeerFlushInterval = 1 * time.Hour ) // PEXReactor handles PEX (peer exchange) and ensures that an @@ -26,12 +31,18 @@ type PEXReactor struct { sw *Switch book *AddrBook ensurePeersPeriod time.Duration + + // tracks message count by peer, so we can prevent abuse + msgCountByPeer map[string]uint16 + maxMsgCountByPeer uint16 } func NewPEXReactor(b *AddrBook) *PEXReactor { r := &PEXReactor{ book: b, ensurePeersPeriod: defaultEnsurePeersPeriod, + msgCountByPeer: make(map[string]uint16), + maxMsgCountByPeer: defaultMaxMsgCountByPeer, } r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) return r @@ -41,6 +52,7 @@ func (r *PEXReactor) OnStart() error { r.BaseReactor.OnStart() r.book.Start() go r.ensurePeersRoutine() + go r.flushMsgCountByPeer() return nil } @@ -89,24 +101,29 @@ func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { // Receive implements Reactor by handling incoming PEX messages. func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { + srcAddr := src.Connection().RemoteAddress + srcAddrStr := srcAddr.String() + r.msgCountByPeer[srcAddrStr]++ + if r.ReachedMaxMsgCountForPeer(srcAddrStr) { + log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr) + // TODO remove src from peers? + return + } + _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "error", err) return } - log.Notice("Received message", "msg", msg) switch msg := msg.(type) { case *pexRequestMessage: // src requested some peers. - // TODO: prevent abuse. r.SendAddrs(src, r.book.GetSelection()) case *pexAddrsMessage: // We received some peer addresses from src. - // TODO: prevent abuse. // (We don't want to get spammed with bad peers) - srcAddr := src.Connection().RemoteAddress for _, addr := range msg.Addrs { if addr != nil { r.book.AddAddress(addr, srcAddr) @@ -132,6 +149,17 @@ func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) { r.ensurePeersPeriod = d } +// SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'. +func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) { + r.maxMsgCountByPeer = v +} + +// ReachedMaxMsgCountForPeer returns true if we received too many +// messages from peer with address `addr`. +func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool { + return r.msgCountByPeer[addr] >= r.maxMsgCountByPeer +} + // Ensures that sufficient peers are connected. (continuous) func (r *PEXReactor) ensurePeersRoutine() { // Randomize when routine starts @@ -143,17 +171,16 @@ func (r *PEXReactor) ensurePeersRoutine() { // fire periodically ticker := time.NewTicker(r.ensurePeersPeriod) -FOR_LOOP: + for { select { case <-ticker.C: r.ensurePeers() case <-r.Quit: - break FOR_LOOP + ticker.Stop() + return } } - - ticker.Stop() } // ensurePeers ensures that sufficient peers are connected. (once) @@ -222,6 +249,20 @@ func (r *PEXReactor) ensurePeers() { } } +func (r *PEXReactor) flushMsgCountByPeer() { + ticker := time.NewTicker(msgCountByPeerFlushInterval) + + for { + select { + case <-ticker.C: + r.msgCountByPeer = make(map[string]uint16) + case <-r.Quit: + ticker.Stop() + return + } + } +} + //----------------------------------------------------------------------------- // Messages diff --git a/pex_reactor_test.go b/pex_reactor_test.go index 762d96810..7c3a2dae4 100644 --- a/pex_reactor_test.go +++ b/pex_reactor_test.go @@ -102,6 +102,21 @@ func TestPEXReactorReceive(t *testing.T) { r.Receive(PexChannel, peer, msg) } +func TestPEXReactorAbuseFromPeer(t *testing.T) { + book := NewAddrBook(createTempFileName("addrbook"), true) + r := NewPEXReactor(book) + r.SetMaxMsgCountByPeer(5) + + peer := createRandomPeer(false) + + msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) + for i := 0; i < 10; i++ { + r.Receive(PexChannel, peer, msg) + } + + assert.True(t, r.ReachedMaxMsgCountForPeer(peer.ListenAddr)) +} + func createRandomPeer(outbound bool) *Peer { addr := Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) return &Peer{ From 07e7b98c7034befb350c878bf68e6175d6a5658a Mon Sep 17 00:00:00 2001 From: Anton Kalyaev Date: Thu, 12 Jan 2017 22:28:40 +0400 Subject: [PATCH 09/22] improve ensurePeers routine optimizations: - if we move peer to the old bucket as soon as connected and pick only from new group, we can skip alreadyConnected check --- pex_reactor.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index 3e03138c1..a16823e33 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -191,30 +191,27 @@ func (r *PEXReactor) ensurePeers() { if numToDial <= 0 { return } - toDial := NewCMap() + + toDial := make(map[string]*NetAddress) // Try to pick numToDial addresses to dial. - // TODO: improve logic. for i := 0; i < numToDial; i++ { - newBias := MinInt(numOutPeers, 8)*10 + 10 var picked *NetAddress // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. for j := 0; j < 3; j++ { - try := r.book.PickAddress(newBias) + // NOTE always picking from the new group because old one stores already + // connected peers. + try := r.book.PickAddress(100) if try == nil { break } - alreadySelected := toDial.Has(try.IP.String()) + _, alreadySelected := toDial[try.IP.String()] alreadyDialing := r.Switch.IsDialing(try) - alreadyConnected := r.Switch.Peers().Has(try.IP.String()) - if alreadySelected || alreadyDialing || alreadyConnected { - /* - log.Info("Cannot dial address", "addr", try, - "alreadySelected", alreadySelected, - "alreadyDialing", alreadyDialing, - "alreadyConnected", alreadyConnected) - */ + if alreadySelected || alreadyDialing { + // log.Info("Cannot dial address", "addr", try, + // "alreadySelected", alreadySelected, + // "alreadyDialing", alreadyDialing) continue } else { log.Info("Will dial address", "addr", try) @@ -225,17 +222,20 @@ func (r *PEXReactor) ensurePeers() { if picked == nil { continue } - toDial.Set(picked.IP.String(), picked) + toDial[picked.IP.String()] = picked } // Dial picked addresses - for _, item := range toDial.Values() { + for _, item := range toDial { go func(picked *NetAddress) { _, err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { r.book.MarkAttempt(picked) + } else { + // move address to the old group + r.book.MarkGood(picked) } - }(item.(*NetAddress)) + }(item) } // If we need more addresses, pick a random peer and ask for more. From 5eeaffd38ee08ab2b765583df2682225b7f9fc0d Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 16 Jan 2017 20:31:50 +0400 Subject: [PATCH 10/22] do not create file, just temp dir --- pex_reactor_test.go | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/pex_reactor_test.go b/pex_reactor_test.go index 7c3a2dae4..67d123bec 100644 --- a/pex_reactor_test.go +++ b/pex_reactor_test.go @@ -1,18 +1,24 @@ package p2p import ( + "io/ioutil" "math/rand" + "os" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" wire "github.com/tendermint/go-wire" ) func TestPEXReactorBasic(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + r := NewPEXReactor(book) assert.NotNil(t, r) @@ -20,7 +26,11 @@ func TestPEXReactorBasic(t *testing.T) { } func TestPEXReactorAddRemovePeer(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + r := NewPEXReactor(book) size := book.Size() @@ -45,11 +55,14 @@ func TestPEXReactorRunning(t *testing.T) { N := 3 switches := make([]*Switch, N) - book := NewAddrBook(createTempFileName("addrbook"), false) + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", false) // create switches for i := 0; i < N; i++ { - switches[i] = makeSwitch(i, "172.17.0.2", "123.123.123", func(i int, sw *Switch) *Switch { + switches[i] = makeSwitch(i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { r := NewPEXReactor(book) r.SetEnsurePeersPeriod(250 * time.Millisecond) sw.AddReactor("pex", r) @@ -87,7 +100,11 @@ func TestPEXReactorRunning(t *testing.T) { } func TestPEXReactorReceive(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + r := NewPEXReactor(book) peer := createRandomPeer(false) @@ -103,7 +120,11 @@ func TestPEXReactorReceive(t *testing.T) { } func TestPEXReactorAbuseFromPeer(t *testing.T) { - book := NewAddrBook(createTempFileName("addrbook"), true) + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(t, err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + r := NewPEXReactor(book) r.SetMaxMsgCountByPeer(5) @@ -118,9 +139,9 @@ func TestPEXReactorAbuseFromPeer(t *testing.T) { } func createRandomPeer(outbound bool) *Peer { - addr := Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) + addr := cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) return &Peer{ - Key: RandStr(12), + Key: cmn.RandStr(12), NodeInfo: &NodeInfo{ ListenAddr: addr, }, From 590efc10404e3bf7795e382012d71b0005f44693 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 16 Jan 2017 23:36:10 +0400 Subject: [PATCH 11/22] call saveToFile OnStop This is better than waiting because while we wait, anything could happen (crash, timeout of the code who's using addrbook, ...). If we save immediately, we have much greater chances of success. --- addrbook.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/addrbook.go b/addrbook.go index ace0dba4a..7a65cb4be 100644 --- a/addrbook.go +++ b/addrbook.go @@ -15,7 +15,6 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-crypto" ) const ( @@ -86,7 +85,6 @@ type AddrBook struct { addrLookup map[string]*knownAddress // new & old addrNew []map[string]*knownAddress addrOld []map[string]*knownAddress - wg sync.WaitGroup nOld int nNew int } @@ -128,7 +126,6 @@ func (a *AddrBook) init() { func (a *AddrBook) OnStart() error { a.BaseService.OnStart() a.loadFromFile(a.filePath) - a.wg.Add(1) go a.saveRoutine() return nil } @@ -139,6 +136,7 @@ func (a *AddrBook) OnStop() { func (a *AddrBook) Wait() { a.wg.Wait() + a.saveToFile(a.filePath) } func (a *AddrBook) AddOurAddress(addr *NetAddress) { @@ -309,6 +307,8 @@ type addrBookJSON struct { } func (a *AddrBook) saveToFile(filePath string) { + log.Info("Saving AddrBook to file", "size", a.Size()) + // Compile Addrs addrs := []*knownAddress{} for _, ka := range a.addrLookup { @@ -386,7 +386,6 @@ out: for { select { case <-dumpAddressTicker.C: - log.Info("Saving AddrBook to file", "size", a.Size()) a.saveToFile(a.filePath) case <-a.Quit: break out @@ -394,7 +393,6 @@ out: } dumpAddressTicker.Stop() a.saveToFile(a.filePath) - a.wg.Done() log.Notice("Address handler done") } From 52d9cf080e08fd539a8aebbb82aacaa9a5459551 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 16 Jan 2017 23:57:07 +0400 Subject: [PATCH 12/22] make GoLint happy --- pex_reactor.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index a16823e33..89599f4b4 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -7,12 +7,13 @@ import ( "reflect" "time" - . "github.com/tendermint/go-common" wire "github.com/tendermint/go-wire" ) const ( + // PexChannel is a channel for PEX messages PexChannel = byte(0x00) + // period to ensure peers connected defaultEnsurePeersPeriod = 30 * time.Second minNumOutboundPeers = 10 @@ -25,6 +26,8 @@ const ( // PEXReactor handles PEX (peer exchange) and ensures that an // adequate number of peers are connected to the switch. +// +// It uses `AddrBook` (address book) to store `NetAddress`es of the peers. type PEXReactor struct { BaseReactor @@ -37,6 +40,7 @@ type PEXReactor struct { maxMsgCountByPeer uint16 } +// NewPEXReactor creates new PEX reactor. func NewPEXReactor(b *AddrBook) *PEXReactor { r := &PEXReactor{ book: b, @@ -48,6 +52,7 @@ func NewPEXReactor(b *AddrBook) *PEXReactor { return r } +// OnStart implements BaseService func (r *PEXReactor) OnStart() error { r.BaseReactor.OnStart() r.book.Start() @@ -56,6 +61,7 @@ func (r *PEXReactor) OnStart() error { return nil } +// OnStop implements BaseService func (r *PEXReactor) OnStop() { r.BaseReactor.OnStop() r.book.Stop() @@ -92,7 +98,7 @@ func (r *PEXReactor) AddPeer(p *Peer) { } } -// RemovePeer implements Reactor +// RemovePeer implements Reactor by removing peer from the address book. func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { addr := NewNetAddressString(p.ListenAddr) // addr will be ejected from the book @@ -130,7 +136,7 @@ func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { } } default: - log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } } @@ -271,6 +277,8 @@ const ( msgTypeAddrs = byte(0x02) ) +// PexMessage is a primary type for PEX messages. Underneath, it could contain +// either pexRequestMessage, or pexAddrsMessage messages. type PexMessage interface{} var _ = wire.RegisterInterface( @@ -279,6 +287,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs}, ) +// DecodeMessage implements interface registered above. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) { msgType = bz[0] n := new(int) From 324293f4cbbc108c66fa09bc24ac3e863f2b7671 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 17 Jan 2017 22:30:03 +0400 Subject: [PATCH 13/22] note on preventing abuse [ci skip] --- pex_reactor.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pex_reactor.go b/pex_reactor.go index 89599f4b4..3a3d66714 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -28,6 +28,18 @@ const ( // adequate number of peers are connected to the switch. // // It uses `AddrBook` (address book) to store `NetAddress`es of the peers. +// +// ## Preventing abuse +// +// For now, it just limits the number of messages from one peer to +// `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000 +// msg/hour). +// +// NOTE [2017-01-17]: +// Limiting is fine for now. Maybe down the road we want to keep track of the +// quality of peer messages so if peerA keeps telling us about peers we can't +// connect to then maybe we should care less about peerA. But I don't think +// that kind of complexity is priority right now. type PEXReactor struct { BaseReactor From cf18bf296628e6b32f6562c40671e0c331102b0e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 20 Jan 2017 15:27:33 +0400 Subject: [PATCH 14/22] add public RemoveAddress API after discussion with @ebuchman (https://github.com/tendermint/go-p2p/pull/10#discussion_r96471729) --- addrbook.go | 10 ++++++++-- addrbook_test.go | 19 +++++++++++++++++++ pex_reactor.go | 8 ++++++-- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/addrbook.go b/addrbook.go index 7a65cb4be..9dbbc2bf5 100644 --- a/addrbook.go +++ b/addrbook.go @@ -252,15 +252,21 @@ func (a *AddrBook) MarkAttempt(addr *NetAddress) { ka.markAttempt() } +// MarkBad currently just ejects the address. In the future, consider +// blacklisting. func (a *AddrBook) MarkBad(addr *NetAddress) { + a.RemoveAddress(addr) +} + +// RemoveAddress removes the address from the book. +func (a *AddrBook) RemoveAddress(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() ka := a.addrLookup[addr.String()] if ka == nil { return } - // We currently just eject the address. - // In the future, consider blacklisting. + log.Info("Remove address from book", "addr", addr) a.removeFromAllBuckets(ka) } diff --git a/addrbook_test.go b/addrbook_test.go index 7e8cb8d76..0f5ced5cf 100644 --- a/addrbook_test.go +++ b/addrbook_test.go @@ -9,6 +9,9 @@ import ( "github.com/stretchr/testify/assert" ) + "github.com/stretchr/testify/assert" +) + func createTempFileName(prefix string) string { f, err := ioutil.TempFile("", prefix) if err != nil { @@ -148,3 +151,19 @@ func randIPv4Address(t *testing.T) *NetAddress { } } } + +func TestAddrBookRemoveAddress(t *testing.T) { + fname := createTempFileName("addrbook_test") + book := NewAddrBook(fname, true) + + addr := randIPv4Address() + book.AddAddress(addr, addr) + assert.Equal(t, 1, book.Size()) + + book.RemoveAddress(addr) + assert.Equal(t, 0, book.Size()) + + nonExistingAddr := randIPv4Address() + book.RemoveAddress(nonExistingAddr) + assert.Equal(t, 0, book.Size()) +} diff --git a/pex_reactor.go b/pex_reactor.go index 3a3d66714..79a9200ff 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -111,10 +111,14 @@ func (r *PEXReactor) AddPeer(p *Peer) { } // RemovePeer implements Reactor by removing peer from the address book. +// +// The peer will be proposed to us by other peers (PexAddrsMessage) or himself +// and we will add him again upon successful connection. Note that other peers +// will remove him too. The peer will need to send first requests to others by +// himself (he will have an addrbook or the seeds). func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { addr := NewNetAddressString(p.ListenAddr) - // addr will be ejected from the book - r.book.MarkBad(addr) + r.book.RemoveAddress(addr) } // Receive implements Reactor by handling incoming PEX messages. From 0277e52bd5f27e4f2148353be6d674871a6ba41e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 14 Apr 2017 23:59:22 +0400 Subject: [PATCH 15/22] fix merge --- addrbook.go | 2 +- addrbook_test.go | 7 ++----- pex_reactor.go | 21 ++++++++++++--------- pex_reactor_test.go | 8 +++++--- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/addrbook.go b/addrbook.go index 9dbbc2bf5..c33b97aa8 100644 --- a/addrbook.go +++ b/addrbook.go @@ -15,6 +15,7 @@ import ( "time" . "github.com/tendermint/go-common" + crypto "github.com/tendermint/go-crypto" ) const ( @@ -135,7 +136,6 @@ func (a *AddrBook) OnStop() { } func (a *AddrBook) Wait() { - a.wg.Wait() a.saveToFile(a.filePath) } diff --git a/addrbook_test.go b/addrbook_test.go index 0f5ced5cf..16aea8ef9 100644 --- a/addrbook_test.go +++ b/addrbook_test.go @@ -9,9 +9,6 @@ import ( "github.com/stretchr/testify/assert" ) - "github.com/stretchr/testify/assert" -) - func createTempFileName(prefix string) string { f, err := ioutil.TempFile("", prefix) if err != nil { @@ -156,14 +153,14 @@ func TestAddrBookRemoveAddress(t *testing.T) { fname := createTempFileName("addrbook_test") book := NewAddrBook(fname, true) - addr := randIPv4Address() + addr := randIPv4Address(t) book.AddAddress(addr, addr) assert.Equal(t, 1, book.Size()) book.RemoveAddress(addr) assert.Equal(t, 0, book.Size()) - nonExistingAddr := randIPv4Address() + nonExistingAddr := randIPv4Address(t) book.RemoveAddress(nonExistingAddr) assert.Equal(t, 0, book.Size()) } diff --git a/pex_reactor.go b/pex_reactor.go index 79a9200ff..a86bebe12 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -93,19 +93,17 @@ func (r *PEXReactor) GetChannels() []*ChannelDescriptor { // AddPeer implements Reactor by adding peer to the address book (if inbound) // or by requesting more addresses (if outbound). func (r *PEXReactor) AddPeer(p *Peer) { - netAddr, err := NewNetAddressString(p.ListenAddr) - if err != nil { - // this should never happen - log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) - return - } - if p.IsOutbound() { // For outbound peers, the address is already in the books if r.book.NeedMoreAddrs() { r.RequestPEX(p) } } else { // For inbound connections, the peer is its own source - addr := NewNetAddressString(p.ListenAddr) + addr, err := NewNetAddressString(p.ListenAddr) + if err != nil { + // this should never happen + log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) + return + } r.book.AddAddress(addr, addr) } } @@ -117,7 +115,12 @@ func (r *PEXReactor) AddPeer(p *Peer) { // will remove him too. The peer will need to send first requests to others by // himself (he will have an addrbook or the seeds). func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { - addr := NewNetAddressString(p.ListenAddr) + addr, err := NewNetAddressString(p.ListenAddr) + if err != nil { + // this should never happen + log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) + return + } r.book.RemoveAddress(addr) } diff --git a/pex_reactor_test.go b/pex_reactor_test.go index 67d123bec..525efd3cb 100644 --- a/pex_reactor_test.go +++ b/pex_reactor_test.go @@ -72,7 +72,7 @@ func TestPEXReactorRunning(t *testing.T) { // fill the address book and add listeners for _, s := range switches { - addr := NewNetAddressString(s.NodeInfo().ListenAddr) + addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr) book.AddAddress(addr, addr) s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true)) } @@ -110,7 +110,8 @@ func TestPEXReactorReceive(t *testing.T) { peer := createRandomPeer(false) size := book.Size() - addrs := []*NetAddress{NewNetAddressString(peer.ListenAddr)} + netAddr, _ := NewNetAddressString(peer.ListenAddr) + addrs := []*NetAddress{netAddr} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) r.Receive(PexChannel, peer, msg) assert.Equal(t, size+1, book.Size()) @@ -140,12 +141,13 @@ func TestPEXReactorAbuseFromPeer(t *testing.T) { func createRandomPeer(outbound bool) *Peer { addr := cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) + netAddr, _ := NewNetAddressString(addr) return &Peer{ Key: cmn.RandStr(12), NodeInfo: &NodeInfo{ ListenAddr: addr, }, outbound: outbound, - mconn: &MConnection{RemoteAddress: NewNetAddressString(addr)}, + mconn: &MConnection{RemoteAddress: netAddr}, } } From 4c0d1d3ad2987fe99826915dc935d4c51339aade Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 17 Apr 2017 13:03:26 +0400 Subject: [PATCH 16/22] return wg to addrbook --- addrbook.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/addrbook.go b/addrbook.go index c33b97aa8..5450c515f 100644 --- a/addrbook.go +++ b/addrbook.go @@ -73,7 +73,12 @@ const ( serializationVersion = 1 ) -/* AddrBook - concurrency safe peer address manager */ +const ( + bucketTypeNew = 0x01 + bucketTypeOld = 0x02 +) + +// AddrBook - concurrency safe peer address manager. type AddrBook struct { BaseService @@ -86,15 +91,12 @@ type AddrBook struct { addrLookup map[string]*knownAddress // new & old addrNew []map[string]*knownAddress addrOld []map[string]*knownAddress + wg sync.WaitGroup nOld int nNew int } -const ( - bucketTypeNew = 0x01 - bucketTypeOld = 0x02 -) - +// NewAddrBook creates a new address book. // Use Start to begin processing asynchronous address updates. func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook { am := &AddrBook{ @@ -124,19 +126,22 @@ func (a *AddrBook) init() { } } +// OnStart implements Service. func (a *AddrBook) OnStart() error { a.BaseService.OnStart() a.loadFromFile(a.filePath) + a.wg.Add(1) go a.saveRoutine() return nil } -func (a *AddrBook) OnStop() { - a.BaseService.OnStop() +func (a *AddrBook) Wait() { + a.wg.Wait() } -func (a *AddrBook) Wait() { - a.saveToFile(a.filePath) +// OnStop implements Service. +func (a *AddrBook) OnStop() { + a.BaseService.OnStop() } func (a *AddrBook) AddOurAddress(addr *NetAddress) { @@ -399,6 +404,7 @@ out: } dumpAddressTicker.Stop() a.saveToFile(a.filePath) + a.wg.Done() log.Notice("Address handler done") } From 5ab8ca0868a94e776e297d47c31226d9e66cca38 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 17 Apr 2017 13:22:59 +0400 Subject: [PATCH 17/22] fix race --- addrbook.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/addrbook.go b/addrbook.go index 5450c515f..8a1698f41 100644 --- a/addrbook.go +++ b/addrbook.go @@ -135,15 +135,15 @@ func (a *AddrBook) OnStart() error { return nil } -func (a *AddrBook) Wait() { - a.wg.Wait() -} - // OnStop implements Service. func (a *AddrBook) OnStop() { a.BaseService.OnStop() } +func (a *AddrBook) Wait() { + a.wg.Wait() +} + func (a *AddrBook) AddOurAddress(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() @@ -320,6 +320,8 @@ type addrBookJSON struct { func (a *AddrBook) saveToFile(filePath string) { log.Info("Saving AddrBook to file", "size", a.Size()) + a.mtx.Lock() + defer a.mtx.Unlock() // Compile Addrs addrs := []*knownAddress{} for _, ka := range a.addrLookup { From 9ce71013df9ccf08890307314ec13500ec58f6f3 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 20 Apr 2017 12:49:54 +0400 Subject: [PATCH 18/22] revert e448199 --- pex_reactor.go | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index a86bebe12..489635556 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -7,6 +7,7 @@ import ( "reflect" "time" + cmn "github.com/tendermint/go-common" wire "github.com/tendermint/go-wire" ) @@ -209,6 +210,17 @@ func (r *PEXReactor) ensurePeersRoutine() { } // ensurePeers ensures that sufficient peers are connected. (once) +// +// Old bucket / New bucket are arbitrary categories to denote whether an +// address is vetted or not, and this needs to be determined over time via a +// heuristic that we haven't perfected yet, or, perhaps is manually edited by +// the node operator. It should not be used to compute what addresses are +// already connected or not. +// +// TODO Basically, we need to work harder on our good-peer/bad-peer marking. +// What we're currently doing in terms of marking good/bad peers is just a +// placeholder. It should not be the case that an address becomes old/vetted +// upon a single successful connection. func (r *PEXReactor) ensurePeers() { numOutPeers, _, numDialing := r.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) @@ -221,22 +233,28 @@ func (r *PEXReactor) ensurePeers() { // Try to pick numToDial addresses to dial. for i := 0; i < numToDial; i++ { + // The purpose of newBias is to first prioritize old (more vetted) peers + // when we have few connections, but to allow for new (less vetted) peers + // if we already have many connections. This algorithm isn't perfect, but + // it somewhat ensures that we prioritize connecting to more-vetted + // peers. + newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 var picked *NetAddress // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. for j := 0; j < 3; j++ { - // NOTE always picking from the new group because old one stores already - // connected peers. - try := r.book.PickAddress(100) + try := r.book.PickAddress(newBias) if try == nil { break } _, alreadySelected := toDial[try.IP.String()] alreadyDialing := r.Switch.IsDialing(try) - if alreadySelected || alreadyDialing { + alreadyConnected := r.Switch.Peers().Has(try.IP.String()) + if alreadySelected || alreadyDialing || alreadyConnected { // log.Info("Cannot dial address", "addr", try, // "alreadySelected", alreadySelected, - // "alreadyDialing", alreadyDialing) + // "alreadyDialing", alreadyDialing, + // "alreadyConnected", alreadyConnected) continue } else { log.Info("Will dial address", "addr", try) @@ -256,9 +274,6 @@ func (r *PEXReactor) ensurePeers() { _, err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { r.book.MarkAttempt(picked) - } else { - // move address to the old group - r.book.MarkGood(picked) } }(item) } From 17ec70fc096be1cc3eee7b8580caee21ab0f0c71 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 20 Apr 2017 13:04:40 +0400 Subject: [PATCH 19/22] revert 2710873 --- pex_reactor.go | 16 +++------------- pex_reactor_test.go | 38 ++++++++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index 489635556..c6e4fbf3d 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -109,20 +109,10 @@ func (r *PEXReactor) AddPeer(p *Peer) { } } -// RemovePeer implements Reactor by removing peer from the address book. -// -// The peer will be proposed to us by other peers (PexAddrsMessage) or himself -// and we will add him again upon successful connection. Note that other peers -// will remove him too. The peer will need to send first requests to others by -// himself (he will have an addrbook or the seeds). +// RemovePeer implements Reactor. func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { - addr, err := NewNetAddressString(p.ListenAddr) - if err != nil { - // this should never happen - log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) - return - } - r.book.RemoveAddress(addr) + // If we aren't keeping track of local temp data for each peer here, then we + // don't have to do anything. } // Receive implements Reactor by handling incoming PEX messages. diff --git a/pex_reactor_test.go b/pex_reactor_test.go index 525efd3cb..13f2fa208 100644 --- a/pex_reactor_test.go +++ b/pex_reactor_test.go @@ -14,20 +14,24 @@ import ( ) func TestPEXReactorBasic(t *testing.T) { + assert, require := assert.New(t), require.New(t) + dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(t, err) + require.Nil(err) defer os.RemoveAll(dir) book := NewAddrBook(dir+"addrbook.json", true) r := NewPEXReactor(book) - assert.NotNil(t, r) - assert.NotEmpty(t, r.GetChannels()) + assert.NotNil(r) + assert.NotEmpty(r.GetChannels()) } func TestPEXReactorAddRemovePeer(t *testing.T) { + assert, require := assert.New(t), require.New(t) + dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(t, err) + require.Nil(err) defer os.RemoveAll(dir) book := NewAddrBook(dir+"addrbook.json", true) @@ -37,26 +41,28 @@ func TestPEXReactorAddRemovePeer(t *testing.T) { peer := createRandomPeer(false) r.AddPeer(peer) - assert.Equal(t, size+1, book.Size()) + assert.Equal(size+1, book.Size()) r.RemovePeer(peer, "peer not available") - assert.Equal(t, size, book.Size()) + assert.Equal(size+1, book.Size()) outboundPeer := createRandomPeer(true) r.AddPeer(outboundPeer) - assert.Equal(t, size, book.Size(), "size must not change") + assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book") r.RemovePeer(outboundPeer, "peer not available") - assert.Equal(t, size, book.Size(), "size must not change") + assert.Equal(size+1, book.Size()) } func TestPEXReactorRunning(t *testing.T) { + require := require.New(t) + N := 3 switches := make([]*Switch, N) dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(t, err) + require.Nil(err) defer os.RemoveAll(dir) book := NewAddrBook(dir+"addrbook.json", false) @@ -80,7 +86,7 @@ func TestPEXReactorRunning(t *testing.T) { // start switches for _, s := range switches { _, err := s.Start() // start switch and reactors - require.Nil(t, err) + require.Nil(err) } time.Sleep(1 * time.Second) @@ -100,8 +106,10 @@ func TestPEXReactorRunning(t *testing.T) { } func TestPEXReactorReceive(t *testing.T) { + assert, require := assert.New(t), require.New(t) + dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(t, err) + require.Nil(err) defer os.RemoveAll(dir) book := NewAddrBook(dir+"addrbook.json", true) @@ -114,15 +122,17 @@ func TestPEXReactorReceive(t *testing.T) { addrs := []*NetAddress{netAddr} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) r.Receive(PexChannel, peer, msg) - assert.Equal(t, size+1, book.Size()) + assert.Equal(size+1, book.Size()) msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) r.Receive(PexChannel, peer, msg) } func TestPEXReactorAbuseFromPeer(t *testing.T) { + assert, require := assert.New(t), require.New(t) + dir, err := ioutil.TempDir("", "pex_reactor") - require.Nil(t, err) + require.Nil(err) defer os.RemoveAll(dir) book := NewAddrBook(dir+"addrbook.json", true) @@ -136,7 +146,7 @@ func TestPEXReactorAbuseFromPeer(t *testing.T) { r.Receive(PexChannel, peer, msg) } - assert.True(t, r.ReachedMaxMsgCountForPeer(peer.ListenAddr)) + assert.True(r.ReachedMaxMsgCountForPeer(peer.ListenAddr)) } func createRandomPeer(outbound bool) *Peer { From 8655e2456e7f7551549f1c2b7d65115925a9b0f8 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 20 Apr 2017 13:33:27 +0400 Subject: [PATCH 20/22] it is non-deterministic (could fail sometimes) --- connection_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/connection_test.go b/connection_test.go index 84e20eee3..33d8adfd1 100644 --- a/connection_test.go +++ b/connection_test.go @@ -39,7 +39,6 @@ func TestMConnectionSend(t *testing.T) { assert.True(mconn.Send(0x01, msg)) // Note: subsequent Send/TrySend calls could pass because we are reading from // the send queue in a separate goroutine. - assert.False(mconn.CanSend(0x01), "CanSend should return false because queue is full") server.Read(make([]byte, len(msg))) assert.True(mconn.CanSend(0x01)) From 391c738959f7c800e0ff04d49b7d3cbfda5df427 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 20 Apr 2017 12:21:45 -0400 Subject: [PATCH 21/22] update comment about outbound peers and addrbook --- pex_reactor.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pex_reactor.go b/pex_reactor.go index c6e4fbf3d..0244416d1 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -94,7 +94,10 @@ func (r *PEXReactor) GetChannels() []*ChannelDescriptor { // AddPeer implements Reactor by adding peer to the address book (if inbound) // or by requesting more addresses (if outbound). func (r *PEXReactor) AddPeer(p *Peer) { - if p.IsOutbound() { // For outbound peers, the address is already in the books + if p.IsOutbound() { + // For outbound peers, the address is already in the books. + // Either it was added in DialSeeds or when we + // received the peer's address in r.Receive if r.book.NeedMoreAddrs() { r.RequestPEX(p) } From 75bad132fc71e08b13dbf3b1b15b6fa8026d7adf Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 20 Apr 2017 17:29:43 -0400 Subject: [PATCH 22/22] msgCountByPeer is a CMap --- pex_reactor.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/pex_reactor.go b/pex_reactor.go index 0244416d1..4b6129762 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -49,7 +49,7 @@ type PEXReactor struct { ensurePeersPeriod time.Duration // tracks message count by peer, so we can prevent abuse - msgCountByPeer map[string]uint16 + msgCountByPeer *cmn.CMap maxMsgCountByPeer uint16 } @@ -58,7 +58,7 @@ func NewPEXReactor(b *AddrBook) *PEXReactor { r := &PEXReactor{ book: b, ensurePeersPeriod: defaultEnsurePeersPeriod, - msgCountByPeer: make(map[string]uint16), + msgCountByPeer: cmn.NewCMap(), maxMsgCountByPeer: defaultMaxMsgCountByPeer, } r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) @@ -122,7 +122,8 @@ func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { srcAddr := src.Connection().RemoteAddress srcAddrStr := srcAddr.String() - r.msgCountByPeer[srcAddrStr]++ + + r.IncrementMsgCountForPeer(srcAddrStr) if r.ReachedMaxMsgCountForPeer(srcAddrStr) { log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr) // TODO remove src from peers? @@ -175,8 +176,20 @@ func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) { // ReachedMaxMsgCountForPeer returns true if we received too many // messages from peer with address `addr`. +// NOTE: assumes the value in the CMap is non-nil func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool { - return r.msgCountByPeer[addr] >= r.maxMsgCountByPeer + return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer +} + +// Increment or initialize the msg count for the peer in the CMap +func (r *PEXReactor) IncrementMsgCountForPeer(addr string) { + var count uint16 + countI := r.msgCountByPeer.Get(addr) + if countI != nil { + count = countI.(uint16) + } + count++ + r.msgCountByPeer.Set(addr, count) } // Ensures that sufficient peers are connected. (continuous) @@ -288,7 +301,7 @@ func (r *PEXReactor) flushMsgCountByPeer() { for { select { case <-ticker.C: - r.msgCountByPeer = make(map[string]uint16) + r.msgCountByPeer.Clear() case <-r.Quit: ticker.Stop() return