Browse Source

p2p: peer state init too late and pex message too soon (#3634)

* fix peer state init too late

Peer does not have a state yet. We set it in AddPeer.
We need a new interface before mconnection is started

* pex message too soon

fix PEX request being reported as sent too soon after a reconnection;
the error is caused by lastReceivedRequests still
not being deleted when a peer reconnects

* add test case for initpeer

* add prove case

* remove potentially infinite loop

* Update consensus/reactor.go

Co-Authored-By: guagualvcha <baifudong@lancai.cn>

* Update consensus/reactor_test.go

Co-Authored-By: guagualvcha <baifudong@lancai.cn>

* document Reactor interface better

* refactor TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet

* fix merge conflicts

* blockchain: remove peer's ID from the pool in InitPeer

Refs #3338

* pex: resetPeersRequestsInfo both upon InitPeer and RemovePeer

* ensure RemovePeer is always called before InitPeer

by removing the peer from the switch last (after we've stopped it and
removed from all reactors)

* add some comments for ConsensusReactor#InitPeer

* fix pex reactor

* format code

* fix spelling

* update changelog

* remove unused methods

* do not clear lastReceivedRequests upon error

only in RemovePeer

* call InitPeer before we start the peer!

* add a comment to InitPeer

* write a test

* use waitUntilSwitchHasAtLeastNPeers func

* bring back timeouts

* Test to ensure Receive panics if InitPeer has not been called
v0.31.6-changelog
Anton Kaliaev 5 years ago
committed by Ethan Buchman
parent
commit
bcf10d5bae
11 changed files with 180 additions and 32 deletions
  1. +5
    -0
      CHANGELOG_PENDING.md
  2. +1
    -0
      consensus/byzantine_test.go
  3. +14
    -6
      consensus/reactor.go
  4. +44
    -0
      consensus/reactor_test.go
  5. +0
    -5
      evidence/reactor.go
  6. +21
    -3
      p2p/base_reactor.go
  7. +12
    -12
      p2p/pex/pex_reactor.go
  8. +2
    -2
      p2p/pex/pex_reactor_test.go
  9. +14
    -3
      p2p/switch.go
  10. +66
    -0
      p2p/switch_test.go
  11. +1
    -1
      p2p/test_util.go

+ 5
- 0
CHANGELOG_PENDING.md View File

@ -22,6 +22,7 @@
* `MockMempool` moved to top-level mock package and renamed to `Mempool`
- [libs/common] Removed `PanicSanity`, `PanicCrisis`, `PanicConsensus` and `PanicQ`
- [node] Moved `GenesisDocProvider` and `DefaultGenesisDocProviderFunc` to state package
- [p2p] \#3346 Reactor#InitPeer method is added to Reactor interface
* Blockchain Protocol
@ -50,6 +51,10 @@
- [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode
to 16 (as a result, the node will stop retrying after a 35-hour time window)
- [consensus] \#2723, \#3451 and \#3317 Fix non-deterministic tests
- [consensus] \#3346 Create a peer state in consensus reactor before the peer
is started (@guagualvcha)
- [p2p] \#3338 Ensure RemovePeer is always called before InitPeer (upon a peer
reconnecting to our node)
- [p2p] \#3362 make persistent prop independent of conn direction
* `Switch#DialPeersAsync` now only takes a list of peers
* `Switch#DialPeerWithAddress` now only takes an address


+ 1
- 0
consensus/byzantine_test.go View File

@ -270,3 +270,4 @@ func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
// InitPeer returns the peer as is, without attaching any data to it.
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer {
	return peer
}

+ 14
- 6
consensus/reactor.go View File

@ -155,16 +155,24 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
}
}
// AddPeer implements Reactor
// InitPeer implements Reactor. It builds a new PeerState for the peer (using
// the reactor's logger), stores it on the peer under types.PeerStateKey and
// hands the same peer back to the caller.
func (conR *ConsensusReactor) InitPeer(peer p2p.Peer) p2p.Peer {
	peer.Set(types.PeerStateKey, NewPeerState(peer).SetLogger(conR.Logger))
	return peer
}
// AddPeer implements Reactor by spawning multiple gossiping goroutines for the
// peer.
func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
if !conR.IsRunning() {
return
}
// Create peerState for peer
peerState := NewPeerState(peer).SetLogger(conR.Logger)
peer.Set(types.PeerStateKey, peerState)
peerState, ok := peer.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("peer %v has no state", peer))
}
// Begin routines for this peer.
go conR.gossipDataRoutine(peer, peerState)
go conR.gossipVotesRoutine(peer, peerState)
@ -177,7 +185,7 @@ func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
}
}
// RemovePeer implements Reactor
// RemovePeer is a noop.
func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
if !conR.IsRunning() {
return


+ 44
- 0
consensus/reactor_test.go View File

@ -23,6 +23,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -243,6 +244,49 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
}, css)
}
// TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet checks that, once
// InitPeer has been called for a peer, the reactor handles Receive followed by
// AddPeer without panicking.
func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
	N := 1
	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
	defer cleanup()
	reactors, _, eventBuses := startConsensusNet(t, css, N)
	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

	reactor := reactors[0]
	peer := mock.NewPeer(nil)
	msg := cdc.MustMarshalBinaryBare(&HasVoteMessage{Height: 1, Round: 1, Index: 1, Type: types.PrevoteType})

	reactor.InitPeer(peer)

	// Simulate the switch delivering a message before it calls AddPeer.
	receiveThenAdd := func() {
		reactor.Receive(StateChannel, peer, msg)
		reactor.AddPeer(peer)
	}
	assert.NotPanics(t, receiveThenAdd)
}
// TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet checks that Receive
// panics when it is handed a message from a peer for which InitPeer was never
// called.
func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
	N := 1
	css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
	defer cleanup()
	reactors, _, eventBuses := startConsensusNet(t, css, N)
	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

	reactor := reactors[0]
	peer := mock.NewPeer(nil)
	msg := cdc.MustMarshalBinaryBare(&HasVoteMessage{Height: 1, Round: 1, Index: 1, Type: types.PrevoteType})

	// InitPeer is deliberately NOT called here (normally it would be).

	// Simulate the switch delivering a message before it calls AddPeer.
	receiveOnly := func() {
		reactor.Receive(StateChannel, peer, msg)
	}
	assert.Panics(t, receiveOnly)
}
// Test we record stats about votes and block parts from other peers.
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
N := 4


