You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1320 lines
39 KiB

8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
7 years ago
10 years ago
7 years ago
10 years ago
10 years ago
7 years ago
10 years ago
7 years ago
7 years ago
8 years ago
8 years ago
10 years ago
7 years ago
10 years ago
7 years ago
8 years ago
10 years ago
10 years ago
7 years ago
7 years ago
10 years ago
10 years ago
8 years ago
7 years ago
8 years ago
8 years ago
10 years ago
10 years ago
10 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
8 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
9 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
7 years ago
8 years ago
8 years ago
7 years ago
7 years ago
10 years ago
7 years ago
7 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. wire "github.com/tendermint/go-wire"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. "github.com/tendermint/tmlibs/log"
  12. cstypes "github.com/tendermint/tendermint/consensus/types"
  13. "github.com/tendermint/tendermint/p2p"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. const (
  18. StateChannel = byte(0x20)
  19. DataChannel = byte(0x21)
  20. VoteChannel = byte(0x22)
  21. VoteSetBitsChannel = byte(0x23)
  22. maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
  23. )
  24. //-----------------------------------------------------------------------------
// ConsensusReactor defines a reactor for the consensus service.
// It relays consensus messages between peers and the local ConsensusState.
type ConsensusReactor struct {
	p2p.BaseReactor // BaseService + p2p.Switch

	conS *ConsensusState   // consensus state machine started and stopped by this reactor
	evsw types.EventSwitch // event switch assigned through SetEventSwitch

	mtx      sync.RWMutex // guards fastSync
	fastSync bool         // true while the reactor is in fast-sync mode
}
  33. // NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
  34. func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
  35. conR := &ConsensusReactor{
  36. conS: consensusState,
  37. fastSync: fastSync,
  38. }
  39. conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
  40. return conR
  41. }
  42. // OnStart implements BaseService.
  43. func (conR *ConsensusReactor) OnStart() error {
  44. conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
  45. conR.BaseReactor.OnStart()
  46. // callbacks for broadcasting new steps and votes to peers
  47. // upon their respective events (ie. uses evsw)
  48. conR.registerEventCallbacks()
  49. if !conR.FastSync() {
  50. _, err := conR.conS.Start()
  51. if err != nil {
  52. return err
  53. }
  54. }
  55. return nil
  56. }
// OnStop implements BaseService.
// It stops the base reactor first and then the consensus state.
func (conR *ConsensusReactor) OnStop() {
	conR.BaseReactor.OnStop()
	conR.conS.Stop()
}
  62. // SwitchToConsensus switches from fast_sync mode to consensus mode.
  63. // It resets the state, turns off fast_sync, and starts the consensus state-machine
  64. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) {
  65. conR.Logger.Info("SwitchToConsensus")
  66. conR.conS.reconstructLastCommit(state)
  67. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  68. // broadcast a NewRoundStepMessage.
  69. conR.conS.updateToState(state)
  70. conR.mtx.Lock()
  71. conR.fastSync = false
  72. conR.mtx.Unlock()
  73. if blocksSynced > 0 {
  74. // dont bother with the WAL if we fast synced
  75. conR.conS.doWALCatchup = false
  76. }
  77. conR.conS.Start()
  78. }
  79. // GetChannels implements Reactor
  80. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  81. // TODO optimize
  82. return []*p2p.ChannelDescriptor{
  83. &p2p.ChannelDescriptor{
  84. ID: StateChannel,
  85. Priority: 5,
  86. SendQueueCapacity: 100,
  87. },
  88. &p2p.ChannelDescriptor{
  89. ID: DataChannel, // maybe split between gossiping current block and catchup stuff
  90. Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
  91. SendQueueCapacity: 100,
  92. RecvBufferCapacity: 50 * 4096,
  93. },
  94. &p2p.ChannelDescriptor{
  95. ID: VoteChannel,
  96. Priority: 5,
  97. SendQueueCapacity: 100,
  98. RecvBufferCapacity: 100 * 100,
  99. },
  100. &p2p.ChannelDescriptor{
  101. ID: VoteSetBitsChannel,
  102. Priority: 1,
  103. SendQueueCapacity: 2,
  104. RecvBufferCapacity: 1024,
  105. },
  106. }
  107. }
  108. // AddPeer implements Reactor
  109. func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
  110. if !conR.IsRunning() {
  111. return
  112. }
  113. // Create peerState for peer
  114. peerState := NewPeerState(peer).SetLogger(conR.Logger)
  115. peer.Set(types.PeerStateKey, peerState)
  116. // Begin routines for this peer.
  117. go conR.gossipDataRoutine(peer, peerState)
  118. go conR.gossipVotesRoutine(peer, peerState)
  119. go conR.queryMaj23Routine(peer, peerState)
  120. // Send our state to peer.
  121. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  122. if !conR.FastSync() {
  123. conR.sendNewRoundStepMessages(peer)
  124. }
  125. }
