Browse Source

Merge pull request #373 from melekes/bugfix/174-peer-send-failures-are-not-checked

Check `peer.Send` failures
pull/418/head
Ethan Buchman 8 years ago
committed by GitHub
parent
commit
98a509ed97
4 changed files with 43 additions and 41 deletions
  1. +27
    -25
      blockchain/reactor.go
  2. +1
    -1
      consensus/byzantine_test.go
  3. +13
    -12
      consensus/reactor.go
  4. +2
    -3
      mempool/reactor.go

+ 27
- 25
blockchain/reactor.go View File

@ -3,11 +3,10 @@ package blockchain
import (
"bytes"
"errors"
"fmt"
"reflect"
"time"
. "github.com/tendermint/go-common"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire"
@ -17,7 +16,9 @@ import (
)
const (
BlockchainChannel = byte(0x40)
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
defaultChannelCapacity = 100
defaultSleepIntervalMS = 500
trySyncIntervalMS = 100
@ -55,12 +56,13 @@ type BlockchainReactor struct {
evsw types.EventSwitch
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
if state.LastBlockHeight == store.Height()-1 {
store.height -= 1 // XXX HACK, make this better
store.height-- // XXX HACK, make this better
}
if state.LastBlockHeight != store.Height() {
PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
}
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity)
@ -83,6 +85,7 @@ func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy
return bcR
}
// OnStart implements BaseService
func (bcR *BlockchainReactor) OnStart() error {
bcR.BaseReactor.OnStart()
if bcR.fastSync {
@ -95,12 +98,13 @@ func (bcR *BlockchainReactor) OnStart() error {
return nil
}
// OnStop implements BaseService
func (bcR *BlockchainReactor) OnStop() {
bcR.BaseReactor.OnStop()
bcR.pool.Stop()
}
// Implements Reactor
// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{
@ -111,19 +115,19 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
}
}
// Implements Reactor
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
// Send peer our state.
peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) {
// doing nothing, will try later in `poolRoutine`
}
}
// Implements Reactor
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Remove peer from the pool.
bcR.pool.RemovePeer(peer.Key)
}
// Implements Reactor
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
@ -159,7 +163,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
// Got a peer status. Unverified.
bcR.pool.SetPeerHeight(src.Key, msg.Height)
default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
log.Warn(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
}
@ -245,7 +249,7 @@ FOR_LOOP:
err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{})
if err != nil {
// TODO This is bad, are we zombie?
PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
bcR.state.Save()
}
@ -257,17 +261,13 @@ FOR_LOOP:
}
}
func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
return nil
}
// BroadcastStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
return nil
}
// implements events.Eventable
// SetEventSwitch implements events.Eventable
func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
bcR.evsw = evsw
}
@ -282,6 +282,7 @@ const (
msgTypeStatusRequest = byte(0x21)
)
// BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface{}
var _ = wire.RegisterInterface(
@ -292,6 +293,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
)
// DecodeMessage decodes BlockchainMessage.
// TODO: ensure that bz is completely read.
func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
msgType = bz[0]
@ -299,7 +301,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
if err != nil && n != len(bz) {
err = errors.New("DecodeMessage() had bytes left over.")
err = errors.New("DecodeMessage() had bytes left over")
}
return
}
@ -311,7 +313,7 @@ type bcBlockRequestMessage struct {
}
func (m *bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
}
//-------------------------------------
@ -322,7 +324,7 @@ type bcBlockResponseMessage struct {
}
func (m *bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height)
}
//-------------------------------------
@ -332,7 +334,7 @@ type bcStatusRequestMessage struct {
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
}
//-------------------------------------
@ -342,5 +344,5 @@ type bcStatusResponseMessage struct {
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
}

+ 1
- 1
consensus/byzantine_test.go View File

@ -242,7 +242,7 @@ func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.fastSync {
br.reactor.sendNewRoundStepMessage(peer)
br.reactor.sendNewRoundStepMessages(peer)
}
}
func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {


+ 13
- 12
consensus/reactor.go View File

@ -127,7 +127,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.fastSync {
conR.sendNewRoundStepMessage(peer)
conR.sendNewRoundStepMessages(peer)
}
}
@ -201,7 +201,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
BlockID: msg.BlockID,
Votes: ourVotes,
}})
default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
@ -365,7 +364,7 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *
return
}
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
@ -399,8 +398,9 @@ OUTER_LOOP:
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
}
}
@ -435,8 +435,9 @@ OUTER_LOOP:
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
} else {
//log.Info("No parts to send in catch-up, sleeping")
@ -462,8 +463,9 @@ OUTER_LOOP:
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
ps.SetHasProposal(rs.Proposal)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposal(rs.Proposal)
}
}
// ProposalPOL: lets peer know which POL votes we have so far.
// Peer must receive ProposalMessage first.
@ -806,13 +808,12 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.ProposalBlockParts.SetIndex(index, true)
}
// Convenience function to send vote to peer.
// PickVoteToSend sends vote to peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
return true
return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
}
return false
}


+ 2
- 3
mempool/reactor.go View File

@ -6,13 +6,12 @@ import (
"reflect"
"time"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-clist"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
abci "github.com/tendermint/abci/types"
)
const (
@ -80,7 +79,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
}
// broadcasting happens from go routines per peer
default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}


Loading…
Cancel
Save