+ 0
- 5
evidence/reactor.go View File

@ -60,11 +60,6 @@ func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
}
// RemovePeer implements Reactor.
func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// nothing to do
}
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {


+ 21
- 3
p2p/base_reactor.go View File

@ -5,23 +5,40 @@ import (
"github.com/tendermint/tendermint/p2p/conn"
)
// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
// when the peer is stopped. Receive is called when a message is received on a
// channel associated with this reactor.
//
// Peer#Send or Peer#TrySend should be used to send the message to a peer.
type Reactor interface {
cmn.Service // Start, Stop
// SetSwitch allows setting a switch.
SetSwitch(*Switch)
// GetChannels returns the list of channel descriptors.
// GetChannels returns the list of MConnection.ChannelDescriptor. Make sure
// that each ID is unique across all the reactors added to the switch.
GetChannels() []*conn.ChannelDescriptor
// AddPeer is called by the switch when a new peer is added.
// InitPeer is called by the switch before the peer is started. Use it to
// initialize data for the peer (e.g. peer state).
//
// NOTE: The switch won't call AddPeer nor RemovePeer if it fails to start
// the peer. Do not store any data associated with the peer in the reactor
// itself unless you don't want to have a state, which is never cleaned up.
InitPeer(peer Peer) Peer
// AddPeer is called by the switch after the peer is added and successfully
// started. Use it to start goroutines communicating with the peer.
AddPeer(peer Peer)
// RemovePeer is called by the switch when the peer is stopped (due to error
// or other reason).
RemovePeer(peer Peer, reason interface{})
// Receive is called when msgBytes is received from peer.
// Receive is called by the switch when msgBytes is received from the peer.
//
// NOTE reactor can not keep msgBytes around after Receive completes without
// copying.
@ -51,3 +68,4 @@ func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }

+ 12
- 12
p2p/pex/pex_reactor.go View File

@ -204,6 +204,13 @@ func (r *PEXReactor) AddPeer(p Peer) {
}
}
// RemovePeer implements Reactor. It drops the request bookkeeping kept for the
// peer: the entries keyed by the peer's ID are deleted from both requestsSent
// and lastReceivedRequests.
func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
	peerID := string(p.ID())
	r.requestsSent.Delete(peerID)
	r.lastReceivedRequests.Delete(peerID)
}
func (r *PEXReactor) logErrAddrBook(err error) {
if err != nil {
switch err.(type) {
@ -216,13 +223,6 @@ func (r *PEXReactor) logErrAddrBook(err error) {
}
}
// RemovePeer implements Reactor.
func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
id := string(p.ID())
r.requestsSent.Delete(id)
r.lastReceivedRequests.Delete(id)
}
// Receive implements Reactor by handling incoming PEX messages.
func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
@ -303,7 +303,7 @@ func (r *PEXReactor) receiveRequest(src Peer) error {
now := time.Now()
minInterval := r.minReceiveRequestInterval()
if now.Sub(lastReceived) < minInterval {
return fmt.Errorf("Peer (%v) sent next PEX request too soon. lastReceived: %v, now: %v, minInterval: %v. Disconnecting",
return fmt.Errorf("peer (%v) sent next PEX request too soon. lastReceived: %v, now: %v, minInterval: %v. Disconnecting",
src.ID(),
lastReceived,
now,
@ -314,14 +314,14 @@ func (r *PEXReactor) receiveRequest(src Peer) error {
return nil
}
// RequestAddrs asks peer for more addresses if we do not already
// have a request out for this peer.
// RequestAddrs asks peer for more addresses if we do not already have a
// request out for this peer.
func (r *PEXReactor) RequestAddrs(p Peer) {
r.Logger.Debug("Request addrs", "from", p)
id := string(p.ID())
if r.requestsSent.Has(id) {
return
}
r.Logger.Debug("Request addrs", "from", p)
r.requestsSent.Set(id, struct{}{})
p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexRequestMessage{}))
}
@ -332,7 +332,7 @@ func (r *PEXReactor) RequestAddrs(p Peer) {
func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
id := string(src.ID())
if !r.requestsSent.Has(id) {
return errors.New("Unsolicited pexAddrsMessage")
return errors.New("unsolicited pexAddrsMessage")
}
r.requestsSent.Delete(id)


+ 2
- 2
p2p/pex/pex_reactor_test.go View File

@ -144,7 +144,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
sw.SetAddrBook(book)
peer := mock.NewPeer(nil)
p2p.AddPeerToSwitch(sw, peer)
p2p.AddPeerToSwitchPeerSet(sw, peer)
assert.True(t, sw.Peers().Has(peer.ID()))
id := string(peer.ID())
@ -174,7 +174,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
sw.SetAddrBook(book)
peer := mock.NewPeer(nil)
p2p.AddPeerToSwitch(sw, peer)
p2p.AddPeerToSwitchPeerSet(sw, peer)
assert.True(t, sw.Peers().Has(peer.ID()))
id := string(peer.ID())


+ 14
- 3
p2p/switch.go View File

@ -324,14 +324,20 @@ func (sw *Switch) StopPeerGracefully(peer Peer) {
}
// stopAndRemovePeer cleans up the transport for the peer, stops the peer,
// calls RemovePeer(peer, reason) on every reactor and removes the peer from
// the switch's peer set, decrementing the peers metric when the removal
// succeeds.
func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1))
}
sw.transport.Cleanup(peer)
peer.Stop()
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)
}
// Removing the peer from the peer set should go last to avoid a situation
// where the peer reconnects to our node and the switch calls InitPeer
// before RemovePeer has finished.
// https://github.com/tendermint/tendermint/issues/3338
if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1))
}
}
// reconnectToPeer tries to reconnect to the addr, first repeatedly
@ -739,6 +745,11 @@ func (sw *Switch) addPeer(p Peer) error {
return nil
}
// Add some data to the peer, which is required by reactors.
for _, reactor := range sw.reactors {
p = reactor.InitPeer(p)
}
// Start the peer's send/recv routines.
// Must start it before adding it to the peer set
// to prevent Start and Stop from being called concurrently.