// RemovePeer implements Reactor.
// It currently performs no cleanup; disconnecting the peer state is still a
// TODO.
func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	// TODO
	//peer.Get(PeerStateKey).(*PeerState).Disconnect()
}
  134. // Receive implements Reactor
  135. // NOTE: We process these messages even when we're fast_syncing.
  136. // Messages affect either a peer state or the consensus state.
  137. // Peer state updates can happen in parallel, but processing of
  138. // proposals, block parts, and votes are ordered by the receiveRoutine
  139. // NOTE: blocks on consensus state for proposals, block parts, and votes
  140. func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  141. if !conR.IsRunning() {
  142. conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
  143. return
  144. }
  145. _, msg, err := DecodeMessage(msgBytes)
  146. if err != nil {
  147. conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  148. // TODO punish peer?
  149. return
  150. }
  151. conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  152. // Get peer states
  153. ps := src.Get(types.PeerStateKey).(*PeerState)
  154. switch chID {
  155. case StateChannel:
  156. switch msg := msg.(type) {
  157. case *NewRoundStepMessage:
  158. ps.ApplyNewRoundStepMessage(msg)
  159. case *CommitStepMessage:
  160. ps.ApplyCommitStepMessage(msg)
  161. case *HasVoteMessage:
  162. ps.ApplyHasVoteMessage(msg)
  163. case *VoteSetMaj23Message:
  164. cs := conR.conS
  165. cs.mtx.Lock()
  166. height, votes := cs.Height, cs.Votes
  167. cs.mtx.Unlock()
  168. if height != msg.Height {
  169. return
  170. }
  171. // Peer claims to have a maj23 for some BlockID at H,R,S,
  172. votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key(), msg.BlockID)
  173. // Respond with a VoteSetBitsMessage showing which votes we have.
  174. // (and consequently shows which we don't have)
  175. var ourVotes *cmn.BitArray
  176. switch msg.Type {
  177. case types.VoteTypePrevote:
  178. ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
  179. case types.VoteTypePrecommit:
  180. ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
  181. default:
  182. conR.Logger.Error("Bad VoteSetBitsMessage field Type")
  183. return
  184. }
  185. src.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
  186. Height: msg.Height,
  187. Round: msg.Round,
  188. Type: msg.Type,
  189. BlockID: msg.BlockID,
  190. Votes: ourVotes,
  191. }})
  192. case *ProposalHeartbeatMessage:
  193. hb := msg.Heartbeat
  194. conR.Logger.Debug("Received proposal heartbeat message",
  195. "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence,
  196. "valIdx", hb.ValidatorIndex, "valAddr", hb.ValidatorAddress)
  197. default:
  198. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  199. }
  200. case DataChannel:
  201. if conR.FastSync() {
  202. conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
  203. return
  204. }
  205. switch msg := msg.(type) {
  206. case *ProposalMessage:
  207. ps.SetHasProposal(msg.Proposal)
  208. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
  209. case *ProposalPOLMessage:
  210. ps.ApplyProposalPOLMessage(msg)
  211. case *BlockPartMessage:
  212. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  213. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
  214. default:
  215. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  216. }
  217. case VoteChannel:
  218. if conR.FastSync() {
  219. conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
  220. return
  221. }
  222. switch msg := msg.(type) {
  223. case *VoteMessage:
  224. cs := conR.conS
  225. cs.mtx.Lock()
  226. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  227. cs.mtx.Unlock()
  228. ps.EnsureVoteBitArrays(height, valSize)
  229. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  230. ps.SetHasVote(msg.Vote)
  231. cs.peerMsgQueue <- msgInfo{msg, src.Key()}
  232. default:
  233. // don't punish (leave room for soft upgrades)
  234. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  235. }
  236. case VoteSetBitsChannel:
  237. if conR.FastSync() {
  238. conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
  239. return
  240. }
  241. switch msg := msg.(type) {
  242. case *VoteSetBitsMessage:
  243. cs := conR.conS
  244. cs.mtx.Lock()
  245. height, votes := cs.Height, cs.Votes
  246. cs.mtx.Unlock()
  247. if height == msg.Height {
  248. var ourVotes *cmn.BitArray
  249. switch msg.Type {
  250. case types.VoteTypePrevote:
  251. ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
  252. case types.VoteTypePrecommit:
  253. ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
  254. default:
  255. conR.Logger.Error("Bad VoteSetBitsMessage field Type")
  256. return
  257. }
  258. ps.ApplyVoteSetBitsMessage(msg, ourVotes)
  259. } else {
  260. ps.ApplyVoteSetBitsMessage(msg, nil)
  261. }
  262. default:
  263. // don't punish (leave room for soft upgrades)
  264. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  265. }
  266. default:
  267. conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID))
  268. }
  269. if err != nil {
  270. conR.Logger.Error("Error in Receive()", "err", err)
  271. }
  272. }
