diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 8c89e0130..a25afbb19 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -22,6 +22,7 @@ * `MockMempool` moved to top-level mock package and renamed to `Mempool` - [libs/common] Removed `PanicSanity`, `PanicCrisis`, `PanicConsensus` and `PanicQ` - [node] Moved `GenesisDocProvider` and `DefaultGenesisDocProviderFunc` to state package +- [p2p] \#3346 Reactor#InitPeer method is added to Reactor interface * Blockchain Protocol @@ -50,6 +51,10 @@ - [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode to 16 (as a result, the node will stop retrying after a 35 hours time window) - [consensus] \#2723, \#3451 and \#3317 Fix non-deterministic tests +- [consensus] \#3346 Create a peer state in consensus reactor before the peer + is started (@guagualvcha) +- [p2p] \#3338 Ensure RemovePeer is always called before InitPeer (upon a peer + reconnecting to our node) - [p2p] \#3362 make persistent prop independent of conn direction * `Switch#DialPeersAsync` now only takes a list of peers * `Switch#DialPeerWithAddress` now only takes an address diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index d181fb1a6..c2eb114dc 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -270,3 +270,4 @@ func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { br.reactor.Receive(chID, peer, msgBytes) } +func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } diff --git a/consensus/reactor.go b/consensus/reactor.go index 937a12351..36e948f6d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -155,16 +155,24 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { } } -// AddPeer implements Reactor +// InitPeer implements Reactor by creating a state for the peer. +func (conR *ConsensusReactor) InitPeer(peer p2p.Peer) p2p.Peer { + peerState := NewPeerState(peer).SetLogger(conR.Logger) + peer.Set(types.PeerStateKey, peerState) + return peer +} + +// AddPeer implements Reactor by spawning multiple gossiping goroutines for the +// peer. func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) { if !conR.IsRunning() { return } - // Create peerState for peer - peerState := NewPeerState(peer).SetLogger(conR.Logger) - peer.Set(types.PeerStateKey, peerState) - + peerState, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("peer %v has no state", peer)) + } // Begin routines for this peer. go conR.gossipDataRoutine(peer, peerState) go conR.gossipVotesRoutine(peer, peerState) @@ -177,7 +185,7 @@ func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) { } } -// RemovePeer implements Reactor +// RemovePeer is a noop. func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) { if !conR.IsRunning() { return diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 9440b50c3..b237da6b5 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -23,6 +23,7 @@ import ( "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/mock" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -243,6 +244,49 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { }, css) } +func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) { + N := 1 + css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) + defer cleanup() + reactors, _, eventBuses := startConsensusNet(t, css, N) + defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) + + var ( + reactor = reactors[0] + peer = mock.NewPeer(nil) + msg = cdc.MustMarshalBinaryBare(&HasVoteMessage{Height: 1, Round: 1, Index: 1, Type: types.PrevoteType}) + ) + + reactor.InitPeer(peer) + + // simulate switch calling Receive before AddPeer + assert.NotPanics(t, func() { + reactor.Receive(StateChannel, peer, msg) + reactor.AddPeer(peer) + }) +} + +func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) { + N := 1 + css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) + defer cleanup() + reactors, _, eventBuses := startConsensusNet(t, css, N) + defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) + + var ( + reactor = reactors[0] + peer = mock.NewPeer(nil) + msg = cdc.MustMarshalBinaryBare(&HasVoteMessage{Height: 1, Round: 1, Index: 1, Type: types.PrevoteType}) + ) + + // we should call InitPeer here + + // simulate switch calling Receive before AddPeer + assert.Panics(t, func() { + reactor.Receive(StateChannel, peer, msg) + }) +} + // Test we record stats about votes and block parts from other peers. func TestReactorRecordsVotesAndBlockParts(t *testing.T) { N := 4 diff --git a/evidence/reactor.go b/evidence/reactor.go index bbbab3e96..76ea270d9 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -60,11 +60,6 @@ func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) { go evR.broadcastEvidenceRoutine(peer) } -// RemovePeer implements Reactor. -func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - // nothing to do -} - // Receive implements Reactor. // It adds any received evidence to the evpool. func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index be65d2f14..3bccabd64 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -5,23 +5,40 @@ import ( "github.com/tendermint/tendermint/p2p/conn" ) +// Reactor is responsible for handling incoming messages on one or more +// Channel. Switch calls GetChannels when reactor is added to it. When a new +// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called +// when the peer is stopped. Receive is called when a message is received on a +// channel associated with this reactor. +// +// Peer#Send or Peer#TrySend should be used to send the message to a peer. type Reactor interface { cmn.Service // Start, Stop // SetSwitch allows setting a switch. SetSwitch(*Switch) - // GetChannels returns the list of channel descriptors. + // GetChannels returns the list of MConnection.ChannelDescriptor. Make sure + // that each ID is unique across all the reactors added to the switch. GetChannels() []*conn.ChannelDescriptor - // AddPeer is called by the switch when a new peer is added. + // InitPeer is called by the switch before the peer is started. Use it to + // initialize data for the peer (e.g. peer state). + // + // NOTE: The switch won't call AddPeer nor RemovePeer if it fails to start + // the peer. Do not store any data associated with the peer in the reactor + // itself unless you don't want to have a state, which is never cleaned up. + InitPeer(peer Peer) Peer + + // AddPeer is called by the switch after the peer is added and successfully + // started. Use it to start goroutines communicating with the peer. AddPeer(peer Peer) // RemovePeer is called by the switch when the peer is stopped (due to error // or other reason). RemovePeer(peer Peer, reason interface{}) - // Receive is called when msgBytes is received from peer. + // Receive is called by the switch when msgBytes is received from the peer. // // NOTE reactor can not keep msgBytes around after Receive completes without // copying. @@ -51,3 +68,4 @@ func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} +func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 551ed903d..eabbc4d61 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -204,6 +204,13 @@ func (r *PEXReactor) AddPeer(p Peer) { } } +// RemovePeer implements Reactor by resetting peer's requests info. +func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) { + id := string(p.ID()) + r.requestsSent.Delete(id) + r.lastReceivedRequests.Delete(id) +} + func (r *PEXReactor) logErrAddrBook(err error) { if err != nil { switch err.(type) { @@ -216,13 +223,6 @@ func (r *PEXReactor) logErrAddrBook(err error) { } } -// RemovePeer implements Reactor. -func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) { - id := string(p.ID()) - r.requestsSent.Delete(id) - r.lastReceivedRequests.Delete(id) -} - // Receive implements Reactor by handling incoming PEX messages. func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { msg, err := decodeMsg(msgBytes) @@ -303,7 +303,7 @@ func (r *PEXReactor) receiveRequest(src Peer) error { now := time.Now() minInterval := r.minReceiveRequestInterval() if now.Sub(lastReceived) < minInterval { - return fmt.Errorf("Peer (%v) sent next PEX request too soon. lastReceived: %v, now: %v, minInterval: %v. Disconnecting", + return fmt.Errorf("peer (%v) sent next PEX request too soon. lastReceived: %v, now: %v, minInterval: %v. Disconnecting", src.ID(), lastReceived, now, @@ -314,14 +314,14 @@ func (r *PEXReactor) receiveRequest(src Peer) error { return nil } -// RequestAddrs asks peer for more addresses if we do not already -// have a request out for this peer. +// RequestAddrs asks peer for more addresses if we do not already have a +// request out for this peer. func (r *PEXReactor) RequestAddrs(p Peer) { - r.Logger.Debug("Request addrs", "from", p) id := string(p.ID()) if r.requestsSent.Has(id) { return } + r.Logger.Debug("Request addrs", "from", p) r.requestsSent.Set(id, struct{}{}) p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexRequestMessage{})) } @@ -332,7 +332,7 @@ func (r *PEXReactor) RequestAddrs(p Peer) { func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { id := string(src.ID()) if !r.requestsSent.Has(id) { - return errors.New("Unsolicited pexAddrsMessage") + return errors.New("unsolicited pexAddrsMessage") } r.requestsSent.Delete(id) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index b16a0d919..f4b7cc265 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -144,7 +144,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { sw.SetAddrBook(book) peer := mock.NewPeer(nil) - p2p.AddPeerToSwitch(sw, peer) + p2p.AddPeerToSwitchPeerSet(sw, peer) assert.True(t, sw.Peers().Has(peer.ID())) id := string(peer.ID()) @@ -174,7 +174,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { sw.SetAddrBook(book) peer := mock.NewPeer(nil) - p2p.AddPeerToSwitch(sw, peer) + p2p.AddPeerToSwitchPeerSet(sw, peer) assert.True(t, sw.Peers().Has(peer.ID())) id := string(peer.ID()) diff --git a/p2p/switch.go b/p2p/switch.go index 66e90bec5..31e0aa6e1 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -324,14 +324,20 @@ func (sw *Switch) StopPeerGracefully(peer Peer) { } func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { - if sw.peers.Remove(peer) { - sw.metrics.Peers.Add(float64(-1)) - } sw.transport.Cleanup(peer) peer.Stop() + for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) } + + // Removing a peer should go last to avoid a situation where a peer + // reconnect to our node and the switch calls InitPeer before + // RemovePeer is finished. + // https://github.com/tendermint/tendermint/issues/3338 + if sw.peers.Remove(peer) { + sw.metrics.Peers.Add(float64(-1)) + } } // reconnectToPeer tries to reconnect to the addr, first repeatedly @@ -739,6 +745,11 @@ func (sw *Switch) addPeer(p Peer) error { return nil } + // Add some data to the peer, which is required by reactors. + for _, reactor := range sw.reactors { + p = reactor.InitPeer(p) + } + // Start the peer's send/recv routines. // Must start it before adding it to the peer set // to prevent Start and Stop from being called concurrently. diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 8485a759f..aa5ca78bf 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -12,6 +12,7 @@ import ( "regexp" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -603,6 +604,71 @@ func TestSwitchAcceptRoutineErrorCases(t *testing.T) { }) } +// mockReactor checks that InitPeer never called before RemovePeer. If that's +// not true, InitCalledBeforeRemoveFinished will return true. +type mockReactor struct { + *BaseReactor + + // atomic + removePeerInProgress uint32 + initCalledBeforeRemoveFinished uint32 +} + +func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) { + atomic.StoreUint32(&r.removePeerInProgress, 1) + defer atomic.StoreUint32(&r.removePeerInProgress, 0) + time.Sleep(100 * time.Millisecond) +} + +func (r *mockReactor) InitPeer(peer Peer) Peer { + if atomic.LoadUint32(&r.removePeerInProgress) == 1 { + atomic.StoreUint32(&r.initCalledBeforeRemoveFinished, 1) + } + + return peer +} + +func (r *mockReactor) InitCalledBeforeRemoveFinished() bool { + return atomic.LoadUint32(&r.initCalledBeforeRemoveFinished) == 1 +} + +// see stopAndRemovePeer +func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) { + // make reactor + reactor := &mockReactor{} + reactor.BaseReactor = NewBaseReactor("mockReactor", reactor) + + // make switch + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch) *Switch { + sw.AddReactor("mock", reactor) + return sw + }) + err := sw.Start() + require.NoError(t, err) + defer sw.Stop() + + // add peer + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + defer rp.Stop() + _, err = rp.Dial(sw.NetAddress()) + require.NoError(t, err) + // wait till the switch adds rp to the peer set + time.Sleep(50 * time.Millisecond) + + // stop peer asynchronously + go sw.StopPeerForError(sw.Peers().Get(rp.ID()), "test") + + // simulate peer reconnecting to us + _, err = rp.Dial(sw.NetAddress()) + require.NoError(t, err) + // wait till the switch adds rp to the peer set + time.Sleep(50 * time.Millisecond) + + // make sure reactor.RemovePeer is finished before InitPeer is called + assert.False(t, reactor.InitCalledBeforeRemoveFinished()) +} + func BenchmarkSwitchBroadcast(b *testing.B) { s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { // Make bar reactors of bar channels each diff --git a/p2p/test_util.go b/p2p/test_util.go index f8020924c..fa175aeb4 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -27,7 +27,7 @@ func (ni mockNodeInfo) NetAddress() (*NetAddress, error) { return ni.addr, ni func (ni mockNodeInfo) Validate() error { return nil } func (ni mockNodeInfo) CompatibleWith(other NodeInfo) error { return nil } -func AddPeerToSwitch(sw *Switch, peer Peer) { +func AddPeerToSwitchPeerSet(sw *Switch, peer Peer) { sw.peers.Add(peer) }