Browse Source

blockchain/v0: relax termination conditions and increase sync timeout (#5741)

Closes: #5718
pull/5765/head
Anton Kaliaev 4 years ago
committed by GitHub
parent
commit
89e908e340
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 168 additions and 129 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +17
    -0
      blockchain/doc.go
  3. +36
    -45
      blockchain/v0/pool.go
  4. +90
    -61
      blockchain/v0/reactor.go
  5. +14
    -9
      blockchain/v2/io.go
  6. +10
    -14
      blockchain/v2/reactor.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -37,6 +37,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [mempool] \#5673 Cancel `CheckTx` requests if RPC client disconnects or times out (@melekes)
- [abci] \#5706 Added `AbciVersion` to `RequestInfo` allowing applications to check ABCI version when connecting to Tendermint. (@marbar3778)
- [blockchain/v1] \#5728 Remove in favor of v2 (@melekes)
- [blockchain/v0] \#5741 Relax termination conditions and increase sync timeout (@melekes)
### BUG FIXES


+ 17
- 0
blockchain/doc.go View File

@ -0,0 +1,17 @@
/*
Package blockchain provides two implementations of the fast-sync protocol.
- v0 was the very first implementation. it's battle tested, but does not have a
lot of test coverage.
- v2 is the newest implementation, with a focus on testability and readability.
Check out ADR-40 for the formal model and requirements.
# Termination criteria
1. the maximum peer height is reached
2. termination timeout is triggered, which is set if the peer set is empty or
there are no pending requests.
*/
package blockchain

+ 36
- 45
blockchain/v0/pool.go View File

@ -58,12 +58,19 @@ var peerTimeout = 15 * time.Second // not const so we can override with tests
are not at peer limits, we can probably switch to consensus reactor
*/
// BlockRequest stores a block request identified by the block Height and the
// PeerID responsible for delivering the block.
type BlockRequest struct {
Height int64
PeerID p2p.ID
}
// BlockPool keeps track of the fast sync peers, block requests and block responses.
type BlockPool struct {
service.BaseService
startTime time.Time
lastAdvance time.Time
mtx tmsync.Mutex
mtx tmsync.RWMutex
// block requests
requesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
@ -98,8 +105,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
// OnStart implements service.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
pool.lastAdvance = time.Now()
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
}
@ -134,6 +141,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
defer pool.mtx.Unlock()
for _, peer := range pool.peers {
// check if peer timed out
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start
@ -147,6 +155,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
peer.didTimeout = true
}
}
if peer.didTimeout {
pool.removePeer(peer.id)
}
@ -156,33 +165,24 @@ func (pool *BlockPool) removeTimedoutPeers() {
// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.mtx.RLock()
defer pool.mtx.RUnlock()
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}
// IsCaughtUp returns true if this node is caught up, false - otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.mtx.RLock()
defer pool.mtx.RUnlock()
// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
pool.Logger.Debug("Blockpool has no peers")
return false
}
// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// NOTE: we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp
return pool.height >= (pool.maxPeerHeight - 1)
}
// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
@ -190,8 +190,8 @@ func (pool *BlockPool) IsCaughtUp() bool {
// So we peek two blocks at a time.
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.mtx.RLock()
defer pool.mtx.RUnlock()
if r := pool.requesters[pool.height]; r != nil {
first = r.getBlock()
@ -209,16 +209,12 @@ func (pool *BlockPool) PopRequest() {
defer pool.mtx.Unlock()
if r := pool.requesters[pool.height]; r != nil {
/* The block can disappear at any time, due to removePeer().
if r := pool.requesters[pool.height]; r == nil || r.block == nil {
PanicSanity("PopRequest() requires a valid block")
}
*/
if err := r.Stop(); err != nil {
pool.Logger.Error("Error stopping requester", "err", err)
}
delete(pool.requesters, pool.height)
pool.height++
pool.lastAdvance = time.Now()
} else {
panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
}
@ -248,14 +244,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
requester := pool.requesters[block.Height]
if requester == nil {
pool.Logger.Info(
"peer sent us a block we didn't expect",
"peer",
peerID,
"curHeight",
pool.height,
"blockHeight",
block.Height)
pool.Logger.Error("peer sent us a block we didn't expect",
"peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
diff := pool.height - block.Height
if diff < 0 {
diff *= -1
@ -273,18 +263,27 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
peer.decrPending(blockSize)
}
} else {
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
pool.sendError(errors.New("invalid peer"), peerID)
err := errors.New("requester is different or block already exists")
pool.Logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", block.Height)
pool.sendError(err, peerID)
}
}
// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.mtx.RLock()
defer pool.mtx.RUnlock()
return pool.maxPeerHeight
}
// LastAdvance returns the time when the last block was processed (or start
// time if no blocks were processed).
func (pool *BlockPool) LastAdvance() time.Time {
pool.mtx.RLock()
defer pool.mtx.RUnlock()
return pool.lastAdvance
}
// SetPeerRange sets the peer's alleged blockchain base and height.
func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
pool.mtx.Lock()
@ -601,7 +600,6 @@ OUTER_LOOP:
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
// log.Info("No peers available", "height", height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
@ -638,10 +636,3 @@ OUTER_LOOP:
}
}
}
// BlockRequest stores a block request identified by the block Height and the PeerID responsible for
// delivering the block
type BlockRequest struct {
Height int64
PeerID p2p.ID
}

