Browse Source

blockchain/v2: send status request when new peer joins (#5774)

Closes #5766

* memoize the scSchedulerFail error to avoid printing it every scheduleFreq
* blockchain/v2: modify switchIO funcs to accept peer instead of peerID
pull/5793/head
Anton Kaliaev 4 years ago
committed by GitHub
parent
commit
5aa859c370
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 39 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +22
    -26
      blockchain/v2/io.go
  3. +24
    -8
      blockchain/v2/reactor.go
  4. +15
    -5
      blockchain/v2/reactor_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -41,6 +41,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [blockchain/v1] \#5728 Remove in favor of v2 (@melekes)
- [blockchain/v0] \#5741 Relax termination conditions and increase sync timeout (@melekes)
- [cli] \#5772 `gen_node_key` output now contains node ID (`id` field) (@melekes)
- [blockchain/v2] \#5774 Send status request when new peer joins (@melekes)
### BUG FIXES


+ 22
- 26
blockchain/v2/io.go View File

@ -12,15 +12,15 @@ import (
var (
errPeerQueueFull = errors.New("peer queue full")
errNoPeer = errors.New("peer not found")
)
type iIO interface {
sendBlockRequest(peerID p2p.ID, height int64) error
sendBlockToPeer(block *types.Block, peerID p2p.ID) error
sendBlockNotFound(height int64, peerID p2p.ID) error
sendStatusResponse(base, height int64, peerID p2p.ID) error
sendBlockRequest(peer p2p.Peer, height int64) error
sendBlockToPeer(block *types.Block, peer p2p.Peer) error
sendBlockNotFound(height int64, peer p2p.Peer) error
sendStatusResponse(base, height int64, peer p2p.Peer) error
sendStatusRequest(peer p2p.Peer) error
broadcastStatusRequest() error
trySwitchToConsensus(state state.State, skipWAL bool) bool
@ -47,11 +47,7 @@ type consensusReactor interface {
SwitchToConsensus(state state.State, skipWAL bool)
}
func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return errNoPeer
}
func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error {
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height})
if err != nil {
return err
@ -64,12 +60,7 @@ func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error {
return nil
}
func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return errNoPeer
}
func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
if err != nil {
return err
@ -82,11 +73,7 @@ func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID)
return nil
}
func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return errNoPeer
}
func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error {
if block == nil {
panic("trying to send nil block")
}
@ -107,11 +94,7 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
return nil
}
func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return errNoPeer
}
func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height})
if err != nil {
return err
@ -132,6 +115,19 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool
return ok
}
func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
return err
}
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return errPeerQueueFull
}
return nil
}
func (sio *switchIO) broadcastStatusRequest() error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
if err != nil {


+ 24
- 8
blockchain/v2/reactor.go View File

@ -311,6 +311,9 @@ func (r *BlockchainReactor) demux(events <-chan Event) {
defer doStatusTk.Stop()
doStatusCh <- struct{}{} // immediately broadcast to get status of existing peers
// Memoize the scSchedulerFail error to avoid printing it every scheduleFreq.
var scSchedulerFailErr error
// XXX: Extract timers to make testing atemporal
for {
select {
@ -375,14 +378,22 @@ func (r *BlockchainReactor) demux(events <-chan Event) {
r.logger.Error("Error reporting peer", "err", err)
}
case scBlockRequest:
if err := r.io.sendBlockRequest(event.peerID, event.height); err != nil {
peer := r.Switch.Peers().Get(event.peerID)
if peer == nil {
r.logger.Error("Wanted to send block request, but no such peer", "peerID", event.peerID)
continue
}
if err := r.io.sendBlockRequest(peer, event.height); err != nil {
r.logger.Error("Error sending block request", "err", err)
}
case scFinishedEv:
r.processor.send(event)
r.scheduler.stop()
case scSchedulerFail:
r.logger.Error("Scheduler failure", "err", event.reason.Error())
if scSchedulerFailErr != event.reason {
r.logger.Error("Scheduler failure", "err", event.reason.Error())
scSchedulerFailErr = event.reason
}
case scPeersPruned:
// Remove peers from the processor.
for _, peerID := range event.peers {
@ -472,20 +483,19 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *bcproto.StatusRequest:
if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src.ID()); err != nil {
if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src); err != nil {
logger.Error("Could not send status message to src peer")
}
case *bcproto.BlockRequest:
block := r.store.LoadBlock(msg.Height)
if block != nil {
if err = r.io.sendBlockToPeer(block, src.ID()); err != nil {
if err = r.io.sendBlockToPeer(block, src); err != nil {
logger.Error("Could not send block message to src peer", "err", err)
}
} else {
logger.Info("peer asking for a block we don't have", "height", msg.Height)
peerID := src.ID()
if err = r.io.sendBlockNotFound(msg.Height, peerID); err != nil {
if err = r.io.sendBlockNotFound(msg.Height, src); err != nil {
logger.Error("Couldn't send block not found msg", "err", err)
}
}
@ -526,10 +536,16 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
// AddPeer implements Reactor interface
func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), peer.ID())
err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), peer)
if err != nil {
r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight())
r.logger.Error("could not send our status to the new peer", "peer", peer.ID, "err", err)
}
err = r.io.sendStatusRequest(peer)
if err != nil {
r.logger.Error("could not send status request to the new peer", "peer", peer.ID, "err", err)
}
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.events != nil {


+ 15
- 5
blockchain/v2/reactor_test.go View File

@ -93,34 +93,37 @@ type mockSwitchIo struct {
numStatusResponse int
numBlockResponse int
numNoBlockResponse int
numStatusRequest int
}
func (sio *mockSwitchIo) sendBlockRequest(peerID p2p.ID, height int64) error {
var _ iIO = (*mockSwitchIo)(nil)
func (sio *mockSwitchIo) sendBlockRequest(_ p2p.Peer, _ int64) error {
return nil
}
func (sio *mockSwitchIo) sendStatusResponse(base, height int64, peerID p2p.ID) error {
func (sio *mockSwitchIo) sendStatusResponse(_, _ int64, _ p2p.Peer) error {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.numStatusResponse++
return nil
}
func (sio *mockSwitchIo) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
func (sio *mockSwitchIo) sendBlockToPeer(_ *types.Block, _ p2p.Peer) error {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.numBlockResponse++
return nil
}
func (sio *mockSwitchIo) sendBlockNotFound(height int64, peerID p2p.ID) error {
func (sio *mockSwitchIo) sendBlockNotFound(_ int64, _ p2p.Peer) error {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.numNoBlockResponse++
return nil
}
func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, skipWAL bool) bool {
func (sio *mockSwitchIo) trySwitchToConsensus(_ sm.State, _ bool) bool {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.switchedToConsensus = true
@ -131,6 +134,13 @@ func (sio *mockSwitchIo) broadcastStatusRequest() error {
return nil
}
func (sio *mockSwitchIo) sendStatusRequest(_ p2p.Peer) error {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.numStatusRequest++
return nil
}
type testReactorParams struct {
logger log.Logger
genDoc *types.GenesisDoc


Loading…
Cancel
Save