|
|
@ -139,22 +139,22 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { |
|
|
|
// Messages affect either a peer state or the consensus state.
|
|
|
|
// Peer state updates can happen in parallel, but processing of
|
|
|
|
// proposals, block parts, and votes are ordered by the receiveRoutine
|
|
|
|
func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { |
|
|
|
func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { |
|
|
|
if !conR.IsRunning() { |
|
|
|
log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes) |
|
|
|
log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
_, msg, err := DecodeMessage(msgBytes) |
|
|
|
if err != nil { |
|
|
|
log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) |
|
|
|
log.Warn("Error decoding message", "src", src, "chId", chID, "msg", msg, "error", err, "bytes", msgBytes) |
|
|
|
// TODO punish peer?
|
|
|
|
return |
|
|
|
} |
|
|
|
log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg) |
|
|
|
log.Info("Receive", "src", src, "chId", chID, "msg", msg) |
|
|
|
|
|
|
|
// Get peer states
|
|
|
|
ps := peer.Data.Get(types.PeerStateKey).(*PeerState) |
|
|
|
ps := src.Data.Get(types.PeerStateKey).(*PeerState) |
|
|
|
|
|
|
|
switch chID { |
|
|
|
case StateChannel: |
|
|
@ -177,12 +177,12 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte |
|
|
|
switch msg := msg.(type) { |
|
|
|
case *ProposalMessage: |
|
|
|
ps.SetHasProposal(msg.Proposal) |
|
|
|
conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key} |
|
|
|
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} |
|
|
|
case *ProposalPOLMessage: |
|
|
|
ps.ApplyProposalPOLMessage(msg) |
|
|
|
case *BlockPartMessage: |
|
|
|
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) |
|
|
|
conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key} |
|
|
|
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} |
|
|
|
default: |
|
|
|
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) |
|
|
|
} |
|
|
@ -202,14 +202,14 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte |
|
|
|
ps.EnsureVoteBitArrays(height-1, lastCommitSize) |
|
|
|
ps.SetHasVote(msg.Vote, msg.ValidatorIndex) |
|
|
|
|
|
|
|
conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key} |
|
|
|
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} |
|
|
|
|
|
|
|
default: |
|
|
|
// don't punish (leave room for soft upgrades)
|
|
|
|
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) |
|
|
|
} |
|
|
|
default: |
|
|
|
log.Warn(Fmt("Unknown channel %X", chID)) |
|
|
|
log.Warn(Fmt("Unknown chId %X", chID)) |
|
|
|
} |
|
|
|
|
|
|
|
if err != nil { |
|
|
|