You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1356 lines
41 KiB

8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
8 years ago
8 years ago
10 years ago
10 years ago
10 years ago
7 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
9 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
8 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. wire "github.com/tendermint/go-wire"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. "github.com/tendermint/tmlibs/log"
  12. "github.com/tendermint/tendermint/p2p"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. )
const (
	// Channel IDs used by the consensus reactor (see GetChannels and Receive).

	// StateChannel carries round-step, commit-step, has-vote, maj23 and
	// proposal heartbeat messages.
	StateChannel = byte(0x20)
	// DataChannel carries proposals, proposal POL bit arrays and block parts.
	DataChannel = byte(0x21)
	// VoteChannel carries individual votes.
	VoteChannel = byte(0x22)
	// VoteSetBitsChannel carries VoteSetBitsMessage replies.
	VoteSetBitsChannel = byte(0x23)

	maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
)
//-----------------------------------------------------------------------------

// ConsensusReactor defines a reactor for the consensus service.
// It relays consensus messages between peers and the local ConsensusState.
type ConsensusReactor struct {
	p2p.BaseReactor // BaseService + p2p.Switch

	conS *ConsensusState   // local consensus state machine
	evsw types.EventSwitch // event switch the broadcast callbacks are registered on

	mtx      sync.RWMutex // guards fastSync
	fastSync bool         // true while in fast-sync mode; cleared by SwitchToConsensus
}
  32. // NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
  33. func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
  34. conR := &ConsensusReactor{
  35. conS: consensusState,
  36. fastSync: fastSync,
  37. }
  38. conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
  39. return conR
  40. }
  41. // OnStart implements BaseService.
  42. func (conR *ConsensusReactor) OnStart() error {
  43. conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
  44. conR.BaseReactor.OnStart()
  45. // callbacks for broadcasting new steps and votes to peers
  46. // upon their respective events (ie. uses evsw)
  47. conR.registerEventCallbacks()
  48. if !conR.FastSync() {
  49. _, err := conR.conS.Start()
  50. if err != nil {
  51. return err
  52. }
  53. }
  54. return nil
  55. }
// OnStop implements BaseService.
// It stops the base reactor first and then the consensus state machine.
func (conR *ConsensusReactor) OnStop() {
	conR.BaseReactor.OnStop()
	conR.conS.Stop()
}
  61. // SwitchToConsensus switches from fast_sync mode to consensus mode.
  62. // It resets the state, turns off fast_sync, and starts the consensus state-machine
  63. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  64. conR.Logger.Info("SwitchToConsensus")
  65. conR.conS.reconstructLastCommit(state)
  66. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  67. // broadcast a NewRoundStepMessage.
  68. conR.conS.updateToState(state)
  69. conR.mtx.Lock()
  70. conR.fastSync = false
  71. conR.mtx.Unlock()
  72. conR.conS.Start()
  73. }
  74. // GetChannels implements Reactor
  75. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  76. // TODO optimize
  77. return []*p2p.ChannelDescriptor{
  78. &p2p.ChannelDescriptor{
  79. ID: StateChannel,
  80. Priority: 5,
  81. SendQueueCapacity: 100,
  82. },
  83. &p2p.ChannelDescriptor{
  84. ID: DataChannel, // maybe split between gossiping current block and catchup stuff
  85. Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
  86. SendQueueCapacity: 100,
  87. RecvBufferCapacity: 50 * 4096,
  88. },
  89. &p2p.ChannelDescriptor{
  90. ID: VoteChannel,
  91. Priority: 5,
  92. SendQueueCapacity: 100,
  93. RecvBufferCapacity: 100 * 100,
  94. },
  95. &p2p.ChannelDescriptor{
  96. ID: VoteSetBitsChannel,
  97. Priority: 1,
  98. SendQueueCapacity: 2,
  99. RecvBufferCapacity: 1024,
  100. },
  101. }
  102. }
  103. // AddPeer implements Reactor
  104. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  105. if !conR.IsRunning() {
  106. return
  107. }
  108. // Create peerState for peer
  109. peerState := NewPeerState(peer)
  110. peer.Data.Set(types.PeerStateKey, peerState)
  111. // Begin routines for this peer.
  112. go conR.gossipDataRoutine(peer, peerState)
  113. go conR.gossipVotesRoutine(peer, peerState)
  114. go conR.queryMaj23Routine(peer, peerState)
  115. // Send our state to peer.
  116. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  117. if !conR.FastSync() {
  118. conR.sendNewRoundStepMessages(peer)
  119. }
  120. }