+ 90
- 61
blockchain/v0/reactor.go View File

@ -28,6 +28,9 @@ const (
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1
// switch to consensus after this duration of inactivity
syncTimeout = 60 * time.Second
)
type consensusReactor interface {
@ -158,7 +161,7 @@ func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
return
}
peer.Send(BlockchainChannel, msgBytes)
_ = peer.Send(BlockchainChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
@ -207,20 +210,22 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
// XXX: do not call any methods that can block or incur heavy processing.
// https://github.com/tendermint/tendermint/issues/2888
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
logger := bcR.Logger.With("src", src, "chId", chID)
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
logger.Error("Error decoding message", "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
if err = bc.ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
logger.Error("Peer sent us invalid msg", "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
logger.Debug("Receive", "msg", msg)
switch msg := msg.(type) {
case *bcproto.BlockRequest:
@ -228,7 +233,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
logger.Error("Block content is invalid", "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
@ -240,7 +245,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
Base: bcR.store.Base(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobut", "err", err)
logger.Error("could not convert msg to protobut", "err", err)
return
}
src.TrySend(BlockchainChannel, msgBytes)
@ -248,45 +253,50 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
logger.Debug("Peer does not have requested block", "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
blocksSynced := uint64(0)
blocksSynced = uint64(0)
chainID := bcR.initialState.ChainID
state := bcR.initialState
chainID = bcR.initialState.ChainID
state = bcR.initialState
lastHundred := time.Now()
lastRate := 0.0
lastHundred = time.Now()
lastRate = 0.0
didProcessCh := make(chan struct{}, 1)
didProcessCh = make(chan struct{}, 1)
)
go func() {
for {
select {
case <-bcR.Quit():
return
case <-bcR.pool.Quit():
return
case request := <-bcR.requestsCh:
peer := bcR.Switch.Peers().Get(request.PeerID)
if peer == nil {
bcR.Logger.Debug("Can't send request: no peer", "peer_id", request.PeerID)
continue
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
bcR.Logger.Error("could not convert BlockRequest to proto", "err", err)
continue
}
@ -294,6 +304,7 @@ func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
case err := <-bcR.errorsCh:
peer := bcR.Switch.Peers().Get(err.peerID)
if peer != nil {
@ -302,8 +313,7 @@ func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
case <-statusUpdateTicker.C:
// ask for status updates
go bcR.BroadcastStatusRequest() // nolint: errcheck
go bcR.BroadcastStatusRequest()
}
}
}()
@ -311,27 +321,41 @@ func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
FOR_LOOP:
for {
select {
case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
if bcR.pool.IsCaughtUp() {
var (
height, numPending, lenRequesters = bcR.pool.GetStatus()
outbound, inbound, _ = bcR.Switch.NumPeers()
lastAdvance = bcR.pool.LastAdvance()
)
bcR.Logger.Debug("Consensus ticker",
"numPending", numPending,
"total", lenRequesters)
switch {
case bcR.pool.IsCaughtUp():
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
}
// else {
// should only happen during testing
// }
case time.Since(lastAdvance) > syncTimeout:
bcR.Logger.Error(fmt.Sprintf("No progress since last advance: %v", lastAdvance))
default:
bcR.Logger.Info("Not caught up yet",
"height", height, "max_peer_height", bcR.pool.MaxPeerHeight(),
"num_peers", outbound+inbound,
"timeout_in", syncTimeout-time.Since(lastAdvance))
continue
}
break FOR_LOOP
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
}
break FOR_LOOP
case <-trySyncTicker.C: // chan time
select {
case didProcessCh <- struct{}{}:
@ -358,31 +382,37 @@ FOR_LOOP:
didProcessCh <- struct{}{}
}
firstParts := first.MakePartSet(types.BlockPartSizeBytes)
firstPartSetHeader := firstParts.Header()
firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
var (
firstParts = first.MakePartSet(types.BlockPartSizeBytes)
firstPartSetHeader = firstParts.Header()
firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
)
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := state.Validators.VerifyCommitLight(
chainID, firstID, first.Height, second.LastCommit)
err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
if err != nil {
bcR.Logger.Error("Error in validation", "err", err)
err = fmt.Errorf("invalid last commit: %w", err)
bcR.Logger.Error(err.Error(),
"last_commit", second.LastCommit, "block_id", firstID, "height", first.Height)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer, fmt.Errorf("blockchainReactor validation error: %v", err))
// NOTE: we've already removed the peer's request, but we still need
// to clean up the rest.
bcR.Switch.StopPeerForError(peer, err)
}
peerID2 := bcR.pool.RedoRequest(second.Height)
peer2 := bcR.Switch.Peers().Get(peerID2)
if peer2 != nil && peer2 != peer {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer2, fmt.Errorf("blockchainReactor validation error: %v", err))
if peerID2 != peerID {
if peer2 := bcR.Switch.Peers().Get(peerID2); peer2 != nil {
bcR.Switch.StopPeerForError(peer2, err)
}
}
continue FOR_LOOP
} else {
bcR.pool.PopRequest()
@ -390,8 +420,8 @@ FOR_LOOP:
// TODO: batch saves so we dont persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
// TODO: same thing for app - but we would need a way to get the hash
// without persisting the state.
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
@ -402,8 +432,8 @@ FOR_LOOP:
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
bcR.Logger.Info("Fast Sync Rate",
"height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now()
}
}
@ -416,14 +446,13 @@ FOR_LOOP:
}
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
func (bcR *BlockchainReactor) BroadcastStatusRequest() {
bm, err := bc.EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
bcR.Logger.Error("could not convert StatusRequest to proto", "err", err)
return
}
bcR.Switch.Broadcast(BlockchainChannel, bm)
return nil
// We don't care about whenever broadcast is successful or not.
_ = bcR.Switch.Broadcast(BlockchainChannel, bm)
}

+ 14
- 9
blockchain/v2/io.go View File

@ -1,7 +1,7 @@
package v2
import (
"fmt"
"errors"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/p2p"
@ -10,6 +10,11 @@ import (
"github.com/tendermint/tendermint/types"
)
var (
errPeerQueueFull = errors.New("peer queue full")
errNoPeer = errors.New("peer not found")
)
type iIO interface {
sendBlockRequest(peerID p2p.ID, height int64) error
sendBlockToPeer(block *types.Block, peerID p2p.ID) error
@ -45,7 +50,7 @@ type consensusReactor interface {
func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
return errNoPeer
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height})
if err != nil {
@ -54,7 +59,7 @@ func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error {
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
return fmt.Errorf("send queue full")
return errPeerQueueFull
}
return nil
}
@ -62,7 +67,7 @@ func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error {
func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
return errNoPeer
}
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
@ -71,7 +76,7 @@ func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID)
}
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return fmt.Errorf("peer queue full")
return errPeerQueueFull
}
return nil
@ -80,7 +85,7 @@ func (sio *switchIO) sendStatusResponse(base int64, height int64, peerID p2p.ID)
func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
return errNoPeer
}
if block == nil {
panic("trying to send nil block")
@ -96,7 +101,7 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
return err
}
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return fmt.Errorf("peer queue full")
return errPeerQueueFull
}
return nil
@ -105,7 +110,7 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
return errNoPeer
}
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height})
if err != nil {
@ -113,7 +118,7 @@ func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error {
}
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return fmt.Errorf("peer queue full")
return errPeerQueueFull
}
return nil


+ 10
- 14
blockchain/v2/reactor.go View File

@ -47,11 +47,6 @@ type BlockchainReactor struct {
store blockStore
}
//nolint:unused,deadcode
type blockVerifier interface {
VerifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error
}
type blockApplier interface {
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error)
}
@ -458,39 +453,40 @@ func (r *BlockchainReactor) Stop() error {
// XXX: do not call any methods that can block or incur heavy processing.
// https://github.com/tendermint/tendermint/issues/2888
func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
logger := r.logger.With("src", src.ID(), "chID", chID)
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
r.logger.Error("error decoding message",
"src", src.ID(), "chId", chID, "msg", msg, "err", err)
logger.Error("error decoding message", "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
if err = bc.ValidateMsg(msg); err != nil {
r.logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
logger.Error("peer sent us invalid msg", "msg", msg, "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
r.logger.Debug("Receive", "src", src.ID(), "chID", chID, "msg", msg)
r.logger.Debug("Receive", "msg", msg)
switch msg := msg.(type) {
case *bcproto.StatusRequest:
if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src.ID()); err != nil {
r.logger.Error("Could not send status message to peer", "src", src)
logger.Error("Could not send status message to src peer")
}
case *bcproto.BlockRequest:
block := r.store.LoadBlock(msg.Height)
if block != nil {
if err = r.io.sendBlockToPeer(block, src.ID()); err != nil {
r.logger.Error("Could not send block message to peer: ", err)
logger.Error("Could not send block message to src peer", "err", err)
}
} else {
r.logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height)
logger.Info("peer asking for a block we don't have", "height", msg.Height)
peerID := src.ID()
if err = r.io.sendBlockNotFound(msg.Height, peerID); err != nil {
r.logger.Error("Couldn't send block not found: ", err)
logger.Error("Couldn't send block not found msg", "err", err)
}
}
@ -505,7 +501,7 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
r.mtx.RLock()
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
r.logger.Error("error transitioning block from protobuf", "err", err)
logger.Error("error transitioning block from protobuf", "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}


Loading…
Cancel
Save