// SetEventSwitch implements events.Eventable.
// The switch is stored on the reactor and also handed to the consensus state.
func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
	conR.evsw = evsw
	conR.conS.SetEventSwitch(evsw)
}
  278. // FastSync returns whether the consensus reactor is in fast-sync mode.
  279. func (conR *ConsensusReactor) FastSync() bool {
  280. conR.mtx.RLock()
  281. defer conR.mtx.RUnlock()
  282. return conR.fastSync
  283. }
  284. //--------------------------------------
  285. // Listens for new steps and votes,
  286. // broadcasting the result to peers
  287. func (conR *ConsensusReactor) registerEventCallbacks() {
  288. types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) {
  289. rs := data.Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
  290. conR.broadcastNewRoundStep(rs)
  291. })
  292. types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) {
  293. edv := data.Unwrap().(types.EventDataVote)
  294. conR.broadcastHasVoteMessage(edv.Vote)
  295. })
  296. types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) {
  297. heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat)
  298. conR.broadcastProposalHeartbeatMessage(heartbeat)
  299. })
  300. }
  301. func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) {
  302. hb := heartbeat.Heartbeat
  303. conR.Logger.Debug("Broadcasting proposal heartbeat message",
  304. "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence)
  305. msg := &ProposalHeartbeatMessage{hb}
  306. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
  307. }
  308. func (conR *ConsensusReactor) broadcastNewRoundStep(rs *cstypes.RoundState) {
  309. nrsMsg, csMsg := makeRoundStepMessages(rs)
  310. if nrsMsg != nil {
  311. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  312. }
  313. if csMsg != nil {
  314. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
  315. }
  316. }
  317. // Broadcasts HasVoteMessage to peers that care.
  318. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
  319. msg := &HasVoteMessage{
  320. Height: vote.Height,
  321. Round: vote.Round,
  322. Type: vote.Type,
  323. Index: vote.ValidatorIndex,
  324. }
  325. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
  326. /*
  327. // TODO: Make this broadcast more selective.
  328. for _, peer := range conR.Switch.Peers().List() {
  329. ps := peer.Get(PeerStateKey).(*PeerState)
  330. prs := ps.GetRoundState()
  331. if prs.Height == vote.Height {
  332. // TODO: Also filter on round?
  333. peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
  334. } else {
  335. // Height doesn't match
  336. // TODO: check a field, maybe CatchupCommitRound?
  337. // TODO: But that requires changing the struct field comment.
  338. }
  339. }
  340. */
  341. }
  342. func makeRoundStepMessages(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  343. nrsMsg = &NewRoundStepMessage{
  344. Height: rs.Height,
  345. Round: rs.Round,
  346. Step: rs.Step,
  347. SecondsSinceStartTime: int(time.Since(rs.StartTime).Seconds()),
  348. LastCommitRound: rs.LastCommit.Round(),
  349. }
  350. if rs.Step == cstypes.RoundStepCommit {
  351. csMsg = &CommitStepMessage{
  352. Height: rs.Height,
  353. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  354. BlockParts: rs.ProposalBlockParts.BitArray(),
  355. }
  356. }
  357. return
  358. }
  359. func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) {
  360. rs := conR.conS.GetRoundState()
  361. nrsMsg, csMsg := makeRoundStepMessages(rs)
  362. if nrsMsg != nil {
  363. peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  364. }
  365. if csMsg != nil {
  366. peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
  367. }
  368. }
  369. func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
  370. logger := conR.Logger.With("peer", peer)
  371. OUTER_LOOP:
  372. for {
  373. // Manage disconnects from self or peer.
  374. if !peer.IsRunning() || !conR.IsRunning() {
  375. logger.Info("Stopping gossipDataRoutine for peer")
  376. return
  377. }
  378. rs := conR.conS.GetRoundState()
  379. prs := ps.GetRoundState()
  380. // Send proposal Block parts?
  381. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  382. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  383. part := rs.ProposalBlockParts.GetPart(index)
  384. msg := &BlockPartMessage{
  385. Height: rs.Height, // This tells peer that this part applies to us.
  386. Round: rs.Round, // This tells peer that this part applies to us.
  387. Part: part,
  388. }
  389. logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
  390. if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
  391. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  392. }
  393. continue OUTER_LOOP
  394. }
  395. }
  396. // If the peer is on a previous height, help catch up.
  397. if (0 < prs.Height) && (prs.Height < rs.Height) {
  398. heightLogger := logger.With("height", prs.Height)
  399. conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
  400. continue OUTER_LOOP
  401. }
  402. // If height and round don't match, sleep.
  403. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  404. //logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  405. time.Sleep(conR.conS.config.PeerGossipSleep())
  406. continue OUTER_LOOP
  407. }
  408. // By here, height and round match.
  409. // Proposal block parts were already matched and sent if any were wanted.
  410. // (These can match on hash so the round doesn't matter)
  411. // Now consider sending other things, like the Proposal itself.
  412. // Send Proposal && ProposalPOL BitArray?
  413. if rs.Proposal != nil && !prs.Proposal {
  414. // Proposal: share the proposal metadata with peer.
  415. {
  416. msg := &ProposalMessage{Proposal: rs.Proposal}
  417. logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
  418. if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
  419. ps.SetHasProposal(rs.Proposal)
  420. }
  421. }
  422. // ProposalPOL: lets peer know which POL votes we have so far.
  423. // Peer must receive ProposalMessage first.
  424. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  425. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  426. if 0 <= rs.Proposal.POLRound {
  427. msg := &ProposalPOLMessage{
  428. Height: rs.Height,
  429. ProposalPOLRound: rs.Proposal.POLRound,
  430. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  431. }
  432. logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
  433. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  434. }
  435. continue OUTER_LOOP
  436. }
  437. // Nothing to do. Sleep.
  438. time.Sleep(conR.conS.config.PeerGossipSleep())
  439. continue OUTER_LOOP
  440. }
  441. }
  442. func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState,
  443. prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) {
  444. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  445. // Ensure that the peer's PartSetHeader is correct
  446. blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
  447. if blockMeta == nil {
  448. logger.Error("Failed to load block meta",
  449. "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
  450. time.Sleep(conR.conS.config.PeerGossipSleep())
  451. return
  452. } else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  453. logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  454. "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  455. time.Sleep(conR.conS.config.PeerGossipSleep())
  456. return
  457. }
  458. // Load the part
  459. part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
  460. if part == nil {
  461. logger.Error("Could not load part", "index", index,
  462. "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  463. time.Sleep(conR.conS.config.PeerGossipSleep())
  464. return
  465. }
  466. // Send the part
  467. msg := &BlockPartMessage{
  468. Height: prs.Height, // Not our height, so it doesn't matter.
  469. Round: prs.Round, // Not our height, so it doesn't matter.
  470. Part: part,
  471. }
  472. logger.Debug("Sending block part for catchup", "round", prs.Round)
  473. if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
  474. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  475. }
  476. return
  477. } else {
  478. //logger.Info("No parts to send in catch-up, sleeping")
  479. time.Sleep(conR.conS.config.PeerGossipSleep())
  480. return
  481. }
  482. }
  483. func (conR *ConsensusReactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
  484. logger := conR.Logger.With("peer", peer)
  485. // Simple hack to throttle logs upon sleep.
  486. var sleeping = 0
  487. OUTER_LOOP:
  488. for {
  489. // Manage disconnects from self or peer.
  490. if !peer.IsRunning() || !conR.IsRunning() {
  491. logger.Info("Stopping gossipVotesRoutine for peer")
  492. return
  493. }
  494. rs := conR.conS.GetRoundState()
  495. prs := ps.GetRoundState()
  496. switch sleeping {
  497. case 1: // First sleep
  498. sleeping = 2
  499. case 2: // No more sleep
  500. sleeping = 0
  501. }
  502. //logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  503. // "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  504. // If height matches, then send LastCommit, Prevotes, Precommits.
  505. if rs.Height == prs.Height {
  506. heightLogger := logger.With("height", prs.Height)
  507. if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
  508. continue OUTER_LOOP
  509. }
  510. }
  511. // Special catchup logic.
  512. // If peer is lagging by height 1, send LastCommit.
  513. if prs.Height != 0 && rs.Height == prs.Height+1 {
  514. if ps.PickSendVote(rs.LastCommit) {
  515. logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
  516. continue OUTER_LOOP
  517. }
  518. }
  519. // Catchup logic
  520. // If peer is lagging by more than 1, send Commit.
  521. if prs.Height != 0 && rs.Height >= prs.Height+2 {
  522. // Load the block commit for prs.Height,
  523. // which contains precommit signatures for prs.Height.
  524. commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
  525. logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
  526. if ps.PickSendVote(commit) {
  527. logger.Debug("Picked Catchup commit to send", "height", prs.Height)
  528. continue OUTER_LOOP
  529. }
  530. }
  531. if sleeping == 0 {
  532. // We sent nothing. Sleep...
  533. sleeping = 1
  534. logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
  535. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  536. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  537. } else if sleeping == 2 {
  538. // Continued sleep...
  539. sleeping = 1
  540. }
  541. time.Sleep(conR.conS.config.PeerGossipSleep())
  542. continue OUTER_LOOP
  543. }
  544. }
  545. func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) bool {
  546. // If there are lastCommits to send...
  547. if prs.Step == cstypes.RoundStepNewHeight {
  548. if ps.PickSendVote(rs.LastCommit) {
  549. logger.Debug("Picked rs.LastCommit to send")
  550. return true
  551. }
  552. }
  553. // If there are prevotes to send...
  554. if prs.Step <= cstypes.RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
  555. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  556. logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
  557. return true
  558. }
  559. }
  560. // If there are precommits to send...
  561. if prs.Step <= cstypes.RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
  562. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  563. logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
  564. return true
  565. }
  566. }
  567. // If there are POLPrevotes to send...
  568. if prs.ProposalPOLRound != -1 {
  569. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  570. if ps.PickSendVote(polPrevotes) {
  571. logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
  572. "round", prs.ProposalPOLRound)
  573. return true
  574. }
  575. }
  576. }
  577. return false
  578. }
  579. // NOTE: `queryMaj23Routine` has a simple crude design since it only comes
  580. // into play for liveness when there's a signature DDoS attack happening.
  581. func (conR *ConsensusReactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) {
  582. logger := conR.Logger.With("peer", peer)
  583. OUTER_LOOP:
  584. for {
  585. // Manage disconnects from self or peer.
  586. if !peer.IsRunning() || !conR.IsRunning() {
  587. logger.Info("Stopping queryMaj23Routine for peer")
  588. return
  589. }
  590. // Maybe send Height/Round/Prevotes
  591. {
  592. rs := conR.conS.GetRoundState()
  593. prs := ps.GetRoundState()
  594. if rs.Height == prs.Height {
  595. if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
  596. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  597. Height: prs.Height,
  598. Round: prs.Round,
  599. Type: types.VoteTypePrevote,
  600. BlockID: maj23,
  601. }})
  602. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  603. }
  604. }
  605. }
  606. // Maybe send Height/Round/Precommits
  607. {
  608. rs := conR.conS.GetRoundState()
  609. prs := ps.GetRoundState()
  610. if rs.Height == prs.Height {
  611. if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
  612. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  613. Height: prs.Height,
  614. Round: prs.Round,
  615. Type: types.VoteTypePrecommit,
  616. BlockID: maj23,
  617. }})
  618. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  619. }
  620. }
  621. }
  622. // Maybe send Height/Round/ProposalPOL
  623. {
  624. rs := conR.conS.GetRoundState()
  625. prs := ps.GetRoundState()
  626. if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
  627. if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
  628. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  629. Height: prs.Height,
  630. Round: prs.ProposalPOLRound,
  631. Type: types.VoteTypePrevote,
  632. BlockID: maj23,
  633. }})
  634. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  635. }
  636. }
  637. }
  638. // Little point sending LastCommitRound/LastCommit,
  639. // These are fleeting and non-blocking.
  640. // Maybe send Height/CatchupCommitRound/CatchupCommit.
  641. {
  642. prs := ps.GetRoundState()
  643. if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() {
  644. commit := conR.conS.LoadCommit(prs.Height)
  645. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  646. Height: prs.Height,
  647. Round: commit.Round(),
  648. Type: types.VoteTypePrecommit,
  649. BlockID: commit.BlockID,
  650. }})
  651. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  652. }
  653. }
  654. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  655. continue OUTER_LOOP
  656. }
  657. }