// RemovePeer implements Reactor.
// It is currently a no-op apart from the running check; peer-state
// teardown is still a TODO.
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	// TODO
	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}
  129. // Receive implements Reactor
  130. // NOTE: We process these messages even when we're fast_syncing.
  131. // Messages affect either a peer state or the consensus state.
  132. // Peer state updates can happen in parallel, but processing of
  133. // proposals, block parts, and votes are ordered by the receiveRoutine
  134. // NOTE: blocks on consensus state for proposals, block parts, and votes
  135. func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  136. if !conR.IsRunning() {
  137. conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
  138. return
  139. }
  140. _, msg, err := DecodeMessage(msgBytes)
  141. if err != nil {
  142. conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  143. // TODO punish peer?
  144. return
  145. }
  146. conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  147. // Get peer states
  148. ps := src.Data.Get(types.PeerStateKey).(*PeerState)
  149. switch chID {
  150. case StateChannel:
  151. switch msg := msg.(type) {
  152. case *NewRoundStepMessage:
  153. ps.ApplyNewRoundStepMessage(msg)
  154. case *CommitStepMessage:
  155. ps.ApplyCommitStepMessage(msg)
  156. case *HasVoteMessage:
  157. ps.ApplyHasVoteMessage(msg)
  158. case *VoteSetMaj23Message:
  159. cs := conR.conS
  160. cs.mtx.Lock()
  161. height, votes := cs.Height, cs.Votes
  162. cs.mtx.Unlock()
  163. if height != msg.Height {
  164. return
  165. }
  166. // Peer claims to have a maj23 for some BlockID at H,R,S,
  167. votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
  168. // Respond with a VoteSetBitsMessage showing which votes we have.
  169. // (and consequently shows which we don't have)
  170. var ourVotes *cmn.BitArray
  171. switch msg.Type {
  172. case types.VoteTypePrevote:
  173. ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
  174. case types.VoteTypePrecommit:
  175. ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
  176. default:
  177. conR.Logger.Error("Bad VoteSetBitsMessage field Type")
  178. return
  179. }
  180. src.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
  181. Height: msg.Height,
  182. Round: msg.Round,
  183. Type: msg.Type,
  184. BlockID: msg.BlockID,
  185. Votes: ourVotes,
  186. }})
  187. case *ProposalHeartbeatMessage:
  188. hb := msg.Heartbeat
  189. conR.Logger.Debug("Received proposal heartbeat message",
  190. "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence,
  191. "valIdx", hb.ValidatorIndex, "valAddr", hb.ValidatorAddress)
  192. default:
  193. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  194. }
  195. case DataChannel:
  196. if conR.FastSync() {
  197. conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
  198. return
  199. }
  200. switch msg := msg.(type) {
  201. case *ProposalMessage:
  202. ps.SetHasProposal(msg.Proposal)
  203. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  204. case *ProposalPOLMessage:
  205. ps.ApplyProposalPOLMessage(msg)
  206. case *BlockPartMessage:
  207. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  208. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  209. default:
  210. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  211. }
  212. case VoteChannel:
  213. if conR.FastSync() {
  214. conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
  215. return
  216. }
  217. switch msg := msg.(type) {
  218. case *VoteMessage:
  219. cs := conR.conS
  220. cs.mtx.Lock()
  221. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  222. cs.mtx.Unlock()
  223. ps.EnsureVoteBitArrays(height, valSize)
  224. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  225. ps.SetHasVote(msg.Vote)
  226. cs.peerMsgQueue <- msgInfo{msg, src.Key}
  227. default:
  228. // don't punish (leave room for soft upgrades)
  229. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  230. }
  231. case VoteSetBitsChannel:
  232. if conR.FastSync() {
  233. conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
  234. return
  235. }
  236. switch msg := msg.(type) {
  237. case *VoteSetBitsMessage:
  238. cs := conR.conS
  239. cs.mtx.Lock()
  240. height, votes := cs.Height, cs.Votes
  241. cs.mtx.Unlock()
  242. if height == msg.Height {
  243. var ourVotes *cmn.BitArray
  244. switch msg.Type {
  245. case types.VoteTypePrevote:
  246. ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
  247. case types.VoteTypePrecommit:
  248. ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
  249. default:
  250. conR.Logger.Error("Bad VoteSetBitsMessage field Type")
  251. return
  252. }
  253. ps.ApplyVoteSetBitsMessage(msg, ourVotes)
  254. } else {
  255. ps.ApplyVoteSetBitsMessage(msg, nil)
  256. }
  257. default:
  258. // don't punish (leave room for soft upgrades)
  259. conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  260. }
  261. default:
  262. conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID))
  263. }
  264. if err != nil {
  265. conR.Logger.Error("Error in Receive()", "err", err)
  266. }
  267. }