+ 66
- 0
p2p/switch_test.go View File

@ -12,6 +12,7 @@ import (
"regexp"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@ -603,6 +604,71 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
})
}
// mockReactor records whether InitPeer was ever called while a RemovePeer call
// was still in progress. If that happened, InitCalledBeforeRemoveFinished
// returns true.
type mockReactor struct {
*BaseReactor
// Both flags below are read and written with sync/atomic; 1 means "set".
removePeerInProgress uint32
initCalledBeforeRemoveFinished uint32
}
// RemovePeer marks a removal as in progress, sleeps for 100ms to keep the
// removal window open, and then clears the in-progress mark again.
func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) {
	atomic.StoreUint32(&r.removePeerInProgress, 1)
	time.Sleep(100 * time.Millisecond)
	atomic.StoreUint32(&r.removePeerInProgress, 0)
}
// InitPeer returns the peer unchanged. If a RemovePeer call is in progress at
// the moment it runs, it additionally raises the
// initCalledBeforeRemoveFinished flag.
func (r *mockReactor) InitPeer(peer Peer) Peer {
	if atomic.LoadUint32(&r.removePeerInProgress) != 1 {
		return peer
	}
	atomic.StoreUint32(&r.initCalledBeforeRemoveFinished, 1)
	return peer
}
// InitCalledBeforeRemoveFinished reports whether InitPeer has been observed
// running while a RemovePeer call was still in progress.
func (r *mockReactor) InitCalledBeforeRemoveFinished() bool {
	flag := atomic.LoadUint32(&r.initCalledBeforeRemoveFinished)
	return flag == 1
}
// TestSwitchInitPeerIsNotCalledBeforeRemovePeer connects a remote peer to the
// switch, stops it asynchronously and immediately reconnects it, then asserts
// that the mock reactor never saw InitPeer while RemovePeer was still running.
//
// see stopAndRemovePeer
func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) {
	// make reactor
	reactor := &mockReactor{}
	reactor.BaseReactor = NewBaseReactor("mockReactor", reactor)

	// make switch
	sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch) *Switch {
		sw.AddReactor("mock", reactor)
		return sw
	})
	err := sw.Start()
	require.NoError(t, err)
	defer sw.Stop()

	// add peer
	rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
	rp.Start()
	defer rp.Stop()
	_, err = rp.Dial(sw.NetAddress())
	require.NoError(t, err)
	// wait till the switch adds rp to the peer set
	time.Sleep(50 * time.Millisecond)

	// Look the peer up before spawning the goroutine: if the switch has not
	// added it yet, fail the test here instead of handing a nil peer to
	// StopPeerForError in a background goroutine.
	peer := sw.Peers().Get(rp.ID())
	require.NotNil(t, peer, "switch has not added the remote peer to its peer set")

	// stop peer asynchronously
	go sw.StopPeerForError(peer, "test")

	// simulate peer reconnecting to us
	_, err = rp.Dial(sw.NetAddress())
	require.NoError(t, err)
	// wait till the switch adds rp to the peer set
	time.Sleep(50 * time.Millisecond)

	// make sure reactor.RemovePeer is finished before InitPeer is called
	assert.False(t, reactor.InitCalledBeforeRemoveFinished())
}
func BenchmarkSwitchBroadcast(b *testing.B) {
s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
// Make bar reactors of bar channels each


+ 1
- 1
p2p/test_util.go View File

@ -27,7 +27,7 @@ func (ni mockNodeInfo) NetAddress() (*NetAddress, error) { return ni.addr, ni
func (ni mockNodeInfo) Validate() error { return nil }
func (ni mockNodeInfo) CompatibleWith(other NodeInfo) error { return nil }
func AddPeerToSwitch(sw *Switch, peer Peer) {
func AddPeerToSwitchPeerSet(sw *Switch, peer Peer) {
sw.peers.Add(peer)
}


Loading…
Cancel
Save