// String returns a string representation of the ConsensusReactor.
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
// Use StringIndented for a representation that includes the consensus state
// and the peer states.
// TODO: improve!
func (conR *ConsensusReactor) String() string {
	// better not to access shared variables
	return "ConsensusReactor" // conR.StringIndented("")
}
  665. // StringIndented returns an indented string representation of the ConsensusReactor
  666. func (conR *ConsensusReactor) StringIndented(indent string) string {
  667. s := "ConsensusReactor{\n"
  668. s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
  669. for _, peer := range conR.Switch.Peers().List() {
  670. ps := peer.Get(types.PeerStateKey).(*PeerState)
  671. s += indent + " " + ps.StringIndented(indent+" ") + "\n"
  672. }
  673. s += indent + "}"
  674. return s
  675. }
  676. //-----------------------------------------------------------------------------
// Sentinel errors relating to peer state updates.
var (
	// ErrPeerStateHeightRegression signals a peer state height regression.
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	// ErrPeerStateInvalidStartTime signals an invalid peer state start time.
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)
// PeerState contains the known state of a peer, including its connection
// and threadsafe access to its PeerRoundState.
type PeerState struct {
	Peer   p2p.Peer   // connection to the peer; used to send votes
	logger log.Logger // no-op logger by default, replaced through SetLogger

	mtx                    sync.Mutex // guards the embedded PeerRoundState
	cstypes.PeerRoundState            // our view of the peer's round state
}
  689. // NewPeerState returns a new PeerState for the given Peer
  690. func NewPeerState(peer p2p.Peer) *PeerState {
  691. return &PeerState{
  692. Peer: peer,
  693. logger: log.NewNopLogger(),
  694. PeerRoundState: cstypes.PeerRoundState{
  695. Round: -1,
  696. ProposalPOLRound: -1,
  697. LastCommitRound: -1,
  698. CatchupCommitRound: -1,
  699. },
  700. }
  701. }
// SetLogger replaces the peer state's logger and returns the receiver so
// that the call can be chained.
func (ps *PeerState) SetLogger(logger log.Logger) *PeerState {
	ps.logger = logger
	return ps
}
  706. // GetRoundState returns an atomic snapshot of the PeerRoundState.
  707. // There's no point in mutating it since it won't change PeerState.
  708. func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
  709. ps.mtx.Lock()
  710. defer ps.mtx.Unlock()
  711. prs := ps.PeerRoundState // copy
  712. return &prs
  713. }
  714. // GetHeight returns an atomic snapshot of the PeerRoundState's height
  715. // used by the mempool to ensure peers are caught up before broadcasting new txs
  716. func (ps *PeerState) GetHeight() int {
  717. ps.mtx.Lock()
  718. defer ps.mtx.Unlock()
  719. return ps.PeerRoundState.Height
  720. }
  721. // SetHasProposal sets the given proposal as known for the peer.
  722. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  723. ps.mtx.Lock()
  724. defer ps.mtx.Unlock()
  725. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  726. return
  727. }
  728. if ps.Proposal {
  729. return
  730. }
  731. ps.Proposal = true
  732. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  733. ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total)
  734. ps.ProposalPOLRound = proposal.POLRound
  735. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  736. }
  737. // SetHasProposalBlockPart sets the given block part index as known for the peer.
  738. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  739. ps.mtx.Lock()
  740. defer ps.mtx.Unlock()
  741. if ps.Height != height || ps.Round != round {
  742. return
  743. }
  744. ps.ProposalBlockParts.SetIndex(index, true)
  745. }
  746. // PickSendVote picks a vote and sends it to the peer.
  747. // Returns true if vote was sent.
  748. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
  749. if vote, ok := ps.PickVoteToSend(votes); ok {
  750. msg := &VoteMessage{vote}
  751. return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
  752. }
  753. return false
  754. }
  755. // PickVoteToSend picks a vote to send to the peer.
  756. // Returns true if a vote was picked.
  757. // NOTE: `votes` must be the correct Size() for the Height().
  758. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
  759. ps.mtx.Lock()
  760. defer ps.mtx.Unlock()
  761. if votes.Size() == 0 {
  762. return nil, false
  763. }
  764. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  765. // Lazily set data using 'votes'.
  766. if votes.IsCommit() {
  767. ps.ensureCatchupCommitRound(height, round, size)
  768. }
  769. ps.ensureVoteBitArrays(height, size)
  770. psVotes := ps.getVoteBitArray(height, round, type_)
  771. if psVotes == nil {
  772. return nil, false // Not something worth sending
  773. }
  774. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  775. ps.setHasVote(height, round, type_, index)
  776. return votes.GetByIndex(index), true
  777. }
  778. return nil, false
  779. }
  780. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray {
  781. if !types.IsVoteTypeValid(type_) {
  782. return nil
  783. }
  784. if ps.Height == height {
  785. if ps.Round == round {
  786. switch type_ {
  787. case types.VoteTypePrevote:
  788. return ps.Prevotes
  789. case types.VoteTypePrecommit:
  790. return ps.Precommits
  791. }
  792. }
  793. if ps.CatchupCommitRound == round {
  794. switch type_ {
  795. case types.VoteTypePrevote:
  796. return nil
  797. case types.VoteTypePrecommit:
  798. return ps.CatchupCommit
  799. }
  800. }
  801. if ps.ProposalPOLRound == round {
  802. switch type_ {
  803. case types.VoteTypePrevote:
  804. return ps.ProposalPOL
  805. case types.VoteTypePrecommit:
  806. return nil
  807. }
  808. }
  809. return nil
  810. }
  811. if ps.Height == height+1 {
  812. if ps.LastCommitRound == round {
  813. switch type_ {
  814. case types.VoteTypePrevote:
  815. return nil
  816. case types.VoteTypePrecommit:
  817. return ps.LastCommit
  818. }
  819. }
  820. return nil
  821. }
  822. return nil
  823. }
  824. // 'round': A round for which we have a +2/3 commit.
  825. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  826. if ps.Height != height {
  827. return
  828. }
  829. /*
  830. NOTE: This is wrong, 'round' could change.
  831. e.g. if orig round is not the same as block LastCommit round.
  832. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  833. cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  834. }
  835. */
  836. if ps.CatchupCommitRound == round {
  837. return // Nothing to do!
  838. }
  839. ps.CatchupCommitRound = round
  840. if round == ps.Round {
  841. ps.CatchupCommit = ps.Precommits
  842. } else {
  843. ps.CatchupCommit = cmn.NewBitArray(numValidators)
  844. }
  845. }
  846. // EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
  847. // what votes this peer has received. It holds ps.mtx while delegating to ensureVoteBitArrays.
  848. // NOTE: It's important to make sure that numValidators actually matches
  849. // what the node sees as the number of validators for height.
  850. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  851. ps.mtx.Lock()
  852. defer ps.mtx.Unlock()
  853. ps.ensureVoteBitArrays(height, numValidators)
  854. }
  855. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  856. if ps.Height == height {
  857. if ps.Prevotes == nil {
  858. ps.Prevotes = cmn.NewBitArray(numValidators)
  859. }
  860. if ps.Precommits == nil {
  861. ps.Precommits = cmn.NewBitArray(numValidators)
  862. }
  863. if ps.CatchupCommit == nil {
  864. ps.CatchupCommit = cmn.NewBitArray(numValidators)
  865. }
  866. if ps.ProposalPOL == nil {
  867. ps.ProposalPOL = cmn.NewBitArray(numValidators)
  868. }
  869. } else if ps.Height == height+1 {
  870. if ps.LastCommit == nil {
  871. ps.LastCommit = cmn.NewBitArray(numValidators)
  872. }
  873. }
  874. }
  875. // SetHasVote sets the given vote as known by the peer
  876. func (ps *PeerState) SetHasVote(vote *types.Vote) {
  877. ps.mtx.Lock()
  878. defer ps.mtx.Unlock()
  879. ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
  880. }
  881. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  882. logger := ps.logger.With("peerRound", ps.Round, "height", height, "round", round)
  883. logger.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  884. // NOTE: some may be nil BitArrays -> no side effects.
  885. psVotes := ps.getVoteBitArray(height, round, type_)
  886. if psVotes != nil {
  887. psVotes.SetIndex(index, true)
  888. }
  889. }
  890. // ApplyNewRoundStepMessage updates the peer state for the new round.
  891. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  892. ps.mtx.Lock()
  893. defer ps.mtx.Unlock()
  894. // Ignore duplicates or decreases
  895. if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 {
  896. return
  897. }
  898. // Just remember these values.
  899. psHeight := ps.Height
  900. psRound := ps.Round
  901. //psStep := ps.Step
  902. psCatchupCommitRound := ps.CatchupCommitRound
  903. psCatchupCommit := ps.CatchupCommit
  904. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  905. ps.Height = msg.Height
  906. ps.Round = msg.Round
  907. ps.Step = msg.Step
  908. ps.StartTime = startTime
  909. if psHeight != msg.Height || psRound != msg.Round {
  910. ps.Proposal = false
  911. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  912. ps.ProposalBlockParts = nil
  913. ps.ProposalPOLRound = -1
  914. ps.ProposalPOL = nil
  915. // We'll update the BitArray capacity later.
  916. ps.Prevotes = nil
  917. ps.Precommits = nil
  918. }
  919. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  920. // Peer caught up to CatchupCommitRound.
  921. // Preserve psCatchupCommit!
  922. // NOTE: We prefer to use prs.Precommits if
  923. // pr.Round matches pr.CatchupCommitRound.
  924. ps.Precommits = psCatchupCommit
  925. }
  926. if psHeight != msg.Height {
  927. // Shift Precommits to LastCommit.
  928. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  929. ps.LastCommitRound = msg.LastCommitRound
  930. ps.LastCommit = ps.Precommits
  931. } else {
  932. ps.LastCommitRound = msg.LastCommitRound
  933. ps.LastCommit = nil
  934. }
  935. // We'll update the BitArray capacity later.
  936. ps.CatchupCommitRound = -1
  937. ps.CatchupCommit = nil
  938. }
  939. }
  940. // ApplyCommitStepMessage updates the peer state for the new commit.
  941. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  942. ps.mtx.Lock()
  943. defer ps.mtx.Unlock()
  944. if ps.Height != msg.Height {
  945. return
  946. }
  947. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  948. ps.ProposalBlockParts = msg.BlockParts
  949. }
  950. // ApplyProposalPOLMessage updates the peer state for the new proposal POL.
  951. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  952. ps.mtx.Lock()
  953. defer ps.mtx.Unlock()
  954. if ps.Height != msg.Height {
  955. return
  956. }
  957. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  958. return
  959. }
  960. // TODO: Merge onto existing ps.ProposalPOL?
  961. // We might have sent some prevotes in the meantime.
  962. ps.ProposalPOL = msg.ProposalPOL
  963. }
  964. // ApplyHasVoteMessage updates the peer state for the new vote.
  965. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  966. ps.mtx.Lock()
  967. defer ps.mtx.Unlock()
  968. if ps.Height != msg.Height {
  969. return
  970. }
  971. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  972. }
  973. // ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
  974. // it claims to have for the corresponding BlockID.
  975. // `ourVotes` is a BitArray of votes we have for msg.BlockID
  976. // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
  977. // we conservatively overwrite ps's votes w/ msg.Votes.
  978. func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) {
  979. ps.mtx.Lock()
  980. defer ps.mtx.Unlock()
  981. votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
  982. if votes != nil {
  983. if ourVotes == nil {
  984. votes.Update(msg.Votes)
  985. } else {
  986. otherVotes := votes.Sub(ourVotes)
  987. hasVotes := otherVotes.Or(msg.Votes)
  988. votes.Update(hasVotes)
  989. }
  990. }
  991. }
  992. // String returns the PeerState rendered by StringIndented with an empty indent prefix.
  993. func (ps *PeerState) String() string {
  994. return ps.StringIndented("")
  995. }
  996. // StringIndented returns a multi-line string representation of the PeerState: the peer's key and its PeerRoundState, with every line prefixed by indent.
  997. func (ps *PeerState) StringIndented(indent string) string {
  998. return fmt.Sprintf(`PeerState{
  999. %s Key %v
  1000. %s PRS %v
  1001. %s}`,
  1002. indent, ps.Peer.Key(),
  1003. indent, ps.PeerRoundState.StringIndented(indent+" "),
  1004. indent)
  1005. }
  1006. //-----------------------------------------------------------------------------
  1007. // Messages
  // Type bytes identifying each consensus message on the wire.
const (
	msgTypeNewRoundStep      byte = 0x01
	msgTypeCommitStep        byte = 0x02
	msgTypeProposal          byte = 0x11
	msgTypeProposalPOL       byte = 0x12
	msgTypeBlockPart         byte = 0x13 // both block & POL
	msgTypeVote              byte = 0x14
	msgTypeHasVote           byte = 0x15
	msgTypeVoteSetMaj23      byte = 0x16
	msgTypeVoteSetBits       byte = 0x17
	msgTypeProposalHeartbeat byte = 0x20
)
  1020. // ConsensusMessage is a message that can be sent and received on the ConsensusReactor. It is an empty interface, so it places no method requirements on message types.
  1021. type ConsensusMessage interface{}
  // Register every concrete consensus message type with go-wire under the
  // ConsensusMessage wrapper, pairing each type with its msgType* byte.
  1022. var _ = wire.RegisterInterface(
  1023. struct{ ConsensusMessage }{},
  1024. wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  1025. wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  1026. wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  1027. wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  1028. wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  1029. wire.ConcreteType{&VoteMessage{}, msgTypeVote},
  1030. wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  1031. wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23},
  1032. wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
  1033. wire.ConcreteType{&ProposalHeartbeatMessage{}, msgTypeProposalHeartbeat},
  1034. )
  1035. // DecodeMessage decodes the given bytes into a ConsensusMessage.
  1036. // TODO: check for unnecessary extra bytes at the end.
  1037. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  1038. msgType = bz[0]
  1039. n := new(int)
  1040. r := bytes.NewReader(bz)
  1041. msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err)
  1042. msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage
  1043. return
  1044. }
  1045. //-------------------------------------
  1046. // NewRoundStepMessage is sent for every step taken in the ConsensusState.
  1047. // For every height/round/step transition
  1048. type NewRoundStepMessage struct {
  1049. Height int
  1050. Round int
  1051. Step cstypes.RoundStepType
  1052. SecondsSinceStartTime int
  1053. LastCommitRound int
  1054. }
  1055. // String returns a string representation.
  1056. func (m *NewRoundStepMessage) String() string {
  1057. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  1058. m.Height, m.Round, m.Step, m.LastCommitRound)
  1059. }
  1060. //-------------------------------------
  1061. // CommitStepMessage is sent when a block is committed.
  1062. type CommitStepMessage struct {
  1063. Height int
  1064. BlockPartsHeader types.PartSetHeader
  1065. BlockParts *cmn.BitArray
  1066. }
  1067. // String returns a string representation.
  1068. func (m *CommitStepMessage) String() string {
  1069. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  1070. }
  1071. //-------------------------------------
  1072. // ProposalMessage is sent when a new block is proposed.
  1073. type ProposalMessage struct {
  1074. Proposal *types.Proposal
  1075. }
  1076. // String returns a string representation.
  1077. func (m *ProposalMessage) String() string {
  1078. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  1079. }
  1080. //-------------------------------------
  1081. // ProposalPOLMessage is sent when a previous proposal is re-proposed.
  1082. type ProposalPOLMessage struct {
  1083. Height int
  1084. ProposalPOLRound int
  1085. ProposalPOL *cmn.BitArray
  1086. }
  1087. // String returns a string representation.
  1088. func (m *ProposalPOLMessage) String() string {
  1089. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  1090. }
  1091. //-------------------------------------
  1092. // BlockPartMessage is sent when gossipping a piece of the proposed block.
  1093. type BlockPartMessage struct {
  1094. Height int
  1095. Round int
  1096. Part *types.Part
  1097. }
  1098. // String returns a string representation.
  1099. func (m *BlockPartMessage) String() string {
  1100. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  1101. }
  1102. //-------------------------------------
  1103. // VoteMessage is sent when voting for a proposal (or lack thereof).
  1104. type VoteMessage struct {
  1105. Vote *types.Vote
  1106. }
  1107. // String returns a string representation.
  1108. func (m *VoteMessage) String() string {
  1109. return fmt.Sprintf("[Vote %v]", m.Vote)
  1110. }
  1111. //-------------------------------------
  // HasVoteMessage is sent to indicate that a particular vote has been received.
// The vote is identified by height, round, vote type and validator index.
type HasVoteMessage struct {
	Height int
	Round  int
	Type   byte
	Index  int
}

// String returns a compact representation: the validator index followed by
// height/round/type. The index is printed once; it was previously repeated at
// the end of the string.
func (m *HasVoteMessage) String() string {
	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
}
  1123. //-------------------------------------
  1124. // VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
  1125. type VoteSetMaj23Message struct {
  1126. Height int
  1127. Round int
  1128. Type byte
  1129. BlockID types.BlockID
  1130. }
  1131. // String returns a string representation.
  1132. func (m *VoteSetMaj23Message) String() string {
  1133. return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
  1134. }
  1135. //-------------------------------------
  1136. // VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
  1137. type VoteSetBitsMessage struct {
  1138. Height int
  1139. Round int
  1140. Type byte
  1141. BlockID types.BlockID
  1142. Votes *cmn.BitArray
  1143. }
  1144. // String returns a string representation.
  1145. func (m *VoteSetBitsMessage) String() string {
  1146. return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
  1147. }
  1148. //-------------------------------------
  1149. // ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal.
  1150. type ProposalHeartbeatMessage struct {
  1151. Heartbeat *types.Heartbeat
  1152. }
  1153. // String returns a string representation.
  1154. func (m *ProposalHeartbeatMessage) String() string {
  1155. return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
  1156. }