// SetEventSwitch implements events.Eventable.
// It stores evsw on the reactor and passes it on to the consensus state.
func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
	conR.evsw = evsw
	conR.conS.SetEventSwitch(evsw)
}
  273. // FastSync returns whether the consensus reactor is in fast-sync mode.
  274. func (conR *ConsensusReactor) FastSync() bool {
  275. conR.mtx.RLock()
  276. defer conR.mtx.RUnlock()
  277. return conR.fastSync
  278. }
  279. //--------------------------------------
  280. // Listens for new steps and votes,
  281. // broadcasting the result to peers
  282. func (conR *ConsensusReactor) registerEventCallbacks() {
  283. types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) {
  284. rs := data.Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
  285. conR.broadcastNewRoundStep(rs)
  286. })
  287. types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) {
  288. edv := data.Unwrap().(types.EventDataVote)
  289. conR.broadcastHasVoteMessage(edv.Vote)
  290. })
  291. types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) {
  292. heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat)
  293. conR.broadcastProposalHeartbeatMessage(heartbeat)
  294. })
  295. }
  296. func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) {
  297. hb := heartbeat.Heartbeat
  298. conR.Logger.Debug("Broadcasting proposal heartbeat message",
  299. "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence)
  300. msg := &ProposalHeartbeatMessage{hb}
  301. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
  302. }
  303. func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
  304. nrsMsg, csMsg := makeRoundStepMessages(rs)
  305. if nrsMsg != nil {
  306. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  307. }
  308. if csMsg != nil {
  309. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
  310. }
  311. }
  312. // Broadcasts HasVoteMessage to peers that care.
  313. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
  314. msg := &HasVoteMessage{
  315. Height: vote.Height,
  316. Round: vote.Round,
  317. Type: vote.Type,
  318. Index: vote.ValidatorIndex,
  319. }
  320. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
  321. /*
  322. // TODO: Make this broadcast more selective.
  323. for _, peer := range conR.Switch.Peers().List() {
  324. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  325. prs := ps.GetRoundState()
  326. if prs.Height == vote.Height {
  327. // TODO: Also filter on round?
  328. peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
  329. } else {
  330. // Height doesn't match
  331. // TODO: check a field, maybe CatchupCommitRound?
  332. // TODO: But that requires changing the struct field comment.
  333. }
  334. }
  335. */
  336. }
  337. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  338. nrsMsg = &NewRoundStepMessage{
  339. Height: rs.Height,
  340. Round: rs.Round,
  341. Step: rs.Step,
  342. SecondsSinceStartTime: int(time.Since(rs.StartTime).Seconds()),
  343. LastCommitRound: rs.LastCommit.Round(),
  344. }
  345. if rs.Step == RoundStepCommit {
  346. csMsg = &CommitStepMessage{
  347. Height: rs.Height,
  348. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  349. BlockParts: rs.ProposalBlockParts.BitArray(),
  350. }
  351. }
  352. return
  353. }
  354. func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) {
  355. rs := conR.conS.GetRoundState()
  356. nrsMsg, csMsg := makeRoundStepMessages(rs)
  357. if nrsMsg != nil {
  358. peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  359. }
  360. if csMsg != nil {
  361. peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
  362. }
  363. }
// gossipDataRoutine loops for the lifetime of the peer connection, sending
// the peer proposal block parts, catch-up block parts, the proposal and the
// proposal POL bit array, in that order of preference. It returns once the
// peer or the reactor stops running, and sleeps for PeerGossipSleep when
// there is nothing to send.
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
	logger := conR.Logger.With("peer", peer)

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			logger.Info("Stopping gossipDataRoutine for peer")
			return
		}
		// Fresh snapshots of our round state and the peer's on every iteration.
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		// Send proposal Block parts?
		if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
			// Pick a random part that we have and the peer is not known to have.
			if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
				part := rs.ProposalBlockParts.GetPart(index)
				msg := &BlockPartMessage{
					Height: rs.Height, // This tells peer that this part applies to us.
					Round:  rs.Round,  // This tells peer that this part applies to us.
					Part:   part,
				}
				// NOTE(review): the log and the peer-state update use the peer's
				// height/round while the message carries ours — confirm intended.
				logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
				if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
					ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				}
				continue OUTER_LOOP
			}
		}

		// If the peer is on a previous height, help catch up.
		if (0 < prs.Height) && (prs.Height < rs.Height) {
			heightLogger := logger.With("height", prs.Height)
			conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
			continue OUTER_LOOP
		}

		// If height and round don't match, sleep.
		if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
			//logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
			time.Sleep(conR.conS.config.PeerGossipSleep())
			continue OUTER_LOOP
		}

		// By here, height and round match.
		// Proposal block parts were already matched and sent if any were wanted.
		// (These can match on hash so the round doesn't matter)
		// Now consider sending other things, like the Proposal itself.

		// Send Proposal && ProposalPOL BitArray?
		if rs.Proposal != nil && !prs.Proposal {
			// Proposal: share the proposal metadata with peer.
			{
				msg := &ProposalMessage{Proposal: rs.Proposal}
				logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
				if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
					ps.SetHasProposal(rs.Proposal)
				}
			}
			// ProposalPOL: lets peer know which POL votes we have so far.
			// Peer must receive ProposalMessage first.
			// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
			// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
			if 0 <= rs.Proposal.POLRound {
				msg := &ProposalPOLMessage{
					Height:           rs.Height,
					ProposalPOLRound: rs.Proposal.POLRound,
					ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
				}
				logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
				peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
			}
			continue OUTER_LOOP
		}

		// Nothing to do. Sleep.
		time.Sleep(conR.conS.config.PeerGossipSleep())
		continue OUTER_LOOP
	}
}
  437. func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundState,
  438. prs *PeerRoundState, ps *PeerState, peer *p2p.Peer) {
  439. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  440. // Ensure that the peer's PartSetHeader is correct
  441. blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
  442. if blockMeta == nil {
  443. logger.Error("Failed to load block meta",
  444. "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
  445. time.Sleep(conR.conS.config.PeerGossipSleep())
  446. return
  447. } else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  448. logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  449. "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  450. time.Sleep(conR.conS.config.PeerGossipSleep())
  451. return
  452. }
  453. // Load the part
  454. part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
  455. if part == nil {
  456. logger.Error("Could not load part", "index", index,
  457. "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  458. time.Sleep(conR.conS.config.PeerGossipSleep())
  459. return
  460. }
  461. // Send the part
  462. msg := &BlockPartMessage{
  463. Height: prs.Height, // Not our height, so it doesn't matter.
  464. Round: prs.Round, // Not our height, so it doesn't matter.
  465. Part: part,
  466. }
  467. logger.Debug("Sending block part for catchup", "round", prs.Round)
  468. if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
  469. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  470. }
  471. return
  472. } else {
  473. //logger.Info("No parts to send in catch-up, sleeping")
  474. time.Sleep(conR.conS.config.PeerGossipSleep())
  475. return
  476. }
  477. }
// gossipVotesRoutine loops for the lifetime of the peer connection, sending
// the peer one vote per iteration: votes for the current height, our
// LastCommit when the peer is one height behind, or a stored block commit
// when it is further behind. It returns once the peer or the reactor stops
// running, and sleeps for PeerGossipSleep when no vote was sent.
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
	logger := conR.Logger.With("peer", peer)

	// Simple hack to throttle logs upon sleep.
	// 0: not sleeping (the next idle iteration logs), 1: just slept,
	// 2: woke from a sleep (an idle iteration goes back to 1 without logging).
	var sleeping = 0

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			logger.Info("Stopping gossipVotesRoutine for peer")
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		// Advance the throttle state once per iteration.
		switch sleeping {
		case 1: // First sleep
			sleeping = 2
		case 2: // No more sleep
			sleeping = 0
		}

		//logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
		// "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

		// If height matches, then send LastCommit, Prevotes, Precommits.
		if rs.Height == prs.Height {
			heightLogger := logger.With("height", prs.Height)
			if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
				continue OUTER_LOOP
			}
		}

		// Special catchup logic.
		// If peer is lagging by height 1, send LastCommit.
		if prs.Height != 0 && rs.Height == prs.Height+1 {
			if ps.PickSendVote(rs.LastCommit) {
				logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
				continue OUTER_LOOP
			}
		}

		// Catchup logic
		// If peer is lagging by more than 1, send Commit.
		if prs.Height != 0 && rs.Height >= prs.Height+2 {
			// Load the block commit for prs.Height,
			// which contains precommit signatures for prs.Height.
			commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
			logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
			if ps.PickSendVote(commit) {
				logger.Debug("Picked Catchup commit to send", "height", prs.Height)
				continue OUTER_LOOP
			}
		}

		if sleeping == 0 {
			// We sent nothing. Sleep...
			sleeping = 1
			logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
		} else if sleeping == 2 {
			// Continued sleep...
			sleeping = 1
		}

		time.Sleep(conR.conS.config.PeerGossipSleep())
		continue OUTER_LOOP
	}
}
  540. func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *RoundState, prs *PeerRoundState, ps *PeerState) bool {
  541. // If there are lastCommits to send...
  542. if prs.Step == RoundStepNewHeight {
  543. if ps.PickSendVote(rs.LastCommit) {
  544. logger.Debug("Picked rs.LastCommit to send")
  545. return true
  546. }
  547. }
  548. // If there are prevotes to send...
  549. if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
  550. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  551. logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
  552. return true
  553. }
  554. }
  555. // If there are precommits to send...
  556. if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
  557. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  558. logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
  559. return true
  560. }
  561. }
  562. // If there are POLPrevotes to send...
  563. if prs.ProposalPOLRound != -1 {
  564. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  565. if ps.PickSendVote(polPrevotes) {
  566. logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
  567. "round", prs.ProposalPOLRound)
  568. return true
  569. }
  570. }
  571. }
  572. return false
  573. }
  574. // NOTE: `queryMaj23Routine` has a simple crude design since it only comes
  575. // into play for liveness when there's a signature DDoS attack happening.
  576. func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) {
  577. logger := conR.Logger.With("peer", peer)
  578. OUTER_LOOP:
  579. for {
  580. // Manage disconnects from self or peer.
  581. if !peer.IsRunning() || !conR.IsRunning() {
  582. logger.Info("Stopping queryMaj23Routine for peer")
  583. return
  584. }
  585. // Maybe send Height/Round/Prevotes
  586. {
  587. rs := conR.conS.GetRoundState()
  588. prs := ps.GetRoundState()
  589. if rs.Height == prs.Height {
  590. if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
  591. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  592. Height: prs.Height,
  593. Round: prs.Round,
  594. Type: types.VoteTypePrevote,
  595. BlockID: maj23,
  596. }})
  597. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  598. }
  599. }
  600. }
  601. // Maybe send Height/Round/Precommits
  602. {
  603. rs := conR.conS.GetRoundState()
  604. prs := ps.GetRoundState()
  605. if rs.Height == prs.Height {
  606. if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
  607. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  608. Height: prs.Height,
  609. Round: prs.Round,
  610. Type: types.VoteTypePrecommit,
  611. BlockID: maj23,
  612. }})
  613. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  614. }
  615. }
  616. }
  617. // Maybe send Height/Round/ProposalPOL
  618. {
  619. rs := conR.conS.GetRoundState()
  620. prs := ps.GetRoundState()
  621. if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
  622. if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
  623. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  624. Height: prs.Height,
  625. Round: prs.ProposalPOLRound,
  626. Type: types.VoteTypePrevote,
  627. BlockID: maj23,
  628. }})
  629. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  630. }
  631. }
  632. }
  633. // Little point sending LastCommitRound/LastCommit,
  634. // These are fleeting and non-blocking.
  635. // Maybe send Height/CatchupCommitRound/CatchupCommit.
  636. {
  637. prs := ps.GetRoundState()
  638. if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() {
  639. commit := conR.conS.LoadCommit(prs.Height)
  640. peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
  641. Height: prs.Height,
  642. Round: commit.Round(),
  643. Type: types.VoteTypePrecommit,
  644. BlockID: commit.BlockID,
  645. }})
  646. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  647. }
  648. }
  649. time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
  650. continue OUTER_LOOP
  651. }
  652. }
// String returns a string representation of the ConsensusReactor.
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
// TODO: improve!
func (conR *ConsensusReactor) String() string {
	// better not to access shared variables
	return "ConsensusReactor" // conR.StringIndented("")
}
  660. // StringIndented returns an indented string representation of the ConsensusReactor
  661. func (conR *ConsensusReactor) StringIndented(indent string) string {
  662. s := "ConsensusReactor{\n"
  663. s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
  664. for _, peer := range conR.Switch.Peers().List() {
  665. ps := peer.Data.Get(types.PeerStateKey).(*PeerState)
  666. s += indent + " " + ps.StringIndented(indent+" ") + "\n"
  667. }
  668. s += indent + "}"
  669. return s
  670. }
//-----------------------------------------------------------------------------

// PeerRoundState contains the known state of a peer.
// NOTE: Read-only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
	Height                   int                 // Height peer is at
	Round                    int                 // Round peer is at, -1 if unknown.
	Step                     RoundStepType       // Step peer is at
	StartTime                time.Time           // Estimated start of round 0 at this height
	Proposal                 bool                // True if peer has proposal for this round
	ProposalBlockPartsHeader types.PartSetHeader // Parts header of the proposal block, taken from the proposal.
	ProposalBlockParts       *cmn.BitArray       // Proposal block parts the peer is known to have, by part index.
	ProposalPOLRound         int                 // Proposal's POL round. -1 if none.
	ProposalPOL              *cmn.BitArray       // nil until ProposalPOLMessage received.
	Prevotes                 *cmn.BitArray       // All votes peer has for this round
	Precommits               *cmn.BitArray       // All precommits peer has for this round
	LastCommitRound          int                 // Round of commit for last height. -1 if none.
	LastCommit               *cmn.BitArray       // All commit precommits of commit for last height.
	CatchupCommitRound       int                 // Round that we have commit for. Not necessarily unique. -1 if none.
	CatchupCommit            *cmn.BitArray       // All commit precommits peer has for this height & CatchupCommitRound
}
// String returns a string representation of the PeerRoundState.
// It is StringIndented with an empty indent.
func (prs PeerRoundState) String() string {
	return prs.StringIndented("")
}
// StringIndented returns a multi-line string representation of the
// PeerRoundState, with every line after the first prefixed by indent.
func (prs PeerRoundState) StringIndented(indent string) string {
	// One indent argument per line of the template, followed by that line's values.
	return fmt.Sprintf(`PeerRoundState{
%s %v/%v/%v @%v
%s Proposal %v -> %v
%s POL %v (round %v)
%s Prevotes %v
%s Precommits %v
%s LastCommit %v (round %v)
%s Catchup %v (round %v)
%s}`,
		indent, prs.Height, prs.Round, prs.Step, prs.StartTime,
		indent, prs.ProposalBlockPartsHeader, prs.ProposalBlockParts,
		indent, prs.ProposalPOL, prs.ProposalPOLRound,
		indent, prs.Prevotes,
		indent, prs.Precommits,
		indent, prs.LastCommit, prs.LastCommitRound,
		indent, prs.CatchupCommit, prs.CatchupCommitRound,
		indent)
}
//-----------------------------------------------------------------------------

// Sentinel errors relating to peer state.
// NOTE(review): no use of either is visible in this part of the file —
// presumably returned when applying peer round-step updates; confirm.
var (
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)
// PeerState contains the known state of a peer, including its connection
// and threadsafe access to its PeerRoundState.
type PeerState struct {
	Peer *p2p.Peer // the peer this state describes; used to send votes

	mtx            sync.Mutex // guards the embedded PeerRoundState
	PeerRoundState            // mutable round state; take snapshots via GetRoundState
}
  727. // NewPeerState returns a new PeerState for the given Peer
  728. func NewPeerState(peer *p2p.Peer) *PeerState {
  729. return &PeerState{
  730. Peer: peer,
  731. PeerRoundState: PeerRoundState{
  732. Round: -1,
  733. ProposalPOLRound: -1,
  734. LastCommitRound: -1,
  735. CatchupCommitRound: -1,
  736. },
  737. }
  738. }
  739. // GetRoundState returns an atomic snapshot of the PeerRoundState.
  740. // There's no point in mutating it since it won't change PeerState.
  741. func (ps *PeerState) GetRoundState() *PeerRoundState {
  742. ps.mtx.Lock()
  743. defer ps.mtx.Unlock()
  744. prs := ps.PeerRoundState // copy
  745. return &prs
  746. }
  747. // GetHeight returns an atomic snapshot of the PeerRoundState's height
  748. // used by the mempool to ensure peers are caught up before broadcasting new txs
  749. func (ps *PeerState) GetHeight() int {
  750. ps.mtx.Lock()
  751. defer ps.mtx.Unlock()
  752. return ps.PeerRoundState.Height
  753. }
  754. // SetHasProposal sets the given proposal as known for the peer.
  755. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  756. ps.mtx.Lock()
  757. defer ps.mtx.Unlock()
  758. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  759. return
  760. }
  761. if ps.Proposal {
  762. return
  763. }
  764. ps.Proposal = true
  765. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  766. ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total)
  767. ps.ProposalPOLRound = proposal.POLRound
  768. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  769. }
  770. // SetHasProposalBlockPart sets the given block part index as known for the peer.
  771. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  772. ps.mtx.Lock()
  773. defer ps.mtx.Unlock()
  774. if ps.Height != height || ps.Round != round {
  775. return
  776. }
  777. ps.ProposalBlockParts.SetIndex(index, true)
  778. }
  779. // PickSendVote picks a vote and sends it to the peer.
  780. // Returns true if vote was sent.
  781. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
  782. if vote, ok := ps.PickVoteToSend(votes); ok {
  783. msg := &VoteMessage{vote}
  784. return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
  785. }
  786. return false
  787. }
  788. // PickVoteToSend picks a vote to send to the peer.
  789. // Returns true if a vote was picked.
  790. // NOTE: `votes` must be the correct Size() for the Height().
  791. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
  792. ps.mtx.Lock()
  793. defer ps.mtx.Unlock()
  794. if votes.Size() == 0 {
  795. return nil, false
  796. }
  797. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  798. // Lazily set data using 'votes'.
  799. if votes.IsCommit() {
  800. ps.ensureCatchupCommitRound(height, round, size)
  801. }
  802. ps.ensureVoteBitArrays(height, size)
  803. psVotes := ps.getVoteBitArray(height, round, type_)
  804. if psVotes == nil {
  805. return nil, false // Not something worth sending
  806. }
  807. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  808. ps.setHasVote(height, round, type_, index)
  809. return votes.GetByIndex(index), true
  810. }
  811. return nil, false
  812. }
  813. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray {
  814. if !types.IsVoteTypeValid(type_) {
  815. return nil
  816. }
  817. if ps.Height == height {
  818. if ps.Round == round {
  819. switch type_ {
  820. case types.VoteTypePrevote:
  821. return ps.Prevotes
  822. case types.VoteTypePrecommit:
  823. return ps.Precommits
  824. }
  825. }
  826. if ps.CatchupCommitRound == round {
  827. switch type_ {
  828. case types.VoteTypePrevote:
  829. return nil
  830. case types.VoteTypePrecommit:
  831. return ps.CatchupCommit
  832. }
  833. }
  834. if ps.ProposalPOLRound == round {
  835. switch type_ {
  836. case types.VoteTypePrevote:
  837. return ps.ProposalPOL
  838. case types.VoteTypePrecommit:
  839. return nil
  840. }
  841. }
  842. return nil
  843. }
  844. if ps.Height == height+1 {
  845. if ps.LastCommitRound == round {
  846. switch type_ {
  847. case types.VoteTypePrevote:
  848. return nil
  849. case types.VoteTypePrecommit:
  850. return ps.LastCommit
  851. }
  852. }
  853. return nil
  854. }
  855. return nil
  856. }
  857. // 'round': A round for which we have a +2/3 commit.
  858. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  859. if ps.Height != height {
  860. return
  861. }
  862. /*
  863. NOTE: This is wrong, 'round' could change.
  864. e.g. if orig round is not the same as block LastCommit round.
  865. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  866. cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  867. }
  868. */
  869. if ps.CatchupCommitRound == round {
  870. return // Nothing to do!
  871. }
  872. ps.CatchupCommitRound = round
  873. if round == ps.Round {
  874. ps.CatchupCommit = ps.Precommits
  875. } else {
  876. ps.CatchupCommit = cmn.NewBitArray(numValidators)
  877. }
  878. }
// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// It takes ps.mtx and delegates to ensureVoteBitArrays.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.ensureVoteBitArrays(height, numValidators)
}
  888. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  889. if ps.Height == height {
  890. if ps.Prevotes == nil {
  891. ps.Prevotes = cmn.NewBitArray(numValidators)
  892. }
  893. if ps.Precommits == nil {
  894. ps.Precommits = cmn.NewBitArray(numValidators)
  895. }
  896. if ps.CatchupCommit == nil {
  897. ps.CatchupCommit = cmn.NewBitArray(numValidators)
  898. }
  899. if ps.ProposalPOL == nil {
  900. ps.ProposalPOL = cmn.NewBitArray(numValidators)
  901. }
  902. } else if ps.Height == height+1 {
  903. if ps.LastCommit == nil {
  904. ps.LastCommit = cmn.NewBitArray(numValidators)
  905. }
  906. }
  907. }
  908. // SetHasVote sets the given vote as known by the peer
  909. func (ps *PeerState) SetHasVote(vote *types.Vote) {
  910. ps.mtx.Lock()
  911. defer ps.mtx.Unlock()
  912. ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
  913. }
  914. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  915. logger := ps.Peer.Logger.With("peerRound", ps.Round, "height", height, "round", round)
  916. logger.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  917. // NOTE: some may be nil BitArrays -> no side effects.
  918. psVotes := ps.getVoteBitArray(height, round, type_)
  919. if psVotes != nil {
  920. psVotes.SetIndex(index, true)
  921. }
  922. }
  923. // ApplyNewRoundStepMessage updates the peer state for the new round.
  924. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  925. ps.mtx.Lock()
  926. defer ps.mtx.Unlock()
  927. // Ignore duplicates or decreases
  928. if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 {
  929. return
  930. }
  931. // Just remember these values.
  932. psHeight := ps.Height
  933. psRound := ps.Round
  934. //psStep := ps.Step
  935. psCatchupCommitRound := ps.CatchupCommitRound
  936. psCatchupCommit := ps.CatchupCommit
  937. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  938. ps.Height = msg.Height
  939. ps.Round = msg.Round
  940. ps.Step = msg.Step
  941. ps.StartTime = startTime
  942. if psHeight != msg.Height || psRound != msg.Round {
  943. ps.Proposal = false
  944. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  945. ps.ProposalBlockParts = nil
  946. ps.ProposalPOLRound = -1
  947. ps.ProposalPOL = nil
  948. // We'll update the BitArray capacity later.
  949. ps.Prevotes = nil
  950. ps.Precommits = nil
  951. }
  952. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  953. // Peer caught up to CatchupCommitRound.
  954. // Preserve psCatchupCommit!
  955. // NOTE: We prefer to use prs.Precommits if
  956. // pr.Round matches pr.CatchupCommitRound.
  957. ps.Precommits = psCatchupCommit
  958. }
  959. if psHeight != msg.Height {
  960. // Shift Precommits to LastCommit.
  961. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  962. ps.LastCommitRound = msg.LastCommitRound
  963. ps.LastCommit = ps.Precommits
  964. } else {
  965. ps.LastCommitRound = msg.LastCommitRound
  966. ps.LastCommit = nil
  967. }
  968. // We'll update the BitArray capacity later.
  969. ps.CatchupCommitRound = -1
  970. ps.CatchupCommit = nil
  971. }
  972. }
  973. // ApplyCommitStepMessage updates the peer state for the new commit.
  974. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  975. ps.mtx.Lock()
  976. defer ps.mtx.Unlock()
  977. if ps.Height != msg.Height {
  978. return
  979. }
  980. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  981. ps.ProposalBlockParts = msg.BlockParts
  982. }
  983. // ApplyProposalPOLMessage updates the peer state for the new proposal POL.
  984. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  985. ps.mtx.Lock()
  986. defer ps.mtx.Unlock()
  987. if ps.Height != msg.Height {
  988. return
  989. }
  990. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  991. return
  992. }
  993. // TODO: Merge onto existing ps.ProposalPOL?
  994. // We might have sent some prevotes in the meantime.
  995. ps.ProposalPOL = msg.ProposalPOL
  996. }
  997. // ApplyHasVoteMessage updates the peer state for the new vote.
  998. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  999. ps.mtx.Lock()
  1000. defer ps.mtx.Unlock()
  1001. if ps.Height != msg.Height {
  1002. return
  1003. }
  1004. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  1005. }
  1006. // ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
  1007. // it claims to have for the corresponding BlockID.
  1008. // `ourVotes` is a BitArray of votes we have for msg.BlockID
  1009. // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
  1010. // we conservatively overwrite ps's votes w/ msg.Votes.
  1011. func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) {
  1012. ps.mtx.Lock()
  1013. defer ps.mtx.Unlock()
  1014. votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
  1015. if votes != nil {
  1016. if ourVotes == nil {
  1017. votes.Update(msg.Votes)
  1018. } else {
  1019. otherVotes := votes.Sub(ourVotes)
  1020. hasVotes := otherVotes.Or(msg.Votes)
  1021. votes.Update(hasVotes)
  1022. }
  1023. }
  1024. }
// String returns a string representation of the PeerState.
// It is StringIndented with an empty indent.
func (ps *PeerState) String() string {
	return ps.StringIndented("")
}
// StringIndented returns a string representation of the PeerState.
// Every line of the output is prefixed with indent; the embedded
// PeerRoundState is rendered with a deeper indent.
func (ps *PeerState) StringIndented(indent string) string {
	return fmt.Sprintf(`PeerState{
%s Key %v
%s PRS %v
%s}`,
		indent, ps.Peer.Key,
		indent, ps.PeerRoundState.StringIndented(indent+" "),
		indent)
}
  1039. //-----------------------------------------------------------------------------
  1040. // Messages
// The msgType* bytes tag each concrete message type in the go-wire
// registration of ConsensusMessage.
const (
	msgTypeNewRoundStep = byte(0x01)
	msgTypeCommitStep = byte(0x02)
	msgTypeProposal = byte(0x11)
	msgTypeProposalPOL = byte(0x12)
	msgTypeBlockPart = byte(0x13) // both block & POL
	msgTypeVote = byte(0x14)
	msgTypeHasVote = byte(0x15)
	msgTypeVoteSetMaj23 = byte(0x16)
	msgTypeVoteSetBits = byte(0x17)
	msgTypeProposalHeartbeat = byte(0x20)
)
// ConsensusMessage is a message that can be sent and received on the ConsensusReactor
type ConsensusMessage interface{}

// Register every concrete message type with go-wire under its msgType* byte,
// using the struct{ ConsensusMessage } wrapper that DecodeMessage and
// PickSendVote also use.
var _ = wire.RegisterInterface(
	struct{ ConsensusMessage }{},
	wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
	wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
	wire.ConcreteType{&VoteMessage{}, msgTypeVote},
	wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
	wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23},
	wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
	wire.ConcreteType{&ProposalHeartbeatMessage{}, msgTypeProposalHeartbeat},
)
  1068. // DecodeMessage decodes the given bytes into a ConsensusMessage.
  1069. // TODO: check for unnecessary extra bytes at the end.
  1070. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  1071. msgType = bz[0]
  1072. n := new(int)
  1073. r := bytes.NewReader(bz)
  1074. msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err)
  1075. msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage
  1076. return
  1077. }
  1078. //-------------------------------------
  1079. // NewRoundStepMessage is sent for every step taken in the ConsensusState.
  1080. // For every height/round/step transition
  1081. type NewRoundStepMessage struct {
  1082. Height int
  1083. Round int
  1084. Step RoundStepType
  1085. SecondsSinceStartTime int
  1086. LastCommitRound int
  1087. }
  1088. // String returns a string representation.
  1089. func (m *NewRoundStepMessage) String() string {
  1090. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  1091. m.Height, m.Round, m.Step, m.LastCommitRound)
  1092. }
  1093. //-------------------------------------
  1094. // CommitStepMessage is sent when a block is committed.
  1095. type CommitStepMessage struct {
  1096. Height int
  1097. BlockPartsHeader types.PartSetHeader
  1098. BlockParts *cmn.BitArray
  1099. }
  1100. // String returns a string representation.
  1101. func (m *CommitStepMessage) String() string {
  1102. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  1103. }
  1104. //-------------------------------------
  1105. // ProposalMessage is sent when a new block is proposed.
  1106. type ProposalMessage struct {
  1107. Proposal *types.Proposal
  1108. }
  1109. // String returns a string representation.
  1110. func (m *ProposalMessage) String() string {
  1111. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  1112. }
  1113. //-------------------------------------
  1114. // ProposalPOLMessage is sent when a previous proposal is re-proposed.
  1115. type ProposalPOLMessage struct {
  1116. Height int
  1117. ProposalPOLRound int
  1118. ProposalPOL *cmn.BitArray
  1119. }
  1120. // String returns a string representation.
  1121. func (m *ProposalPOLMessage) String() string {
  1122. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  1123. }
  1124. //-------------------------------------
  1125. // BlockPartMessage is sent when gossipping a piece of the proposed block.
  1126. type BlockPartMessage struct {
  1127. Height int
  1128. Round int
  1129. Part *types.Part
  1130. }
  1131. // String returns a string representation.
  1132. func (m *BlockPartMessage) String() string {
  1133. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  1134. }
  1135. //-------------------------------------
  1136. // VoteMessage is sent when voting for a proposal (or lack thereof).
  1137. type VoteMessage struct {
  1138. Vote *types.Vote
  1139. }
  1140. // String returns a string representation.
  1141. func (m *VoteMessage) String() string {
  1142. return fmt.Sprintf("[Vote %v]", m.Vote)
  1143. }
  1144. //-------------------------------------
  1145. // HasVoteMessage is sent to indicate that a particular vote has been received.
  1146. type HasVoteMessage struct {
  1147. Height int
  1148. Round int
  1149. Type byte
  1150. Index int
  1151. }
  1152. // String returns a string representation.
  1153. func (m *HasVoteMessage) String() string {
  1154. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  1155. }
  1156. //-------------------------------------
  1157. // VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
  1158. type VoteSetMaj23Message struct {
  1159. Height int
  1160. Round int
  1161. Type byte
  1162. BlockID types.BlockID
  1163. }
  1164. // String returns a string representation.
  1165. func (m *VoteSetMaj23Message) String() string {
  1166. return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
  1167. }
  1168. //-------------------------------------
  1169. // VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
  1170. type VoteSetBitsMessage struct {
  1171. Height int
  1172. Round int
  1173. Type byte
  1174. BlockID types.BlockID
  1175. Votes *cmn.BitArray
  1176. }
  1177. // String returns a string representation.
  1178. func (m *VoteSetBitsMessage) String() string {
  1179. return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
  1180. }
  1181. //-------------------------------------
  1182. // ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal.
  1183. type ProposalHeartbeatMessage struct {
  1184. Heartbeat *types.Heartbeat
  1185. }
  1186. // String returns a string representation.
  1187. func (m *ProposalHeartbeatMessage) String() string {
  1188. return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
  1189. }