You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

996 lines
28 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/go-p2p"
  11. "github.com/tendermint/go-wire"
  12. bc "github.com/tendermint/tendermint/blockchain"
  13. "github.com/tendermint/tendermint/events"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  // Channel identifiers registered by the consensus reactor, followed by two
  // tuning values used by the gossip routines and the message decoder.
  const (
  	StateChannel byte = 0x20
  	DataChannel  byte = 0x21
  	VoteChannel  byte = 0x22

  	// Time to sleep if there's nothing to send.
  	peerGossipSleepDuration = time.Second / 10
  	// 1MB; NOTE: keep in sync with types.PartSet sizes.
  	maxConsensusMessageSize = 1 << 20
  )
  24. //-----------------------------------------------------------------------------
  25. type ConsensusReactor struct {
  26. p2p.BaseReactor // QuitService + p2p.Switch
  27. blockStore *bc.BlockStore
  28. conS *ConsensusState
  29. fastSync bool
  30. evsw events.Fireable
  31. }
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. conS: consensusState,
  36. fastSync: fastSync,
  37. }
  38. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  39. return conR
  40. }
  41. func (conR *ConsensusReactor) OnStart() error {
  42. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  43. conR.BaseReactor.OnStart()
  44. if !conR.fastSync {
  45. _, err := conR.conS.Start()
  46. if err != nil {
  47. return err
  48. }
  49. }
  50. go conR.broadcastNewRoundStepRoutine()
  51. return nil
  52. }
  53. func (conR *ConsensusReactor) OnStop() {
  54. conR.BaseReactor.OnStop()
  55. conR.conS.Stop()
  56. }
  57. // Switch from the fast_sync to the consensus:
  58. // reset the state, turn off fast_sync, start the consensus-state-machine
  59. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  60. log.Notice("SwitchToConsensus")
  61. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  62. // broadcast a NewRoundStepMessage.
  63. conR.conS.updateToState(state)
  64. conR.fastSync = false
  65. conR.conS.Start()
  66. }
  67. // Implements Reactor
  68. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  69. // TODO optimize
  70. return []*p2p.ChannelDescriptor{
  71. &p2p.ChannelDescriptor{
  72. ID: StateChannel,
  73. Priority: 5,
  74. SendQueueCapacity: 100,
  75. },
  76. &p2p.ChannelDescriptor{
  77. ID: DataChannel,
  78. Priority: 5,
  79. SendQueueCapacity: 2,
  80. },
  81. &p2p.ChannelDescriptor{
  82. ID: VoteChannel,
  83. Priority: 5,
  84. SendQueueCapacity: 40,
  85. },
  86. }
  87. }
  88. // Implements Reactor
  89. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  90. if !conR.IsRunning() {
  91. return
  92. }
  93. // Create peerState for peer
  94. peerState := NewPeerState(peer)
  95. peer.Data.Set(types.PeerStateKey, peerState)
  96. // Begin gossip routines for this peer.
  97. go conR.gossipDataRoutine(peer, peerState)
  98. go conR.gossipVotesRoutine(peer, peerState)
  99. // Send our state to peer.
  100. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  101. if !conR.fastSync {
  102. conR.sendNewRoundStepMessage(peer)
  103. }
  104. }
  105. // Implements Reactor
  106. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  107. if !conR.IsRunning() {
  108. return
  109. }
  110. // TODO
  111. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  112. }
  113. // Implements Reactor
  114. // NOTE: We process these messages even when we're fast_syncing.
  115. // Messages affect either a peer state or the consensus state.
  116. // Peer state updates can happen in parallel, but processing of
  117. // proposals, block parts, and votes are ordered.
  118. func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
  119. if !conR.IsRunning() {
  120. log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes)
  121. return
  122. }
  123. _, msg, err := DecodeMessage(msgBytes)
  124. if err != nil {
  125. log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
  126. // TODO punish peer?
  127. return
  128. }
  129. log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg)
  130. // Get peer states
  131. ps := peer.Data.Get(types.PeerStateKey).(*PeerState)
  132. switch chID {
  133. case StateChannel:
  134. switch msg := msg.(type) {
  135. case *NewRoundStepMessage:
  136. ps.ApplyNewRoundStepMessage(msg)
  137. case *CommitStepMessage:
  138. ps.ApplyCommitStepMessage(msg)
  139. case *HasVoteMessage:
  140. ps.ApplyHasVoteMessage(msg)
  141. default:
  142. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  143. }
  144. case DataChannel:
  145. if conR.fastSync {
  146. log.Warn("Ignoring message received during fastSync", "msg", msg)
  147. return
  148. }
  149. switch msg := msg.(type) {
  150. case *ProposalMessage:
  151. ps.SetHasProposal(msg.Proposal)
  152. conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key}
  153. case *ProposalPOLMessage:
  154. ps.ApplyProposalPOLMessage(msg)
  155. case *BlockPartMessage:
  156. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  157. conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key}
  158. default:
  159. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  160. }
  161. case VoteChannel:
  162. if conR.fastSync {
  163. log.Warn("Ignoring message received during fastSync", "msg", msg)
  164. return
  165. }
  166. switch msg := msg.(type) {
  167. case *VoteMessage:
  168. cs := conR.conS
  169. cs.mtx.Lock()
  170. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  171. cs.mtx.Unlock()
  172. ps.EnsureVoteBitArrays(height, valSize)
  173. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  174. ps.SetHasVote(msg.Vote, msg.ValidatorIndex)
  175. conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key}
  176. default:
  177. // don't punish (leave room for soft upgrades)
  178. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  179. }
  180. default:
  181. log.Warn(Fmt("Unknown channel %X", chID))
  182. }
  183. if err != nil {
  184. log.Warn("Error in Receive()", "error", err)
  185. }
  186. }
  187. // Broadcasts HasVoteMessage to peers that care.
  188. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  189. msg := &HasVoteMessage{
  190. Height: vote.Height,
  191. Round: vote.Round,
  192. Type: vote.Type,
  193. Index: index,
  194. }
  195. conR.Switch.Broadcast(StateChannel, msg)
  196. /*
  197. // TODO: Make this broadcast more selective.
  198. for _, peer := range conR.Switch.Peers().List() {
  199. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  200. prs := ps.GetRoundState()
  201. if prs.Height == vote.Height {
  202. // TODO: Also filter on round?
  203. peer.TrySend(StateChannel, msg)
  204. } else {
  205. // Height doesn't match
  206. // TODO: check a field, maybe CatchupCommitRound?
  207. // TODO: But that requires changing the struct field comment.
  208. }
  209. }
  210. */
  211. }
  212. // Sets our private validator account for signing votes.
  213. func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
  214. conR.conS.SetPrivValidator(priv)
  215. }
  216. // implements events.Eventable
  217. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  218. conR.evsw = evsw
  219. conR.conS.SetFireable(evsw)
  220. }
  221. //--------------------------------------
  222. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  223. nrsMsg = &NewRoundStepMessage{
  224. Height: rs.Height,
  225. Round: rs.Round,
  226. Step: rs.Step,
  227. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  228. LastCommitRound: rs.LastCommit.Round(),
  229. }
  230. if rs.Step == RoundStepCommit {
  231. csMsg = &CommitStepMessage{
  232. Height: rs.Height,
  233. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  234. BlockParts: rs.ProposalBlockParts.BitArray(),
  235. }
  236. }
  237. return
  238. }
  239. // Listens for changes to the ConsensusState.Step by pulling
  240. // on conR.conS.NewStepCh().
  241. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  242. for {
  243. // Get RoundState with new Step or quit.
  244. var rs *RoundState
  245. select {
  246. case rs = <-conR.conS.NewStepCh():
  247. case <-conR.Quit:
  248. return
  249. }
  250. nrsMsg, csMsg := makeRoundStepMessages(rs)
  251. if nrsMsg != nil {
  252. conR.Switch.Broadcast(StateChannel, nrsMsg)
  253. }
  254. if csMsg != nil {
  255. conR.Switch.Broadcast(StateChannel, csMsg)
  256. }
  257. }
  258. }
  259. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  260. rs := conR.conS.GetRoundState()
  261. nrsMsg, csMsg := makeRoundStepMessages(rs)
  262. if nrsMsg != nil {
  263. peer.Send(StateChannel, nrsMsg)
  264. }
  265. if csMsg != nil {
  266. peer.Send(StateChannel, csMsg)
  267. }
  268. }
  269. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  270. log := log.New("peer", peer.Key)
  271. OUTER_LOOP:
  272. for {
  273. // Manage disconnects from self or peer.
  274. if !peer.IsRunning() || !conR.IsRunning() {
  275. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  276. return
  277. }
  278. rs := conR.conS.GetRoundState()
  279. prs := ps.GetRoundState()
  280. // Send proposal Block parts?
  281. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  282. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  283. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  284. part := rs.ProposalBlockParts.GetPart(index)
  285. msg := &BlockPartMessage{
  286. Height: rs.Height, // This tells peer that this part applies to us.
  287. Round: rs.Round, // This tells peer that this part applies to us.
  288. Part: part,
  289. }
  290. peer.Send(DataChannel, msg)
  291. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  292. continue OUTER_LOOP
  293. }
  294. }
  295. // If the peer is on a previous height, help catch up.
  296. if (0 < prs.Height) && (prs.Height < rs.Height) {
  297. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  298. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  299. // Ensure that the peer's PartSetHeader is correct
  300. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  301. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  302. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  303. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  304. time.Sleep(peerGossipSleepDuration)
  305. continue OUTER_LOOP
  306. }
  307. // Load the part
  308. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  309. if part == nil {
  310. log.Warn("Could not load part", "index", index,
  311. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  312. time.Sleep(peerGossipSleepDuration)
  313. continue OUTER_LOOP
  314. }
  315. // Send the part
  316. msg := &BlockPartMessage{
  317. Height: prs.Height, // Not our height, so it doesn't matter.
  318. Round: prs.Round, // Not our height, so it doesn't matter.
  319. Part: part,
  320. }
  321. peer.Send(DataChannel, msg)
  322. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  323. continue OUTER_LOOP
  324. } else {
  325. //log.Info("No parts to send in catch-up, sleeping")
  326. time.Sleep(peerGossipSleepDuration)
  327. continue OUTER_LOOP
  328. }
  329. }
  330. // If height and round don't match, sleep.
  331. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  332. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  333. time.Sleep(peerGossipSleepDuration)
  334. continue OUTER_LOOP
  335. }
  336. // By here, height and round match.
  337. // Proposal block parts were already matched and sent if any were wanted.
  338. // (These can match on hash so the round doesn't matter)
  339. // Now consider sending other things, like the Proposal itself.
  340. // Send Proposal && ProposalPOL BitArray?
  341. if rs.Proposal != nil && !prs.Proposal {
  342. // Proposal
  343. {
  344. msg := &ProposalMessage{Proposal: rs.Proposal}
  345. peer.Send(DataChannel, msg)
  346. ps.SetHasProposal(rs.Proposal)
  347. }
  348. // ProposalPOL.
  349. // Peer must receive ProposalMessage first.
  350. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  351. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  352. if 0 <= rs.Proposal.POLRound {
  353. msg := &ProposalPOLMessage{
  354. Height: rs.Height,
  355. ProposalPOLRound: rs.Proposal.POLRound,
  356. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  357. }
  358. peer.Send(DataChannel, msg)
  359. }
  360. continue OUTER_LOOP
  361. }
  362. // Nothing to do. Sleep.
  363. time.Sleep(peerGossipSleepDuration)
  364. continue OUTER_LOOP
  365. }
  366. }
  367. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  368. log := log.New("peer", peer.Key)
  369. // Simple hack to throttle logs upon sleep.
  370. var sleeping = 0
  371. OUTER_LOOP:
  372. for {
  373. // Manage disconnects from self or peer.
  374. if !peer.IsRunning() || !conR.IsRunning() {
  375. log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  376. return
  377. }
  378. rs := conR.conS.GetRoundState()
  379. prs := ps.GetRoundState()
  380. switch sleeping {
  381. case 1: // First sleep
  382. sleeping = 2
  383. case 2: // No more sleep
  384. sleeping = 0
  385. }
  386. log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  387. "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  388. // If height matches, then send LastCommit, Prevotes, Precommits.
  389. if rs.Height == prs.Height {
  390. // If there are lastCommits to send...
  391. if prs.Step == RoundStepNewHeight {
  392. if ps.PickSendVote(rs.LastCommit) {
  393. log.Info("Picked rs.LastCommit to send")
  394. continue OUTER_LOOP
  395. }
  396. }
  397. // If there are prevotes to send...
  398. if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
  399. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  400. log.Info("Picked rs.Prevotes(prs.Round) to send")
  401. continue OUTER_LOOP
  402. }
  403. }
  404. // If there are precommits to send...
  405. if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
  406. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  407. log.Info("Picked rs.Precommits(prs.Round) to send")
  408. continue OUTER_LOOP
  409. }
  410. }
  411. // If there are POLPrevotes to send...
  412. if prs.ProposalPOLRound != -1 {
  413. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  414. if ps.PickSendVote(polPrevotes) {
  415. log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  416. continue OUTER_LOOP
  417. }
  418. }
  419. }
  420. }
  421. // Special catchup logic.
  422. // If peer is lagging by height 1, send LastCommit.
  423. if prs.Height != 0 && rs.Height == prs.Height+1 {
  424. if ps.PickSendVote(rs.LastCommit) {
  425. log.Info("Picked rs.LastCommit to send")
  426. continue OUTER_LOOP
  427. }
  428. }
  429. // Catchup logic
  430. // If peer is lagging by more than 1, send Validation.
  431. if prs.Height != 0 && rs.Height >= prs.Height+2 {
  432. // Load the block validation for prs.Height,
  433. // which contains precommit signatures for prs.Height.
  434. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  435. log.Info("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  436. if ps.PickSendVote(validation) {
  437. log.Info("Picked Catchup validation to send")
  438. continue OUTER_LOOP
  439. }
  440. }
  441. if sleeping == 0 {
  442. // We sent nothing. Sleep...
  443. sleeping = 1
  444. log.Info("No votes to send, sleeping", "peer", peer,
  445. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  446. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  447. } else if sleeping == 2 {
  448. // Continued sleep...
  449. sleeping = 1
  450. }
  451. time.Sleep(peerGossipSleepDuration)
  452. continue OUTER_LOOP
  453. }
  454. }
  455. //-----------------------------------------------------------------------------
  456. // Read only when returned by PeerState.GetRoundState().
  457. type PeerRoundState struct {
  458. Height int // Height peer is at
  459. Round int // Round peer is at, -1 if unknown.
  460. Step RoundStepType // Step peer is at
  461. StartTime time.Time // Estimated start of round 0 at this height
  462. Proposal bool // True if peer has proposal for this round
  463. ProposalBlockPartsHeader types.PartSetHeader //
  464. ProposalBlockParts *BitArray //
  465. ProposalPOLRound int // Proposal's POL round. -1 if none.
  466. ProposalPOL *BitArray // nil until ProposalPOLMessage received.
  467. Prevotes *BitArray // All votes peer has for this round
  468. Precommits *BitArray // All precommits peer has for this round
  469. LastCommitRound int // Round of commit for last height. -1 if none.
  470. LastCommit *BitArray // All commit precommits of commit for last height.
  471. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none.
  472. CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound
  473. }
  474. //-----------------------------------------------------------------------------
  // Sentinel errors for peer-state updates.
  var ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  var ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  479. type PeerState struct {
  480. Peer *p2p.Peer
  481. mtx sync.Mutex
  482. PeerRoundState
  483. }
  484. func NewPeerState(peer *p2p.Peer) *PeerState {
  485. return &PeerState{
  486. Peer: peer,
  487. PeerRoundState: PeerRoundState{
  488. Round: -1,
  489. ProposalPOLRound: -1,
  490. LastCommitRound: -1,
  491. CatchupCommitRound: -1,
  492. },
  493. }
  494. }
  495. // Returns an atomic snapshot of the PeerRoundState.
  496. // There's no point in mutating it since it won't change PeerState.
  497. func (ps *PeerState) GetRoundState() *PeerRoundState {
  498. ps.mtx.Lock()
  499. defer ps.mtx.Unlock()
  500. prs := ps.PeerRoundState // copy
  501. return &prs
  502. }
  503. // Returns an atomic snapshot of the PeerRoundState's height
  504. // used by the mempool to ensure peers are caught up before broadcasting new txs
  505. func (ps *PeerState) GetHeight() int {
  506. ps.mtx.Lock()
  507. defer ps.mtx.Unlock()
  508. return ps.PeerRoundState.Height
  509. }
  510. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  511. ps.mtx.Lock()
  512. defer ps.mtx.Unlock()
  513. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  514. return
  515. }
  516. if ps.Proposal {
  517. return
  518. }
  519. ps.Proposal = true
  520. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  521. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  522. ps.ProposalPOLRound = proposal.POLRound
  523. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  524. }
  525. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  526. ps.mtx.Lock()
  527. defer ps.mtx.Unlock()
  528. if ps.Height != height || ps.Round != round {
  529. return
  530. }
  531. ps.ProposalBlockParts.SetIndex(index, true)
  532. }
  533. // Convenience function to send vote to peer.
  534. // Returns true if vote was sent.
  535. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  536. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  537. msg := &VoteMessage{index, vote}
  538. ps.Peer.Send(VoteChannel, msg)
  539. return true
  540. }
  541. return false
  542. }
  543. // votes: Must be the correct Size() for the Height().
  544. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  545. ps.mtx.Lock()
  546. defer ps.mtx.Unlock()
  547. if votes.Size() == 0 {
  548. return 0, nil, false
  549. }
  550. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  551. // Lazily set data using 'votes'.
  552. if votes.IsCommit() {
  553. ps.ensureCatchupCommitRound(height, round, size)
  554. }
  555. ps.ensureVoteBitArrays(height, size)
  556. psVotes := ps.getVoteBitArray(height, round, type_)
  557. if psVotes == nil {
  558. return 0, nil, false // Not something worth sending
  559. }
  560. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  561. ps.setHasVote(height, round, type_, index)
  562. return index, votes.GetByIndex(index), true
  563. }
  564. return 0, nil, false
  565. }
  566. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  567. if ps.Height == height {
  568. if ps.Round == round {
  569. switch type_ {
  570. case types.VoteTypePrevote:
  571. return ps.Prevotes
  572. case types.VoteTypePrecommit:
  573. return ps.Precommits
  574. default:
  575. PanicSanity(Fmt("Unexpected vote type %X", type_))
  576. }
  577. }
  578. if ps.CatchupCommitRound == round {
  579. switch type_ {
  580. case types.VoteTypePrevote:
  581. return nil
  582. case types.VoteTypePrecommit:
  583. return ps.CatchupCommit
  584. default:
  585. PanicSanity(Fmt("Unexpected vote type %X", type_))
  586. }
  587. }
  588. return nil
  589. }
  590. if ps.Height == height+1 {
  591. if ps.LastCommitRound == round {
  592. switch type_ {
  593. case types.VoteTypePrevote:
  594. return nil
  595. case types.VoteTypePrecommit:
  596. return ps.LastCommit
  597. default:
  598. PanicSanity(Fmt("Unexpected vote type %X", type_))
  599. }
  600. }
  601. return nil
  602. }
  603. return nil
  604. }
  605. // 'round': A round for which we have a +2/3 commit.
  606. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  607. if ps.Height != height {
  608. return
  609. }
  610. /*
  611. NOTE: This is wrong, 'round' could change.
  612. e.g. if orig round is not the same as block LastValidation round.
  613. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  614. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  615. }
  616. */
  617. if ps.CatchupCommitRound == round {
  618. return // Nothing to do!
  619. }
  620. ps.CatchupCommitRound = round
  621. if round == ps.Round {
  622. ps.CatchupCommit = ps.Precommits
  623. } else {
  624. ps.CatchupCommit = NewBitArray(numValidators)
  625. }
  626. }
  627. // NOTE: It's important to make sure that numValidators actually matches
  628. // what the node sees as the number of validators for height.
  629. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  630. ps.mtx.Lock()
  631. defer ps.mtx.Unlock()
  632. ps.ensureVoteBitArrays(height, numValidators)
  633. }
  634. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  635. if ps.Height == height {
  636. if ps.Prevotes == nil {
  637. ps.Prevotes = NewBitArray(numValidators)
  638. }
  639. if ps.Precommits == nil {
  640. ps.Precommits = NewBitArray(numValidators)
  641. }
  642. if ps.CatchupCommit == nil {
  643. ps.CatchupCommit = NewBitArray(numValidators)
  644. }
  645. if ps.ProposalPOL == nil {
  646. ps.ProposalPOL = NewBitArray(numValidators)
  647. }
  648. } else if ps.Height == height+1 {
  649. if ps.LastCommit == nil {
  650. ps.LastCommit = NewBitArray(numValidators)
  651. }
  652. }
  653. }
  654. func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  655. ps.mtx.Lock()
  656. defer ps.mtx.Unlock()
  657. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  658. }
  659. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  660. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  661. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  662. PanicSanity("Invalid vote type")
  663. }
  664. if ps.Height == height {
  665. if ps.Round == round {
  666. switch type_ {
  667. case types.VoteTypePrevote:
  668. ps.Prevotes.SetIndex(index, true)
  669. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  670. case types.VoteTypePrecommit:
  671. ps.Precommits.SetIndex(index, true)
  672. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  673. }
  674. } else if ps.CatchupCommitRound == round {
  675. switch type_ {
  676. case types.VoteTypePrevote:
  677. case types.VoteTypePrecommit:
  678. ps.CatchupCommit.SetIndex(index, true)
  679. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  680. }
  681. } else if ps.ProposalPOLRound == round {
  682. switch type_ {
  683. case types.VoteTypePrevote:
  684. ps.ProposalPOL.SetIndex(index, true)
  685. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  686. case types.VoteTypePrecommit:
  687. }
  688. }
  689. } else if ps.Height == height+1 {
  690. if ps.LastCommitRound == round {
  691. switch type_ {
  692. case types.VoteTypePrevote:
  693. case types.VoteTypePrecommit:
  694. ps.LastCommit.SetIndex(index, true)
  695. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  696. }
  697. }
  698. } else {
  699. // Does not apply.
  700. }
  701. }
  702. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  703. ps.mtx.Lock()
  704. defer ps.mtx.Unlock()
  705. // Ignore duplicate messages.
  706. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  707. return
  708. }
  709. // Just remember these values.
  710. psHeight := ps.Height
  711. psRound := ps.Round
  712. //psStep := ps.Step
  713. psCatchupCommitRound := ps.CatchupCommitRound
  714. psCatchupCommit := ps.CatchupCommit
  715. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  716. ps.Height = msg.Height
  717. ps.Round = msg.Round
  718. ps.Step = msg.Step
  719. ps.StartTime = startTime
  720. if psHeight != msg.Height || psRound != msg.Round {
  721. ps.Proposal = false
  722. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  723. ps.ProposalBlockParts = nil
  724. ps.ProposalPOLRound = -1
  725. ps.ProposalPOL = nil
  726. // We'll update the BitArray capacity later.
  727. ps.Prevotes = nil
  728. ps.Precommits = nil
  729. }
  730. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  731. // Peer caught up to CatchupCommitRound.
  732. // Preserve psCatchupCommit!
  733. // NOTE: We prefer to use prs.Precommits if
  734. // pr.Round matches pr.CatchupCommitRound.
  735. ps.Precommits = psCatchupCommit
  736. }
  737. if psHeight != msg.Height {
  738. // Shift Precommits to LastCommit.
  739. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  740. ps.LastCommitRound = msg.LastCommitRound
  741. ps.LastCommit = ps.Precommits
  742. } else {
  743. ps.LastCommitRound = msg.LastCommitRound
  744. ps.LastCommit = nil
  745. }
  746. // We'll update the BitArray capacity later.
  747. ps.CatchupCommitRound = -1
  748. ps.CatchupCommit = nil
  749. }
  750. }
  751. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  752. ps.mtx.Lock()
  753. defer ps.mtx.Unlock()
  754. if ps.Height != msg.Height {
  755. return
  756. }
  757. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  758. ps.ProposalBlockParts = msg.BlockParts
  759. }
  760. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  761. ps.mtx.Lock()
  762. defer ps.mtx.Unlock()
  763. if ps.Height != msg.Height {
  764. return
  765. }
  766. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  767. }
  768. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  769. ps.mtx.Lock()
  770. defer ps.mtx.Unlock()
  771. if ps.Height != msg.Height {
  772. return
  773. }
  774. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  775. return
  776. }
  777. // TODO: Merge onto existing ps.ProposalPOL?
  778. // We might have sent some prevotes in the meantime.
  779. ps.ProposalPOL = msg.ProposalPOL
  780. }
  781. //-----------------------------------------------------------------------------
  // Messages
  //
  // Type bytes under which the consensus messages are registered with wire.
  const (
  	msgTypeNewRoundStep byte = 0x01
  	msgTypeCommitStep   byte = 0x02
  	msgTypeProposal     byte = 0x11
  	msgTypeProposalPOL  byte = 0x12
  	msgTypeBlockPart    byte = 0x13 // both block & POL
  	msgTypeVote         byte = 0x14
  	msgTypeHasVote      byte = 0x15
  )
  792. type ConsensusMessage interface{}
  793. var _ = wire.RegisterInterface(
  794. struct{ ConsensusMessage }{},
  795. wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  796. wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  797. wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  798. wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  799. wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  800. wire.ConcreteType{&VoteMessage{}, msgTypeVote},
  801. wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  802. )
  803. // TODO: check for unnecessary extra bytes at the end.
  804. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  805. msgType = bz[0]
  806. n := new(int)
  807. r := bytes.NewReader(bz)
  808. msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  809. return
  810. }
  811. //-------------------------------------
  812. // For every height/round/step transition
  813. type NewRoundStepMessage struct {
  814. Height int
  815. Round int
  816. Step RoundStepType
  817. SecondsSinceStartTime int
  818. LastCommitRound int
  819. }
  820. func (m *NewRoundStepMessage) String() string {
  821. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  822. m.Height, m.Round, m.Step, m.LastCommitRound)
  823. }
  824. //-------------------------------------
  825. type CommitStepMessage struct {
  826. Height int
  827. BlockPartsHeader types.PartSetHeader
  828. BlockParts *BitArray
  829. }
  830. func (m *CommitStepMessage) String() string {
  831. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  832. }
  833. //-------------------------------------
  834. type ProposalMessage struct {
  835. Proposal *types.Proposal
  836. }
  837. func (m *ProposalMessage) String() string {
  838. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  839. }
  840. //-------------------------------------
  841. type ProposalPOLMessage struct {
  842. Height int
  843. ProposalPOLRound int
  844. ProposalPOL *BitArray
  845. }
  846. func (m *ProposalPOLMessage) String() string {
  847. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  848. }
  849. //-------------------------------------
  850. type BlockPartMessage struct {
  851. Height int
  852. Round int
  853. Part *types.Part
  854. }
  855. func (m *BlockPartMessage) String() string {
  856. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  857. }
  858. //-------------------------------------
  859. type VoteMessage struct {
  860. ValidatorIndex int
  861. Vote *types.Vote
  862. }
  863. func (m *VoteMessage) String() string {
  864. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  865. }
  866. //-------------------------------------
  // HasVoteMessage announces that the sender holds the vote of the given
  // type from validator Index at Height/Round.
  type HasVoteMessage struct {
  	Height int
  	Round  int
  	Type   byte
  	Index  int
  }

  // String renders the index, then height/round/type (round zero-padded to
  // two digits), then the index once more.
  func (m *HasVoteMessage) String() string {
  	idx := m.Index
  	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", idx, m.Height, m.Round, m.Type, idx)
  }