package consensus

import (
	"bytes"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"time"

	. "github.com/tendermint/go-common"
	"github.com/tendermint/go-p2p"
	"github.com/tendermint/go-wire"
	bc "github.com/tendermint/tendermint/blockchain"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

const (
	StateChannel       = byte(0x20)
	DataChannel        = byte(0x21)
	VoteChannel        = byte(0x22)
	VoteSetBitsChannel = byte(0x23)

	peerGossipSleepDuration     = 100 * time.Millisecond // Time to sleep if there's nothing to send.
	peerQueryMaj23SleepDuration = 5 * time.Second        // Time to sleep after each VoteSetMaj23Message sent.
	maxConsensusMessageSize     = 1048576                // 1MB; NOTE: keep in sync with types.PartSet sizes.
)

//-----------------------------------------------------------------------------
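
// ConsensusReactor bridges the ConsensusState machine and the p2p layer,
// gossiping consensus steps, proposals, block parts, and votes with peers.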
type ConsensusReactor struct {
	p2p.BaseReactor // BaseService + p2p.Switch

	blockStore *bc.BlockStore
	conS       *ConsensusState
	fastSync   bool
	evsw       types.EventSwitch
}
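
// NewConsensusReactor returns a new ConsensusReactor wrapping consensusState.
// If fastSync is true, the consensus state machine is not started until
// SwitchToConsensus() is called.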
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
	conR := &ConsensusReactor{
		blockStore: blockStore,
		conS:       consensusState,
		fastSync:   fastSync,
	}
	conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
	return conR
}
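
// OnStart implements BaseService: it registers event callbacks and, unless we
// are fast-syncing, starts the underlying ConsensusState.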
func (conR *ConsensusReactor) OnStart() error {
	log.Notice("ConsensusReactor", "fastSync", conR.fastSync)
	conR.BaseReactor.OnStart()

	// callbacks for broadcasting new steps and votes to peers
	// upon their respective events (ie. uses evsw)
	conR.registerEventCallbacks()

	if !conR.fastSync {
		_, err := conR.conS.Start()
		if err != nil {
			return err
		}
	}
	return nil
}

func (conR *ConsensusReactor) OnStop() {
	conR.BaseReactor.OnStop()
	conR.conS.Stop()
}

// SwitchToConsensus switches from fast_sync mode to consensus mode:
// it resets the state, turns off fast_sync, and starts the consensus state machine.
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
	log.Notice("SwitchToConsensus")
	conR.conS.reconstructLastCommit(state)
	// NOTE: The line below causes broadcastNewRoundStepRoutine() to
	// broadcast a NewRoundStepMessage.
	conR.conS.updateToState(state)
	conR.fastSync = false
	conR.conS.Start()
}

// Implements Reactor
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
	// TODO optimize
	return []*p2p.ChannelDescriptor{
		&p2p.ChannelDescriptor{
			ID:                StateChannel,
			Priority:          5,
			SendQueueCapacity: 100,
		},
		&p2p.ChannelDescriptor{
			ID:                 DataChannel, // maybe split between gossiping current block and catchup stuff
			Priority:           10,          // once we gossip the whole block there's nothing left to send until next height or round
			SendQueueCapacity:  100,
			RecvBufferCapacity: 50 * 4096,
		},
		&p2p.ChannelDescriptor{
			ID:                 VoteChannel,
			Priority:           5,
			SendQueueCapacity:  100,
			RecvBufferCapacity: 100 * 100,
		},
		&p2p.ChannelDescriptor{
			ID:                 VoteSetBitsChannel,
			Priority:           1,
			SendQueueCapacity:  2,
			RecvBufferCapacity: 1024,
		},
	}
}

// Implements Reactor
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
	if !conR.IsRunning() {
		return
	}

	// Create peerState for peer
	peerState := NewPeerState(peer)
	peer.Data.Set(types.PeerStateKey, peerState)

	// Begin routines for this peer.
	go conR.gossipDataRoutine(peer, peerState)
	go conR.gossipVotesRoutine(peer, peerState)
	go conR.queryMaj23Routine(peer, peerState)
	go conR.replyMaj23Routine(peer, peerState)

	// Send our state to peer.
	// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
	if !conR.fastSync {
		conR.sendNewRoundStepMessage(peer)
	}
}

// Implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	// TODO
	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}

// Implements Reactor
// NOTE: We process these messages even when we're fast_syncing.
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine.
// NOTE: blocks on consensus state for proposals, block parts, and votes.
func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
	if !conR.IsRunning() {
		log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
		return
	}

	_, msg, err := DecodeMessage(msgBytes)
	if err != nil {
		log.Warn("Error decoding message", "src", src, "chId", chID, "msg", msg, "error", err, "bytes", msgBytes)
		// TODO punish peer?
		return
	}
	log.Debug("Receive", "src", src, "chId", chID, "msg", msg)

	// Get peer states
	ps := src.Data.Get(types.PeerStateKey).(*PeerState)

	switch chID {
	case StateChannel:
		switch msg := msg.(type) {
		case *NewRoundStepMessage:
			ps.ApplyNewRoundStepMessage(msg)
		case *CommitStepMessage:
			ps.ApplyCommitStepMessage(msg)
		case *HasVoteMessage:
			ps.ApplyHasVoteMessage(msg)
		case *VoteSetMaj23Message:
			cs := conR.conS
			cs.mtx.Lock()
			height, votes := cs.Height, cs.Votes
			cs.mtx.Unlock()
			if height == msg.Height {
				votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
			}
			ps.ApplyVoteSetMaj23Message(msg)
		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case DataChannel:
		if conR.fastSync {
			log.Warn("Ignoring message received during fastSync", "msg", msg)
			return
		}
		switch msg := msg.(type) {
		case *ProposalMessage:
			ps.SetHasProposal(msg.Proposal)
			conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
		case *ProposalPOLMessage:
			ps.ApplyProposalPOLMessage(msg)
		case *BlockPartMessage:
			ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
			conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case VoteChannel:
		if conR.fastSync {
			log.Warn("Ignoring message received during fastSync", "msg", msg)
			return
		}
		switch msg := msg.(type) {
		case *VoteMessage:
			cs := conR.conS
			cs.mtx.Lock()
			height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
			cs.mtx.Unlock()
			ps.EnsureVoteBitArrays(height, valSize)
			ps.EnsureVoteBitArrays(height-1, lastCommitSize)
			ps.SetHasVote(msg.Vote)
			conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
		default:
			// don't punish (leave room for soft upgrades)
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case VoteSetBitsChannel:
		if conR.fastSync {
			log.Warn("Ignoring message received during fastSync", "msg", msg)
			return
		}
		switch msg := msg.(type) {
		case *VoteSetBitsMessage:
			cs := conR.conS
			cs.mtx.Lock()
			height, votes := cs.Height, cs.Votes
			cs.mtx.Unlock()
			if height == msg.Height {
				var ourVotes *BitArray
				switch msg.Type {
				case types.VoteTypePrevote:
					ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
				case types.VoteTypePrecommit:
					ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
				default:
					log.Warn("Bad VoteSetBitsMessage field Type")
					return
				}
				ps.ApplyVoteSetBitsMessage(msg, ourVotes)
			} else {
				ps.ApplyVoteSetBitsMessage(msg, nil)
			}
		default:
			// don't punish (leave room for soft upgrades)
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	default:
		log.Warn(Fmt("Unknown chId %X", chID))
	}

	if err != nil {
		log.Warn("Error in Receive()", "error", err)
	}
}

// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv PrivValidator) {
	conR.conS.SetPrivValidator(priv)
}

// implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
	conR.evsw = evsw
	conR.conS.SetEventSwitch(evsw)
}

//--------------------------------------

// Listens for new steps and votes,
// broadcasting the result to peers
func (conR *ConsensusReactor) registerEventCallbacks() {
	types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) {
		rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
		conR.broadcastNewRoundStep(rs)
	})

	types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) {
		edv := data.(types.EventDataVote)
		conR.broadcastHasVoteMessage(edv.Vote)
	})
}

func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
	nrsMsg, csMsg := makeRoundStepMessages(rs)
	if nrsMsg != nil {
		conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
	}
	if csMsg != nil {
		conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
	}
}

// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
	msg := &HasVoteMessage{
		Height: vote.Height,
		Round:  vote.Round,
		Type:   vote.Type,
		Index:  vote.ValidatorIndex,
	}
	conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
	/*
		// TODO: Make this broadcast more selective.
		for _, peer := range conR.Switch.Peers().List() {
			ps := peer.Data.Get(PeerStateKey).(*PeerState)
			prs := ps.GetRoundState()
			if prs.Height == vote.Height {
				// TODO: Also filter on round?
				peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
			} else {
				// Height doesn't match
				// TODO: check a field, maybe CatchupCommitRound?
				// TODO: But that requires changing the struct field comment.
			}
		}
	*/
}

func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
	nrsMsg = &NewRoundStepMessage{
		Height:                rs.Height,
		Round:                 rs.Round,
		Step:                  rs.Step,
		SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
		LastCommitRound:       rs.LastCommit.Round(),
	}
	if rs.Step == RoundStepCommit {
		csMsg = &CommitStepMessage{
			Height:           rs.Height,
			BlockPartsHeader: rs.ProposalBlockParts.Header(),
			BlockParts:       rs.ProposalBlockParts.BitArray(),
		}
	}
	return
}

func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
	rs := conR.conS.GetRoundState()
	nrsMsg, csMsg := makeRoundStepMessages(rs)
	if nrsMsg != nil {
		peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
	}
	if csMsg != nil {
		peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
	}
}
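
// gossipDataRoutine gossips proposal data to a single peer: it sends proposal
// block parts the peer is missing, serves block parts from the block store
// when the peer is catching up on an earlier height, and sends the Proposal
// plus the ProposalPOL BitArray once heights and rounds match; otherwise it
// sleeps for peerGossipSleepDuration and retries.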
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
	log := log.New("peer", peer)

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		// Send proposal Block parts?
		if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
			//log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
			if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
				part := rs.ProposalBlockParts.GetPart(index)
				msg := &BlockPartMessage{
					Height: rs.Height, // This tells peer that this part applies to us.
					Round:  rs.Round,  // This tells peer that this part applies to us.
					Part:   part,
				}
				peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			}
		}

		// If the peer is on a previous height, help catch up.
		if (0 < prs.Height) && (prs.Height < rs.Height) {
			//log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
			if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
				// Ensure that the peer's PartSetHeader is correct
				blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
				if blockMeta == nil {
					log.Warn("Failed to load block meta", "peer height", prs.Height, "our height", rs.Height)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				} else if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
					log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Load the part
				part := conR.blockStore.LoadBlockPart(prs.Height, index)
				if part == nil {
					log.Warn("Could not load part", "index", index,
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Send the part
				msg := &BlockPartMessage{
					Height: prs.Height, // Not our height, so it doesn't matter.
					Round:  prs.Round,  // Not our height, so it doesn't matter.
					Part:   part,
				}
				peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			} else {
				//log.Info("No parts to send in catch-up, sleeping")
				time.Sleep(peerGossipSleepDuration)
				continue OUTER_LOOP
			}
		}

		// If height and round don't match, sleep.
		if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
			//log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
			time.Sleep(peerGossipSleepDuration)
			continue OUTER_LOOP
		}

		// By here, height and round match.
		// Proposal block parts were already matched and sent if any were wanted.
		// (These can match on hash so the round doesn't matter)
		// Now consider sending other things, like the Proposal itself.

		// Send Proposal && ProposalPOL BitArray?
		if rs.Proposal != nil && !prs.Proposal {
			// Proposal: share the proposal metadata with peer.
			{
				msg := &ProposalMessage{Proposal: rs.Proposal}
				peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
				ps.SetHasProposal(rs.Proposal)
			}
			// ProposalPOL: lets peer know which POL votes we have so far.
			// Peer must receive ProposalMessage first.
			// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
			// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
			if 0 <= rs.Proposal.POLRound {
				msg := &ProposalPOLMessage{
					Height:           rs.Height,
					ProposalPOLRound: rs.Proposal.POLRound,
					ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
				}
				peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
			}
			continue OUTER_LOOP
		}

		// Nothing to do. Sleep.
		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}
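
// gossipVotesRoutine gossips votes to a single peer: LastCommit, prevotes, and
// precommits when heights match, and commits from the block store when the
// peer is lagging behind. A small sleeping counter throttles the
// "nothing to send" log so it fires once per idle stretch.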
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
	log := log.New("peer", peer)

	// Simple hack to throttle logs upon sleep.
	var sleeping = 0

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		switch sleeping {
		case 1: // First sleep
			sleeping = 2
		case 2: // No more sleep
			sleeping = 0
		}

		//log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
		//	"prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

		// If height matches, then send LastCommit, Prevotes, Precommits.
		if rs.Height == prs.Height {
			// If there are lastCommits to send...
			if prs.Step == RoundStepNewHeight {
				if ps.PickSendVote(rs.LastCommit) {
					log.Debug("Picked rs.LastCommit to send")
					continue OUTER_LOOP
				}
			}
			// If there are prevotes to send...
			if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
				if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
					log.Debug("Picked rs.Prevotes(prs.Round) to send")
					continue OUTER_LOOP
				}
			}
			// If there are precommits to send...
			if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
				if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
					log.Debug("Picked rs.Precommits(prs.Round) to send")
					continue OUTER_LOOP
				}
			}
			// If there are POLPrevotes to send...
			if prs.ProposalPOLRound != -1 {
				if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
					if ps.PickSendVote(polPrevotes) {
						log.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
						continue OUTER_LOOP
					}
				}
			}
		}

		// Special catchup logic.
		// If peer is lagging by height 1, send LastCommit.
		if prs.Height != 0 && rs.Height == prs.Height+1 {
			if ps.PickSendVote(rs.LastCommit) {
				log.Debug("Picked rs.LastCommit to send")
				continue OUTER_LOOP
			}
		}

		// Catchup logic.
		// If peer is lagging by more than 1, send Commit.
		if prs.Height != 0 && rs.Height >= prs.Height+2 {
			// Load the block commit for prs.Height,
			// which contains precommit signatures for prs.Height.
			commit := conR.blockStore.LoadBlockCommit(prs.Height)
			log.Debug("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
			if ps.PickSendVote(commit) {
				log.Debug("Picked Catchup commit to send")
				continue OUTER_LOOP
			}
		}

		if sleeping == 0 {
			// We sent nothing. Sleep...
			sleeping = 1
			log.Debug("No votes to send, sleeping", "peer", peer,
				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
		} else if sleeping == 2 {
			// Continued sleep...
			sleeping = 1
		}

		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}
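
// queryMaj23Routine periodically tells the peer about +2/3 majorities we have
// seen: when our prevotes or precommits for a round the peer is on (or for its
// ProposalPOLRound or CatchupCommitRound) reach a +2/3 majority for some
// BlockID, we send a VoteSetMaj23Message. The peer answers on the
// VoteSetBitsChannel with a VoteSetBitsMessage of the votes it already has
// (see replyMaj23Routine for the responding side).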
func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) {
	log := log.New("peer", peer)

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Notice(Fmt("Stopping queryMaj23Routine for %v.", peer))
			return
		}

		// Maybe send Height/Round/Prevotes.
		// NOTE: VoteSetMaj23Message is handled on the StateChannel in Receive().
		{
			rs := conR.conS.GetRoundState()
			prs := ps.GetRoundState()
			if rs.Height == prs.Height {
				if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
					peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
						Height:  prs.Height,
						Round:   prs.Round,
						Type:    types.VoteTypePrevote,
						BlockID: maj23,
					}})
					time.Sleep(peerQueryMaj23SleepDuration)
				}
			}
		}

		// Maybe send Height/Round/Precommits.
		{
			rs := conR.conS.GetRoundState()
			prs := ps.GetRoundState()
			if rs.Height == prs.Height {
				if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
					peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
						Height:  prs.Height,
						Round:   prs.Round,
						Type:    types.VoteTypePrecommit,
						BlockID: maj23,
					}})
					time.Sleep(peerQueryMaj23SleepDuration)
				}
			}
		}

		// Maybe send Height/Round/ProposalPOL.
		{
			rs := conR.conS.GetRoundState()
			prs := ps.GetRoundState()
			if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
				if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
					peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
						Height:  prs.Height,
						Round:   prs.ProposalPOLRound,
						Type:    types.VoteTypePrevote,
						BlockID: maj23,
					}})
					time.Sleep(peerQueryMaj23SleepDuration)
				}
			}
		}

		// Little point sending LastCommitRound/LastCommit,
		// these are fleeting and non-blocking.

		// Maybe send Height/CatchupCommitRound/CatchupCommit.
		{
			prs := ps.GetRoundState()
			if prs.CatchupCommitRound != -1 && prs.Height > 0 {
				commit := conR.blockStore.LoadBlockCommit(prs.Height)
				if commit != nil {
					peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
						Height:  prs.Height,
						Round:   commit.Round(),
						Type:    types.VoteTypePrecommit,
						BlockID: commit.BlockID,
					}})
					time.Sleep(peerQueryMaj23SleepDuration)
				}
			}
		}

		continue OUTER_LOOP
	}
}
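
// replyMaj23Routine drains the peer's Maj23Queue: for each VoteSetMaj23Message
// the peer sent us, it responds on the VoteSetBitsChannel with a
// VoteSetBitsMessage showing which of those votes we already have.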
func (conR *ConsensusReactor) replyMaj23Routine(peer *p2p.Peer, ps *PeerState) {
	log := log.New("peer", peer)

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Notice(Fmt("Stopping replyMaj23Routine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()

		// Process a VoteSetMaj23Message
		msg := <-ps.Maj23Queue
		if rs.Height == msg.Height {
			var ourVotes *BitArray
			switch msg.Type {
			case types.VoteTypePrevote:
				ourVotes = rs.Votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
			case types.VoteTypePrecommit:
				ourVotes = rs.Votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
			default:
				log.Warn("Bad VoteSetMaj23Message field Type")
				return
			}
			peer.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
				Height:  msg.Height,
				Round:   msg.Round,
				Type:    msg.Type,
				BlockID: msg.BlockID,
				Votes:   ourVotes,
			}})
		}

		continue OUTER_LOOP
	}
}

//-----------------------------------------------------------------------------

// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
	Height                   int                 // Height peer is at
	Round                    int                 // Round peer is at, -1 if unknown.
	Step                     RoundStepType       // Step peer is at
	StartTime                time.Time           // Estimated start of round 0 at this height
	Proposal                 bool                // True if peer has proposal for this round
	ProposalBlockPartsHeader types.PartSetHeader //
	ProposalBlockParts       *BitArray           //
	ProposalPOLRound         int                 // Proposal's POL round. -1 if none.
	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
	Prevotes                 *BitArray           // All votes peer has for this round
	Precommits               *BitArray           // All precommits peer has for this round
	LastCommitRound          int                 // Round of commit for last height. -1 if none.
	LastCommit               *BitArray           // All commit precommits of commit for last height.
	CatchupCommitRound       int                 // Round that we have commit for. Not necessarily unique. -1 if none.
	CatchupCommit            *BitArray           // All commit precommits peer has for this height & CatchupCommitRound
}

//-----------------------------------------------------------------------------

var (
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)

type PeerState struct {
	Peer *p2p.Peer

	mtx sync.Mutex
	PeerRoundState

	Maj23Queue chan *VoteSetMaj23Message
}
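
// NewPeerState returns a new PeerState for the given peer, with all round
// fields initialized to -1 (unknown) and a small buffered Maj23Queue.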
func NewPeerState(peer *p2p.Peer) *PeerState {
	return &PeerState{
		Peer: peer,
		PeerRoundState: PeerRoundState{
			Round:              -1,
			ProposalPOLRound:   -1,
			LastCommitRound:    -1,
			CatchupCommitRound: -1,
		},
		Maj23Queue: make(chan *VoteSetMaj23Message, 2),
	}
}

// Returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *PeerRoundState {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	prs := ps.PeerRoundState // copy
	return &prs
}

// Returns an atomic snapshot of the PeerRoundState's height,
// used by the mempool to ensure peers are caught up before broadcasting new txs.
func (ps *PeerState) GetHeight() int {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	return ps.PeerRoundState.Height
}

func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != proposal.Height || ps.Round != proposal.Round {
		return
	}
	if ps.Proposal {
		return
	}

	ps.Proposal = true
	ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
	ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
	ps.ProposalPOLRound = proposal.POLRound
	ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}

func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != height || ps.Round != round {
		return
	}

	ps.ProposalBlockParts.SetIndex(index, true)
}

// Convenience function to send vote to peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
	if vote, ok := ps.PickVoteToSend(votes); ok {
		msg := &VoteMessage{vote}
		ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
		return true
	}
	return false
}

// votes: Must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if votes.Size() == 0 {
		return nil, false
	}

	height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()

	// Lazily set data using 'votes'.
	if votes.IsCommit() {
		ps.ensureCatchupCommitRound(height, round, size)
	}
	ps.ensureVoteBitArrays(height, size)

	psVotes := ps.getVoteBitArray(height, round, type_)
	if psVotes == nil {
		return nil, false // Not something worth sending
	}
	if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
		ps.setHasVote(height, round, type_, index)
		return votes.GetByIndex(index), true
	}
	return nil, false
}
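
// Illustrative sketch (not from the original source): if our vote BitArray is
// {1,1,1,1} and the peer's is {1,0,1,0}, then votes.BitArray().Sub(psVotes)
// yields {0,1,0,1}, and PickRandom selects index 1 or 3, so we always pick a
// vote the peer is (as far as we know) missing.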

func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
	if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
		PanicSanity("Invalid vote type")
	}

	if ps.Height == height {
		if ps.Round == round {
			switch type_ {
			case types.VoteTypePrevote:
				return ps.Prevotes
			case types.VoteTypePrecommit:
				return ps.Precommits
			}
		}
		if ps.CatchupCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return nil
			case types.VoteTypePrecommit:
				return ps.CatchupCommit
			}
		}
		if ps.ProposalPOLRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return ps.ProposalPOL
			case types.VoteTypePrecommit:
				return nil
			}
		}
		return nil
	}
	if ps.Height == height+1 {
		if ps.LastCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return nil
			case types.VoteTypePrecommit:
				return ps.LastCommit
			}
		}
		return nil
	}
	return nil
}

// 'round': A round for which we have a +2/3 commit.
func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
	if ps.Height != height {
		return
	}
	/*
		NOTE: This is wrong, 'round' could change.
		e.g. if orig round is not the same as block LastCommit round.
		if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
			PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
		}
	*/
	if ps.CatchupCommitRound == round {
		return // Nothing to do!
	}
	ps.CatchupCommitRound = round
	if round == ps.Round {
		ps.CatchupCommit = ps.Precommits
	} else {
		ps.CatchupCommit = NewBitArray(numValidators)
	}
}

// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.ensureVoteBitArrays(height, numValidators)
}

func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
	if ps.Height == height {
		if ps.Prevotes == nil {
			ps.Prevotes = NewBitArray(numValidators)
		}
		if ps.Precommits == nil {
			ps.Precommits = NewBitArray(numValidators)
		}
		if ps.CatchupCommit == nil {
			ps.CatchupCommit = NewBitArray(numValidators)
		}
		if ps.ProposalPOL == nil {
			ps.ProposalPOL = NewBitArray(numValidators)
		}
	} else if ps.Height == height+1 {
		if ps.LastCommit == nil {
			ps.LastCommit = NewBitArray(numValidators)
		}
	}
}

func (ps *PeerState) SetHasVote(vote *types.Vote) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}

func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
	log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
	log.Info("setHasVote", "lastCommit", ps.LastCommit, "index", index)

	// NOTE: some may be nil BitArrays -> no side effects.
	ps.getVoteBitArray(height, round, type_).SetIndex(index, true)
}
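
// ApplyNewRoundStepMessage updates the peer's round state, ignoring duplicates
// and regressions; on a new round it resets the proposal fields, and on a new
// height it shifts Precommits into LastCommit when the rounds line up.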
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	// Ignore duplicates or decreases
	if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 {
		return
	}

	// Just remember these values.
	psHeight := ps.Height
	psRound := ps.Round
	//psStep := ps.Step
	psCatchupCommitRound := ps.CatchupCommitRound
	psCatchupCommit := ps.CatchupCommit

	startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
	ps.Height = msg.Height
	ps.Round = msg.Round
	ps.Step = msg.Step
	ps.StartTime = startTime
	if psHeight != msg.Height || psRound != msg.Round {
		ps.Proposal = false
		ps.ProposalBlockPartsHeader = types.PartSetHeader{}
		ps.ProposalBlockParts = nil
		ps.ProposalPOLRound = -1
		ps.ProposalPOL = nil
		// We'll update the BitArray capacity later.
		ps.Prevotes = nil
		ps.Precommits = nil
	}
	if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
		// Peer caught up to CatchupCommitRound.
		// Preserve psCatchupCommit!
		// NOTE: We prefer to use prs.Precommits if
		// pr.Round matches pr.CatchupCommitRound.
		ps.Precommits = psCatchupCommit
	}
	if psHeight != msg.Height {
		// Shift Precommits to LastCommit.
		if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = ps.Precommits
		} else {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = nil
		}
		// We'll update the BitArray capacity later.
		ps.CatchupCommitRound = -1
		ps.CatchupCommit = nil
	}
}

func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
	ps.ProposalBlockParts = msg.BlockParts
}

func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}
	if ps.ProposalPOLRound != msg.ProposalPOLRound {
		return
	}

	// TODO: Merge onto existing ps.ProposalPOL?
	// We might have sent some prevotes in the meantime.
	ps.ProposalPOL = msg.ProposalPOL
}

func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}

// When a peer claims to have a maj23 for some BlockID at H,R,S,
// we will try to respond with a VoteSetBitsMessage showing which
// bits we already have (and which we don't yet have),
// but that happens in another goroutine.
func (ps *PeerState) ApplyVoteSetMaj23Message(msg *VoteSetMaj23Message) {
	// ps.mtx.Lock()
	// defer ps.mtx.Unlock()
	select {
	case ps.Maj23Queue <- msg:
	default:
		// Just ignore if we're already processing messages.
	}
}

// The peer has responded with a bitarray of votes that it has
// of the corresponding BlockID.
// ourVotes: BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *BitArray) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
	if votes != nil {
		if ourVotes == nil {
			votes.Update(msg.Votes)
		} else {
			otherVotes := votes.Sub(ourVotes)
			hasVotes := otherVotes.Or(msg.Votes)
			votes.Update(hasVotes)
		}
	}
}

//-----------------------------------------------------------------------------
// Messages

const (
	msgTypeNewRoundStep = byte(0x01)
	msgTypeCommitStep   = byte(0x02)
	msgTypeProposal     = byte(0x11)
	msgTypeProposalPOL  = byte(0x12)
	msgTypeBlockPart    = byte(0x13) // both block & POL
	msgTypeVote         = byte(0x14)
	msgTypeHasVote      = byte(0x15)
	msgTypeVoteSetMaj23 = byte(0x16)
	msgTypeVoteSetBits  = byte(0x17)
)
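
// go-wire can only (de)serialize interface values whose concrete types have
// been registered with a unique type byte; the RegisterInterface call below
// maps each message struct to its msgType byte above.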

type ConsensusMessage interface{}

var _ = wire.RegisterInterface(
	struct{ ConsensusMessage }{},
	wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
	wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
	wire.ConcreteType{&VoteMessage{}, msgTypeVote},
	wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
	wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23},
	wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
)

// TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
	if len(bz) == 0 {
		err = errors.New("DecodeMessage: empty input")
		return
	}
	msgType = bz[0]
	n := new(int)
	r := bytes.NewReader(bz)
	msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
	return
}
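
// A typical use, as in Receive() above (sketch):
//
//	_, msg, err := DecodeMessage(msgBytes)
//	if err != nil {
//		return // bad or oversized message; maybe punish the peer
//	}
//	switch msg := msg.(type) {
//	case *VoteMessage:
//		// handle the vote...
//	}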

//-------------------------------------

// For every height/round/step transition
type NewRoundStepMessage struct {
	Height                int
	Round                 int
	Step                  RoundStepType
	SecondsSinceStartTime int
	LastCommitRound       int
}

func (m *NewRoundStepMessage) String() string {
	return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
		m.Height, m.Round, m.Step, m.LastCommitRound)
}

//-------------------------------------

type CommitStepMessage struct {
	Height           int
	BlockPartsHeader types.PartSetHeader
	BlockParts       *BitArray
}

func (m *CommitStepMessage) String() string {
	return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
}

//-------------------------------------

type ProposalMessage struct {
	Proposal *types.Proposal
}

func (m *ProposalMessage) String() string {
	return fmt.Sprintf("[Proposal %v]", m.Proposal)
}

//-------------------------------------

type ProposalPOLMessage struct {
	Height           int
	ProposalPOLRound int
	ProposalPOL      *BitArray
}

func (m *ProposalPOLMessage) String() string {
	return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}

//-------------------------------------

type BlockPartMessage struct {
	Height int
	Round  int
	Part   *types.Part
}

func (m *BlockPartMessage) String() string {
	return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}

//-------------------------------------

type VoteMessage struct {
	Vote *types.Vote
}

func (m *VoteMessage) String() string {
	return fmt.Sprintf("[Vote %v]", m.Vote)
}

//-------------------------------------

type HasVoteMessage struct {
	Height int
	Round  int
	Type   byte
	Index  int
}

func (m *HasVoteMessage) String() string {
	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
}

//-------------------------------------

type VoteSetMaj23Message struct {
	Height  int
	Round   int
	Type    byte
	BlockID types.BlockID
}

func (m *VoteSetMaj23Message) String() string {
	return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
}

//-------------------------------------

type VoteSetBitsMessage struct {
	Height  int
	Round   int
	Type    byte
	BlockID types.BlockID
	Votes   *BitArray
}

func (m *VoteSetBitsMessage) String() string {
	return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
}