diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 03891be43..25f99dda8 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -41,6 +41,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [blockchain/v1] \#5728 Remove in favor of v2 (@melekes) - [blockchain/v0] \#5741 Relax termination conditions and increase sync timeout (@melekes) - [cli] \#5772 `gen_node_key` output now contains node ID (`id` field) (@melekes) +- [blockchain/v2] \#5774 Send status request when new peer joins (@melekes) ### BUG FIXES diff --git a/blockchain/v2/io.go b/blockchain/v2/io.go index 85d929968..69995e4c5 100644 --- a/blockchain/v2/io.go +++ b/blockchain/v2/io.go @@ -12,15 +12,15 @@ import ( var ( errPeerQueueFull = errors.New("peer queue full") - errNoPeer = errors.New("peer not found") ) type iIO interface { - sendBlockRequest(peerID p2p.ID, height int64) error - sendBlockToPeer(block *types.Block, peerID p2p.ID) error - sendBlockNotFound(height int64, peerID p2p.ID) error - sendStatusResponse(base, height int64, peerID p2p.ID) error + sendBlockRequest(peer p2p.Peer, height int64) error + sendBlockToPeer(block *types.Block, peer p2p.Peer) error + sendBlockNotFound(height int64, peer p2p.Peer) error + sendStatusResponse(base, height int64, peer p2p.Peer) error + sendStatusRequest(peer p2p.Peer) error broadcastStatusRequest() error trySwitchToConsensus(state state.State, skipWAL bool) bool @@ -47,11 +47,7 @@ type consensusReactor interface { SwitchToConsensus(state state.State, skipWAL bool) } -func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error { - peer := sio.sw.Peers().Get(peerID) - if peer == nil { - return errNoPeer - } +func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error { msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height}) if err != nil { return err @@ -64,12 +60,7 @@ func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error { return nil } -func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID) error { - peer := sio.sw.Peers().Get(peerID) - if peer == nil { - return errNoPeer - } - +func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error { msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base}) if err != nil { return err @@ -82,11 +73,7 @@ func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID) return nil } -func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { - peer := sio.sw.Peers().Get(peerID) - if peer == nil { - return errNoPeer - } +func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error { if block == nil { panic("trying to send nil block") } @@ -107,11 +94,7 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { return nil } -func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error { - peer := sio.sw.Peers().Get(peerID) - if peer == nil { - return errNoPeer - } +func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error { msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height}) if err != nil { return err @@ -132,6 +115,19 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool return ok } +func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error { + msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) + if err != nil { + return err + } + + if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { + return errPeerQueueFull + } + + return nil +} + func (sio *switchIO) broadcastStatusRequest() error { msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) if err != nil { diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 26be63ea7..7cd45080f 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -311,6 +311,9 @@ func (r *BlockchainReactor) demux(events <-chan Event) { defer doStatusTk.Stop() doStatusCh <- struct{}{} // immediately broadcast to get status of existing peers + // Memoize the scSchedulerFail error to avoid printing it every scheduleFreq. + var scSchedulerFailErr error + // XXX: Extract timers to make testing atemporal for { select { @@ -375,14 +378,22 @@ func (r *BlockchainReactor) demux(events <-chan Event) { r.logger.Error("Error reporting peer", "err", err) } case scBlockRequest: - if err := r.io.sendBlockRequest(event.peerID, event.height); err != nil { + peer := r.Switch.Peers().Get(event.peerID) + if peer == nil { + r.logger.Error("Wanted to send block request, but no such peer", "peerID", event.peerID) + continue + } + if err := r.io.sendBlockRequest(peer, event.height); err != nil { r.logger.Error("Error sending block request", "err", err) } case scFinishedEv: r.processor.send(event) r.scheduler.stop() case scSchedulerFail: - r.logger.Error("Scheduler failure", "err", event.reason.Error()) + if scSchedulerFailErr != event.reason { + r.logger.Error("Scheduler failure", "err", event.reason.Error()) + scSchedulerFailErr = event.reason + } case scPeersPruned: // Remove peers from the processor. for _, peerID := range event.peers { @@ -472,20 +483,19 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *bcproto.StatusRequest: - if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src.ID()); err != nil { + if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src); err != nil { logger.Error("Could not send status message to src peer") } case *bcproto.BlockRequest: block := r.store.LoadBlock(msg.Height) if block != nil { - if err = r.io.sendBlockToPeer(block, src.ID()); err != nil { + if err = r.io.sendBlockToPeer(block, src); err != nil { logger.Error("Could not send block message to src peer", "err", err) } } else { logger.Info("peer asking for a block we don't have", "height", msg.Height) - peerID := src.ID() - if err = r.io.sendBlockNotFound(msg.Height, peerID); err != nil { + if err = r.io.sendBlockNotFound(msg.Height, src); err != nil { logger.Error("Couldn't send block not found msg", "err", err) } } @@ -526,10 +536,16 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { // AddPeer implements Reactor interface func (r *BlockchainReactor) AddPeer(peer p2p.Peer) { - err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), peer.ID()) + err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), peer) if err != nil { - r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight()) + r.logger.Error("could not send our status to the new peer", "peer", peer.ID, "err", err) } + + err = r.io.sendStatusRequest(peer) + if err != nil { + r.logger.Error("could not send status request to the new peer", "peer", peer.ID, "err", err) + } + r.mtx.RLock() defer r.mtx.RUnlock() if r.events != nil { diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index 35cedf178..ac3b6c9d1 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -93,34 +93,37 @@ type mockSwitchIo struct { numStatusResponse int numBlockResponse int numNoBlockResponse int + numStatusRequest int } -func (sio *mockSwitchIo) sendBlockRequest(peerID p2p.ID, height int64) error { +var _ iIO = (*mockSwitchIo)(nil) + +func (sio *mockSwitchIo) sendBlockRequest(_ p2p.Peer, _ int64) error { return nil } -func (sio *mockSwitchIo) sendStatusResponse(base, height int64, peerID p2p.ID) error { +func (sio *mockSwitchIo) sendStatusResponse(_, _ int64, _ p2p.Peer) error { sio.mtx.Lock() defer sio.mtx.Unlock() sio.numStatusResponse++ return nil } -func (sio *mockSwitchIo) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { +func (sio *mockSwitchIo) sendBlockToPeer(_ *types.Block, _ p2p.Peer) error { sio.mtx.Lock() defer sio.mtx.Unlock() sio.numBlockResponse++ return nil } -func (sio *mockSwitchIo) sendBlockNotFound(height int64, peerID p2p.ID) error { +func (sio *mockSwitchIo) sendBlockNotFound(_ int64, _ p2p.Peer) error { sio.mtx.Lock() defer sio.mtx.Unlock() sio.numNoBlockResponse++ return nil } -func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, skipWAL bool) bool { +func (sio *mockSwitchIo) trySwitchToConsensus(_ sm.State, _ bool) bool { sio.mtx.Lock() defer sio.mtx.Unlock() sio.switchedToConsensus = true @@ -131,6 +134,13 @@ func (sio *mockSwitchIo) broadcastStatusRequest() error { return nil } +func (sio *mockSwitchIo) sendStatusRequest(_ p2p.Peer) error { + sio.mtx.Lock() + defer sio.mtx.Unlock() + sio.numStatusRequest++ + return nil +} + type testReactorParams struct { logger log.Logger genDoc *types.GenesisDoc