From 6dbe9febce198df4468a4dcd7915208db5fab5e4 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 17 Jan 2017 20:58:27 +0400 Subject: [PATCH] log warning if peer send failed (Refs #174) make lint happy remove dead code remove not needed go-common dependency check peer.Send failures (Refs #174) --- blockchain/reactor.go | 53 ++++++++++++++++++++----------------- consensus/byzantine_test.go | 2 +- consensus/reactor.go | 45 ++++++++++++++++++++----------- mempool/reactor.go | 5 ++-- 4 files changed, 61 insertions(+), 44 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 4a7f21d00..fda49db48 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -3,11 +3,10 @@ package blockchain import ( "bytes" "errors" - "fmt" "reflect" "time" - . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" @@ -17,7 +16,9 @@ import ( ) const ( - BlockchainChannel = byte(0x40) + // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) + BlockchainChannel = byte(0x40) + defaultChannelCapacity = 100 defaultSleepIntervalMS = 500 trySyncIntervalMS = 100 @@ -55,12 +56,13 @@ type BlockchainReactor struct { evsw types.EventSwitch } +// NewBlockchainReactor returns new reactor instance. func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { if state.LastBlockHeight == store.Height()-1 { - store.height -= 1 // XXX HACK, make this better + store.height-- // XXX HACK, make this better } if state.LastBlockHeight != store.Height() { - PanicSanity(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) + cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) } requestsCh := make(chan BlockRequest, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity) @@ -83,6 +85,7 @@ func NewBlockchainReactor(config cfg.Config, state *sm.State, proxyAppConn proxy return bcR } +// OnStart implements BaseService func (bcR *BlockchainReactor) OnStart() error { bcR.BaseReactor.OnStart() if bcR.fastSync { @@ -95,12 +98,13 @@ func (bcR *BlockchainReactor) OnStart() error { return nil } +// OnStop implements BaseService func (bcR *BlockchainReactor) OnStop() { bcR.BaseReactor.OnStop() bcR.pool.Stop() } -// Implements Reactor +// GetChannels implements Reactor func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ @@ -111,19 +115,20 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { } } -// Implements Reactor +// AddPeer implements Reactor by sending our state to peer. func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { - // Send peer our state. - peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) + if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) { + log.Warn("Failed to send our state to peer", "peer", peer) + // doing nothing, will try later in `poolRoutine` + } } -// Implements Reactor +// RemovePeer implements Reactor by removing peer from the pool. func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { - // Remove peer from the pool. bcR.pool.RemovePeer(peer.Key) } -// Implements Reactor +// Receive implements Reactor by handling 4 types of messages (look below). func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { @@ -159,7 +164,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) // Got a peer status. Unverified. bcR.pool.SetPeerHeight(src.Key, msg.Height) default: - log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + log.Warn(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } } @@ -245,7 +250,7 @@ FOR_LOOP: err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{}) if err != nil { // TODO This is bad, are we zombie? - PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) + cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } bcR.state.Save() } @@ -257,17 +262,13 @@ FOR_LOOP: } } -func (bcR *BlockchainReactor) BroadcastStatusResponse() error { - bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) - return nil -} - +// BroadcastStatusRequest broadcasts `BlockStore` height. func (bcR *BlockchainReactor) BroadcastStatusRequest() error { bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}}) return nil } -// implements events.Eventable +// SetEventSwitch implements events.Eventable func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { bcR.evsw = evsw } @@ -282,6 +283,7 @@ const ( msgTypeStatusRequest = byte(0x21) ) +// BlockchainMessage is a generic message for this reactor. type BlockchainMessage interface{} var _ = wire.RegisterInterface( @@ -292,6 +294,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest}, ) +// DecodeMessage decodes BlockchainMessage. // TODO: ensure that bz is completely read. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { msgType = bz[0] @@ -299,7 +302,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { r := bytes.NewReader(bz) msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage if err != nil && n != len(bz) { - err = errors.New("DecodeMessage() had bytes left over.") + err = errors.New("DecodeMessage() had bytes left over") } return } @@ -311,7 +314,7 @@ type bcBlockRequestMessage struct { } func (m *bcBlockRequestMessage) String() string { - return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) + return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height) } //------------------------------------- @@ -322,7 +325,7 @@ type bcBlockResponseMessage struct { } func (m *bcBlockResponseMessage) String() string { - return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) + return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height) } //------------------------------------- @@ -332,7 +335,7 @@ type bcStatusRequestMessage struct { } func (m *bcStatusRequestMessage) String() string { - return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) + return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height) } //------------------------------------- @@ -342,5 +345,5 @@ type bcStatusResponseMessage struct { } func (m *bcStatusResponseMessage) String() string { - return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height) + return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height) } diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 396c8c074..cd62f3f08 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -242,7 +242,7 @@ func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). if !br.reactor.fastSync { - br.reactor.sendNewRoundStepMessage(peer) + br.reactor.sendNewRoundStepMessages(peer) } } func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { diff --git a/consensus/reactor.go b/consensus/reactor.go index 1e700a865..c79dcc811 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -127,7 +127,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). if !conR.fastSync { - conR.sendNewRoundStepMessage(peer) + conR.sendNewRoundStepMessages(peer) } } @@ -201,7 +201,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) BlockID: msg.BlockID, Votes: ourVotes, }}) - default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } @@ -365,14 +364,20 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg * return } -func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { +func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) { + log := log.New("peer", peer) + rs := conR.conS.GetRoundState() nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg}) + if !peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg}) { + log.Warn("Failed to send NewRoundStepMessage to peer") + } } if csMsg != nil { - peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg}) + if !peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg}) { + log.Warn("Failed to send RoundStepCommit to peer") + } } } @@ -399,8 +404,11 @@ OUTER_LOOP: Round: rs.Round, // This tells peer that this part applies to us. Part: part, } - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) - ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { + ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + } else { + log.Warn("Failed to send BlockPartMessage to peer") + } continue OUTER_LOOP } } @@ -435,8 +443,11 @@ OUTER_LOOP: Round: prs.Round, // Not our height, so it doesn't matter. Part: part, } - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) - ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { + ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + } else { + log.Warn("Failed to send BlockPartMessage to peer") + } continue OUTER_LOOP } else { //log.Info("No parts to send in catch-up, sleeping") @@ -462,8 +473,11 @@ OUTER_LOOP: // Proposal: share the proposal metadata with peer. { msg := &ProposalMessage{Proposal: rs.Proposal} - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) - ps.SetHasProposal(rs.Proposal) + if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { + ps.SetHasProposal(rs.Proposal) + } else { + log.Warn("Failed to send ProposalMessage to peer") + } } // ProposalPOL: lets peer know which POL votes we have so far. // Peer must receive ProposalMessage first. @@ -475,7 +489,9 @@ OUTER_LOOP: ProposalPOLRound: rs.Proposal.POLRound, ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), } - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) + if !peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { + log.Warn("Failed to send ProposalPOLMessage to peer") + } } continue OUTER_LOOP } @@ -806,13 +822,12 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { ps.ProposalBlockParts.SetIndex(index, true) } -// Convenience function to send vote to peer. +// PickVoteToSend sends vote to peer. // Returns true if vote was sent. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { if vote, ok := ps.PickVoteToSend(votes); ok { msg := &VoteMessage{vote} - ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) - return true + return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) } return false } diff --git a/mempool/reactor.go b/mempool/reactor.go index 0c5cc9f85..4531edee0 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -6,13 +6,12 @@ import ( "reflect" "time" + abci "github.com/tendermint/abci/types" "github.com/tendermint/go-clist" - . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" - abci "github.com/tendermint/abci/types" ) const ( @@ -80,7 +79,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { } // broadcasting happens from go routines per peer default: - log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } }