You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1006 lines
29 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. bc "github.com/tendermint/tendermint/blockchain"
  10. . "github.com/tendermint/tendermint/common"
  11. "github.com/tendermint/tendermint/events"
  12. "github.com/tendermint/tendermint/p2p"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. "github.com/tendermint/tendermint/wire"
  16. )
  // Channel IDs served by the consensus reactor, plus the gossip pacing interval.
  const (
  	StateChannel byte = 0x20 // NewRoundStep, CommitStep and HasVote messages
  	DataChannel  byte = 0x21 // Proposal, ProposalPOL and BlockPart messages
  	VoteChannel  byte = 0x22 // Vote messages

  	// Time to sleep if there's nothing to send.
  	peerGossipSleepDuration = 100 * time.Millisecond
  )
  23. //-----------------------------------------------------------------------------
  // ConsensusReactor connects the local ConsensusState to the p2p layer: it
  // receives consensus messages from peers and gossips proposals, block parts
  // and votes back to them.
  type ConsensusReactor struct {
  	p2p.BaseReactor

  	// blockStore is read by the gossip routines to serve block parts and
  	// validations to peers that are behind.
  	blockStore *bc.BlockStore
  	// conS is the consensus state machine driven by this reactor.
  	conS *ConsensusState
  	// fastSync is true until SwitchToConsensus is called; while set, Data and
  	// Vote channel messages are ignored.
  	// NOTE(review): accessed from several goroutines without synchronization.
  	fastSync bool
  	// evsw is the event sink installed via SetFireable.
  	evsw events.Fireable
  }
  31. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  32. conR := &ConsensusReactor{
  33. blockStore: blockStore,
  34. conS: consensusState,
  35. fastSync: fastSync,
  36. }
  37. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  38. return conR
  39. }
  40. func (conR *ConsensusReactor) OnStart() error {
  41. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  42. conR.BaseReactor.OnStart()
  43. if !conR.fastSync {
  44. _, err := conR.conS.Start()
  45. if err != nil {
  46. return err
  47. }
  48. }
  49. go conR.broadcastNewRoundStepRoutine()
  50. return nil
  51. }
  // OnStop stops the base reactor and then the consensus state machine.
  func (conR *ConsensusReactor) OnStop() {
  	conR.BaseReactor.OnStop()
  	conR.conS.Stop()
  }
  56. // Switch from the fast_sync to the consensus:
  57. // reset the state, turn off fast_sync, start the consensus-state-machine
  58. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  59. log.Notice("SwitchToConsensus")
  60. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  61. // broadcast a NewRoundStepMessage.
  62. conR.conS.updateToState(state)
  63. conR.fastSync = false
  64. conR.conS.Start()
  65. }
  66. // Implements Reactor
  67. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  68. // TODO optimize
  69. return []*p2p.ChannelDescriptor{
  70. &p2p.ChannelDescriptor{
  71. ID: StateChannel,
  72. Priority: 5,
  73. SendQueueCapacity: 100,
  74. },
  75. &p2p.ChannelDescriptor{
  76. ID: DataChannel,
  77. Priority: 5,
  78. SendQueueCapacity: 2,
  79. },
  80. &p2p.ChannelDescriptor{
  81. ID: VoteChannel,
  82. Priority: 5,
  83. SendQueueCapacity: 40,
  84. },
  85. }
  86. }
  87. // Implements Reactor
  88. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  89. if !conR.IsRunning() {
  90. return
  91. }
  92. // Create peerState for peer
  93. peerState := NewPeerState(peer)
  94. peer.Data.Set(types.PeerStateKey, peerState)
  95. // Begin gossip routines for this peer.
  96. go conR.gossipDataRoutine(peer, peerState)
  97. go conR.gossipVotesRoutine(peer, peerState)
  98. // Send our state to peer.
  99. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  100. if !conR.fastSync {
  101. conR.sendNewRoundStepMessage(peer)
  102. }
  103. }
  // RemovePeer implements Reactor. It is currently a no-op: the per-peer
  // gossip routines stop on their own once peer.IsRunning() reports false.
  func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  	if !conR.IsRunning() {
  		return
  	}
  	// TODO
  	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  }
  112. // Implements Reactor
  113. // NOTE: We process these messages even when we're fast_syncing.
  114. func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
  115. if !conR.IsRunning() {
  116. log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes)
  117. return
  118. }
  119. // Get peer states
  120. ps := peer.Data.Get(types.PeerStateKey).(*PeerState)
  121. _, msg, err := DecodeMessage(msgBytes)
  122. if err != nil {
  123. log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
  124. // TODO punish peer?
  125. return
  126. }
  127. log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg)
  128. switch chID {
  129. case StateChannel:
  130. switch msg := msg.(type) {
  131. case *NewRoundStepMessage:
  132. ps.ApplyNewRoundStepMessage(msg)
  133. case *CommitStepMessage:
  134. ps.ApplyCommitStepMessage(msg)
  135. case *HasVoteMessage:
  136. ps.ApplyHasVoteMessage(msg)
  137. default:
  138. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  139. }
  140. case DataChannel:
  141. if conR.fastSync {
  142. log.Warn("Ignoring message received during fastSync", "msg", msg)
  143. return
  144. }
  145. switch msg := msg.(type) {
  146. case *ProposalMessage:
  147. ps.SetHasProposal(msg.Proposal)
  148. err = conR.conS.SetProposal(msg.Proposal)
  149. case *ProposalPOLMessage:
  150. ps.ApplyProposalPOLMessage(msg)
  151. case *BlockPartMessage:
  152. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  153. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  154. default:
  155. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  156. }
  157. case VoteChannel:
  158. if conR.fastSync {
  159. log.Warn("Ignoring message received during fastSync", "msg", msg)
  160. return
  161. }
  162. switch msg := msg.(type) {
  163. case *VoteMessage:
  164. vote, valIndex := msg.Vote, msg.ValidatorIndex
  165. // attempt to add the vote and dupeout the validator if its a duplicate signature
  166. added, err := conR.conS.TryAddVote(valIndex, vote, peer.Key)
  167. if err == ErrAddingVote {
  168. // TODO: punish peer
  169. } else if err != nil {
  170. return
  171. }
  172. cs := conR.conS
  173. cs.mtx.Lock()
  174. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  175. cs.mtx.Unlock()
  176. ps.EnsureVoteBitArrays(height, valSize)
  177. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  178. ps.SetHasVote(vote, valIndex)
  179. if added {
  180. // If rs.Height == vote.Height && rs.Round < vote.Round,
  181. // the peer is sending us CatchupCommit precommits.
  182. // We could make note of this and help filter in broadcastHasVoteMessage().
  183. conR.broadcastHasVoteMessage(vote, valIndex)
  184. }
  185. default:
  186. // don't punish (leave room for soft upgrades)
  187. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  188. }
  189. default:
  190. log.Warn(Fmt("Unknown channel %X", chID))
  191. }
  192. if err != nil {
  193. log.Warn("Error in Receive()", "error", err)
  194. }
  195. }
  196. // Broadcasts HasVoteMessage to peers that care.
  197. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  198. msg := &HasVoteMessage{
  199. Height: vote.Height,
  200. Round: vote.Round,
  201. Type: vote.Type,
  202. Index: index,
  203. }
  204. conR.Switch.Broadcast(StateChannel, msg)
  205. /*
  206. // TODO: Make this broadcast more selective.
  207. for _, peer := range conR.Switch.Peers().List() {
  208. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  209. prs := ps.GetRoundState()
  210. if prs.Height == vote.Height {
  211. // TODO: Also filter on round?
  212. peer.TrySend(StateChannel, msg)
  213. } else {
  214. // Height doesn't match
  215. // TODO: check a field, maybe CatchupCommitRound?
  216. // TODO: But that requires changing the struct field comment.
  217. }
  218. }
  219. */
  220. }
  // SetPrivValidator sets our private validator account for signing votes by
  // forwarding it to the consensus state.
  func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
  	conR.conS.SetPrivValidator(priv)
  }
  // SetFireable implements events.Eventable. The event sink is stored on the
  // reactor and also passed on to the consensus state.
  func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  	conR.evsw = evsw
  	conR.conS.SetFireable(evsw)
  }
  230. //--------------------------------------
  231. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  232. nrsMsg = &NewRoundStepMessage{
  233. Height: rs.Height,
  234. Round: rs.Round,
  235. Step: rs.Step,
  236. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  237. LastCommitRound: rs.LastCommit.Round(),
  238. }
  239. if rs.Step == RoundStepCommit {
  240. csMsg = &CommitStepMessage{
  241. Height: rs.Height,
  242. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  243. BlockParts: rs.ProposalBlockParts.BitArray(),
  244. }
  245. }
  246. return
  247. }
  248. // Listens for changes to the ConsensusState.Step by pulling
  249. // on conR.conS.NewStepCh().
  250. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  251. for {
  252. // Get RoundState with new Step or quit.
  253. var rs *RoundState
  254. select {
  255. case rs = <-conR.conS.NewStepCh():
  256. case <-conR.Quit:
  257. return
  258. }
  259. nrsMsg, csMsg := makeRoundStepMessages(rs)
  260. if nrsMsg != nil {
  261. conR.Switch.Broadcast(StateChannel, nrsMsg)
  262. }
  263. if csMsg != nil {
  264. conR.Switch.Broadcast(StateChannel, csMsg)
  265. }
  266. }
  267. }
  268. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  269. rs := conR.conS.GetRoundState()
  270. nrsMsg, csMsg := makeRoundStepMessages(rs)
  271. if nrsMsg != nil {
  272. peer.Send(StateChannel, nrsMsg)
  273. }
  274. if csMsg != nil {
  275. peer.Send(StateChannel, csMsg)
  276. }
  277. }
  278. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  279. log := log.New("peer", peer.Key)
  280. OUTER_LOOP:
  281. for {
  282. // Manage disconnects from self or peer.
  283. if !peer.IsRunning() || !conR.IsRunning() {
  284. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  285. return
  286. }
  287. rs := conR.conS.GetRoundState()
  288. prs := ps.GetRoundState()
  289. // Send proposal Block parts?
  290. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  291. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  292. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  293. part := rs.ProposalBlockParts.GetPart(index)
  294. msg := &BlockPartMessage{
  295. Height: rs.Height, // This tells peer that this part applies to us.
  296. Round: rs.Round, // This tells peer that this part applies to us.
  297. Part: part,
  298. }
  299. peer.Send(DataChannel, msg)
  300. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  301. continue OUTER_LOOP
  302. }
  303. }
  304. // If the peer is on a previous height, help catch up.
  305. if (0 < prs.Height) && (prs.Height < rs.Height) {
  306. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  307. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  308. // Ensure that the peer's PartSetHeader is correct
  309. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  310. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  311. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  312. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  313. time.Sleep(peerGossipSleepDuration)
  314. continue OUTER_LOOP
  315. }
  316. // Load the part
  317. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  318. if part == nil {
  319. log.Warn("Could not load part", "index", index,
  320. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  321. time.Sleep(peerGossipSleepDuration)
  322. continue OUTER_LOOP
  323. }
  324. // Send the part
  325. msg := &BlockPartMessage{
  326. Height: prs.Height, // Not our height, so it doesn't matter.
  327. Round: prs.Round, // Not our height, so it doesn't matter.
  328. Part: part,
  329. }
  330. peer.Send(DataChannel, msg)
  331. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  332. continue OUTER_LOOP
  333. } else {
  334. //log.Info("No parts to send in catch-up, sleeping")
  335. time.Sleep(peerGossipSleepDuration)
  336. continue OUTER_LOOP
  337. }
  338. }
  339. // If height and round don't match, sleep.
  340. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  341. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  342. time.Sleep(peerGossipSleepDuration)
  343. continue OUTER_LOOP
  344. }
  345. // By here, height and round match.
  346. // Proposal block parts were already matched and sent if any were wanted.
  347. // (These can match on hash so the round doesn't matter)
  348. // Now consider sending other things, like the Proposal itself.
  349. // Send Proposal && ProposalPOL BitArray?
  350. if rs.Proposal != nil && !prs.Proposal {
  351. // Proposal
  352. {
  353. msg := &ProposalMessage{Proposal: rs.Proposal}
  354. peer.Send(DataChannel, msg)
  355. ps.SetHasProposal(rs.Proposal)
  356. }
  357. // ProposalPOL.
  358. // Peer must receive ProposalMessage first.
  359. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  360. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  361. if 0 <= rs.Proposal.POLRound {
  362. msg := &ProposalPOLMessage{
  363. Height: rs.Height,
  364. ProposalPOLRound: rs.Proposal.POLRound,
  365. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  366. }
  367. peer.Send(DataChannel, msg)
  368. }
  369. continue OUTER_LOOP
  370. }
  371. // Nothing to do. Sleep.
  372. time.Sleep(peerGossipSleepDuration)
  373. continue OUTER_LOOP
  374. }
  375. }
  // gossipVotesRoutine runs for the lifetime of a peer connection and sends the
  // peer one vote at a time that it appears to be missing. Each iteration tries,
  // in order: votes for the shared height (LastCommit, prevotes, precommits,
  // POL prevotes), LastCommit when the peer is exactly one height behind, and
  // the stored block validation when the peer is two or more heights behind.
  // When nothing is picked it sleeps for peerGossipSleepDuration. The routine
  // returns when either the peer or the reactor stops running.
  func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  	log := log.New("peer", peer.Key)

  	// Simple hack to throttle logs upon sleep.
  	// 0: not sleeping (the next idle pass logs), 1: just slept,
  	// 2: slept on the previous pass (suppresses the log if idle again).
  	var sleeping = 0

  OUTER_LOOP:
  	for {
  		// Manage disconnects from self or peer.
  		if !peer.IsRunning() || !conR.IsRunning() {
  			log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  			return
  		}
  		rs := conR.conS.GetRoundState()
  		prs := ps.GetRoundState()

  		// Advance the throttle state; it is set back to 1 below if this pass
  		// ends up sleeping again.
  		switch sleeping {
  		case 1: // First sleep
  			sleeping = 2
  		case 2: // No more sleep
  			sleeping = 0
  		}

  		log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  			"prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

  		// If height matches, then send LastCommit, Prevotes, Precommits.
  		if rs.Height == prs.Height {
  			// If there are lastCommits to send...
  			if prs.Step == RoundStepNewHeight {
  				if ps.PickSendVote(rs.LastCommit) {
  					log.Info("Picked rs.LastCommit to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are prevotes to send...
  			if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
  				if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  					log.Info("Picked rs.Prevotes(prs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are precommits to send...
  			if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
  				if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  					log.Info("Picked rs.Precommits(prs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are POLPrevotes to send...
  			if prs.ProposalPOLRound != -1 {
  				if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  					if ps.PickSendVote(polPrevotes) {
  						log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  						continue OUTER_LOOP
  					}
  				}
  			}
  		}

  		// Special catchup logic.
  		// If peer is lagging by height 1, send LastCommit.
  		if prs.Height != 0 && rs.Height == prs.Height+1 {
  			if ps.PickSendVote(rs.LastCommit) {
  				log.Info("Picked rs.LastCommit to send")
  				continue OUTER_LOOP
  			}
  		}

  		// Catchup logic
  		// If peer is lagging by more than 1, send Validation.
  		if prs.Height != 0 && rs.Height >= prs.Height+2 {
  			// Load the block validation for prs.Height,
  			// which contains precommit signatures for prs.Height.
  			// NOTE(review): the result is passed on without a nil check;
  			// confirm LoadBlockValidation cannot return nil here.
  			validation := conR.blockStore.LoadBlockValidation(prs.Height)
  			log.Info("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  			if ps.PickSendVote(validation) {
  				log.Info("Picked Catchup validation to send")
  				continue OUTER_LOOP
  			}
  		}

  		if sleeping == 0 {
  			// We sent nothing. Sleep...
  			sleeping = 1
  			log.Info("No votes to send, sleeping", "peer", peer,
  				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  		} else if sleeping == 2 {
  			// Continued sleep...
  			sleeping = 1
  		}

  		time.Sleep(peerGossipSleepDuration)
  		continue OUTER_LOOP
  	}
  }
  464. //-----------------------------------------------------------------------------
  // PeerRoundState is our view of what a peer knows about the current
  // height/round: where it is and which proposal parts and votes it has.
  // Read only when returned by PeerState.GetRoundState().
  type PeerRoundState struct {
  	Height                   int                 // Height peer is at
  	Round                    int                 // Round peer is at, -1 if unknown.
  	Step                     RoundStepType       // Step peer is at
  	StartTime                time.Time           // Estimated start of round 0 at this height
  	Proposal                 bool                // True if peer has proposal for this round
  	ProposalBlockPartsHeader types.PartSetHeader // Header of the block part set the peer is tracking
  	ProposalBlockParts       *BitArray           // Which parts of that set the peer has
  	ProposalPOLRound         int                 // Proposal's POL round. -1 if none.
  	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
  	Prevotes                 *BitArray           // All votes peer has for this round
  	Precommits               *BitArray           // All precommits peer has for this round
  	LastCommitRound          int                 // Round of commit for last height. -1 if none.
  	LastCommit               *BitArray           // All commit precommits of commit for last height.
  	CatchupCommitRound       int                 // Round that we have commit for. Not necessarily unique. -1 if none.
  	CatchupCommit            *BitArray           // All commit precommits peer has for this height & CatchupCommitRound
  }
  483. //-----------------------------------------------------------------------------
  // Sentinel errors for invalid peer state updates.
  // NOTE(review): neither is referenced in the visible part of this file.
  var (
  	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  )
  // PeerState tracks what a single peer is known to have. The embedded
  // PeerRoundState is guarded by mtx; exported methods take the lock, while
  // the unexported helpers in this file expect the caller to hold it.
  type PeerState struct {
  	Peer *p2p.Peer

  	mtx sync.Mutex
  	PeerRoundState
  }
  493. func NewPeerState(peer *p2p.Peer) *PeerState {
  494. return &PeerState{
  495. Peer: peer,
  496. PeerRoundState: PeerRoundState{
  497. Round: -1,
  498. ProposalPOLRound: -1,
  499. LastCommitRound: -1,
  500. CatchupCommitRound: -1,
  501. },
  502. }
  503. }
  504. // Returns an atomic snapshot of the PeerRoundState.
  505. // There's no point in mutating it since it won't change PeerState.
  506. func (ps *PeerState) GetRoundState() *PeerRoundState {
  507. ps.mtx.Lock()
  508. defer ps.mtx.Unlock()
  509. prs := ps.PeerRoundState // copy
  510. return &prs
  511. }
  512. // Returns an atomic snapshot of the PeerRoundState's height
  513. // used by the mempool to ensure peers are caught up before broadcasting new txs
  514. func (ps *PeerState) GetHeight() int {
  515. ps.mtx.Lock()
  516. defer ps.mtx.Unlock()
  517. return ps.PeerRoundState.Height
  518. }
  519. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  520. ps.mtx.Lock()
  521. defer ps.mtx.Unlock()
  522. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  523. return
  524. }
  525. if ps.Proposal {
  526. return
  527. }
  528. ps.Proposal = true
  529. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  530. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  531. ps.ProposalPOLRound = proposal.POLRound
  532. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  533. }
  534. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  535. ps.mtx.Lock()
  536. defer ps.mtx.Unlock()
  537. if ps.Height != height || ps.Round != round {
  538. return
  539. }
  540. ps.ProposalBlockParts.SetIndex(index, true)
  541. }
  542. // Convenience function to send vote to peer.
  543. // Returns true if vote was sent.
  544. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  545. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  546. msg := &VoteMessage{index, vote}
  547. ps.Peer.Send(VoteChannel, msg)
  548. return true
  549. }
  550. return false
  551. }
  552. // votes: Must be the correct Size() for the Height().
  553. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  554. ps.mtx.Lock()
  555. defer ps.mtx.Unlock()
  556. if votes.Size() == 0 {
  557. return 0, nil, false
  558. }
  559. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  560. // Lazily set data using 'votes'.
  561. if votes.IsCommit() {
  562. ps.ensureCatchupCommitRound(height, round, size)
  563. }
  564. ps.ensureVoteBitArrays(height, size)
  565. psVotes := ps.getVoteBitArray(height, round, type_)
  566. if psVotes == nil {
  567. return 0, nil, false // Not something worth sending
  568. }
  569. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  570. ps.setHasVote(height, round, type_, index)
  571. return index, votes.GetByIndex(index), true
  572. }
  573. return 0, nil, false
  574. }
  575. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  576. if ps.Height == height {
  577. if ps.Round == round {
  578. switch type_ {
  579. case types.VoteTypePrevote:
  580. return ps.Prevotes
  581. case types.VoteTypePrecommit:
  582. return ps.Precommits
  583. default:
  584. PanicSanity(Fmt("Unexpected vote type %X", type_))
  585. }
  586. }
  587. if ps.CatchupCommitRound == round {
  588. switch type_ {
  589. case types.VoteTypePrevote:
  590. return nil
  591. case types.VoteTypePrecommit:
  592. return ps.CatchupCommit
  593. default:
  594. PanicSanity(Fmt("Unexpected vote type %X", type_))
  595. }
  596. }
  597. return nil
  598. }
  599. if ps.Height == height+1 {
  600. if ps.LastCommitRound == round {
  601. switch type_ {
  602. case types.VoteTypePrevote:
  603. return nil
  604. case types.VoteTypePrecommit:
  605. return ps.LastCommit
  606. default:
  607. PanicSanity(Fmt("Unexpected vote type %X", type_))
  608. }
  609. }
  610. return nil
  611. }
  612. return nil
  613. }
  614. // 'round': A round for which we have a +2/3 commit.
  615. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  616. if ps.Height != height {
  617. return
  618. }
  619. /*
  620. NOTE: This is wrong, 'round' could change.
  621. e.g. if orig round is not the same as block LastValidation round.
  622. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  623. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  624. }
  625. */
  626. if ps.CatchupCommitRound == round {
  627. return // Nothing to do!
  628. }
  629. ps.CatchupCommitRound = round
  630. if round == ps.Round {
  631. ps.CatchupCommit = ps.Precommits
  632. } else {
  633. ps.CatchupCommit = NewBitArray(numValidators)
  634. }
  635. }
  // EnsureVoteBitArrays is the locking wrapper around ensureVoteBitArrays: it
  // allocates any nil vote bit arrays relevant to `height`.
  //
  // NOTE: It's important to make sure that numValidators actually matches
  // what the node sees as the number of validators for height.
  func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  	ps.mtx.Lock()
  	defer ps.mtx.Unlock()
  	ps.ensureVoteBitArrays(height, numValidators)
  }
  643. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  644. if ps.Height == height {
  645. if ps.Prevotes == nil {
  646. ps.Prevotes = NewBitArray(numValidators)
  647. }
  648. if ps.Precommits == nil {
  649. ps.Precommits = NewBitArray(numValidators)
  650. }
  651. if ps.CatchupCommit == nil {
  652. ps.CatchupCommit = NewBitArray(numValidators)
  653. }
  654. if ps.ProposalPOL == nil {
  655. ps.ProposalPOL = NewBitArray(numValidators)
  656. }
  657. } else if ps.Height == height+1 {
  658. if ps.LastCommit == nil {
  659. ps.LastCommit = NewBitArray(numValidators)
  660. }
  661. }
  662. }
  // SetHasVote records, under the lock, that the peer has the given vote at
  // validator index `index`.
  func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  	ps.mtx.Lock()
  	defer ps.mtx.Unlock()
  	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  }
  668. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  669. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  670. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  671. PanicSanity("Invalid vote type")
  672. }
  673. if ps.Height == height {
  674. if ps.Round == round {
  675. switch type_ {
  676. case types.VoteTypePrevote:
  677. ps.Prevotes.SetIndex(index, true)
  678. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  679. case types.VoteTypePrecommit:
  680. ps.Precommits.SetIndex(index, true)
  681. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  682. }
  683. } else if ps.CatchupCommitRound == round {
  684. switch type_ {
  685. case types.VoteTypePrevote:
  686. case types.VoteTypePrecommit:
  687. ps.CatchupCommit.SetIndex(index, true)
  688. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  689. }
  690. } else if ps.ProposalPOLRound == round {
  691. switch type_ {
  692. case types.VoteTypePrevote:
  693. ps.ProposalPOL.SetIndex(index, true)
  694. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  695. case types.VoteTypePrecommit:
  696. }
  697. }
  698. } else if ps.Height == height+1 {
  699. if ps.LastCommitRound == round {
  700. switch type_ {
  701. case types.VoteTypePrevote:
  702. case types.VoteTypePrecommit:
  703. ps.LastCommit.SetIndex(index, true)
  704. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  705. }
  706. }
  707. } else {
  708. // Does not apply.
  709. }
  710. }
  711. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  712. ps.mtx.Lock()
  713. defer ps.mtx.Unlock()
  714. // Ignore duplicate messages.
  715. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  716. return
  717. }
  718. // Just remember these values.
  719. psHeight := ps.Height
  720. psRound := ps.Round
  721. //psStep := ps.Step
  722. psCatchupCommitRound := ps.CatchupCommitRound
  723. psCatchupCommit := ps.CatchupCommit
  724. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  725. ps.Height = msg.Height
  726. ps.Round = msg.Round
  727. ps.Step = msg.Step
  728. ps.StartTime = startTime
  729. if psHeight != msg.Height || psRound != msg.Round {
  730. ps.Proposal = false
  731. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  732. ps.ProposalBlockParts = nil
  733. ps.ProposalPOLRound = -1
  734. ps.ProposalPOL = nil
  735. // We'll update the BitArray capacity later.
  736. ps.Prevotes = nil
  737. ps.Precommits = nil
  738. }
  739. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  740. // Peer caught up to CatchupCommitRound.
  741. // Preserve psCatchupCommit!
  742. // NOTE: We prefer to use prs.Precommits if
  743. // pr.Round matches pr.CatchupCommitRound.
  744. ps.Precommits = psCatchupCommit
  745. }
  746. if psHeight != msg.Height {
  747. // Shift Precommits to LastCommit.
  748. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  749. ps.LastCommitRound = msg.LastCommitRound
  750. ps.LastCommit = ps.Precommits
  751. } else {
  752. ps.LastCommitRound = msg.LastCommitRound
  753. ps.LastCommit = nil
  754. }
  755. // We'll update the BitArray capacity later.
  756. ps.CatchupCommitRound = -1
  757. ps.CatchupCommit = nil
  758. }
  759. }
  760. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  761. ps.mtx.Lock()
  762. defer ps.mtx.Unlock()
  763. if ps.Height != msg.Height {
  764. return
  765. }
  766. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  767. ps.ProposalBlockParts = msg.BlockParts
  768. }
  769. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  770. ps.mtx.Lock()
  771. defer ps.mtx.Unlock()
  772. if ps.Height != msg.Height {
  773. return
  774. }
  775. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  776. }
  777. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  778. ps.mtx.Lock()
  779. defer ps.mtx.Unlock()
  780. if ps.Height != msg.Height {
  781. return
  782. }
  783. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  784. return
  785. }
  786. // TODO: Merge onto existing ps.ProposalPOL?
  787. // We might have sent some prevotes in the meantime.
  788. ps.ProposalPOL = msg.ProposalPOL
  789. }
  790. //-----------------------------------------------------------------------------
  // Messages

  // Type bytes under which the consensus messages are registered with wire.
  const (
  	msgTypeNewRoundStep byte = 0x01
  	msgTypeCommitStep   byte = 0x02
  	msgTypeProposal     byte = 0x11
  	msgTypeProposalPOL  byte = 0x12
  	msgTypeBlockPart    byte = 0x13 // both block & POL
  	msgTypeVote         byte = 0x14
  	msgTypeHasVote      byte = 0x15
  )
  // ConsensusMessage is the interface under which all consensus reactor
  // messages are encoded and decoded.
  type ConsensusMessage interface{}

  // Registers each concrete message type with its type byte so that
  // DecodeMessage can decode into the ConsensusMessage interface.
  var _ = wire.RegisterInterface(
  	struct{ ConsensusMessage }{},
  	wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  	wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  	wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  	wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  	wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  	wire.ConcreteType{&VoteMessage{}, msgTypeVote},
  	wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  )
  812. // TODO: check for unnecessary extra bytes at the end.
  813. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  814. msgType = bz[0]
  815. n := new(int64)
  816. r := bytes.NewReader(bz)
  817. msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  818. return
  819. }
  820. //-------------------------------------
  821. // For every height/round/step transition
  822. type NewRoundStepMessage struct {
  823. Height int
  824. Round int
  825. Step RoundStepType
  826. SecondsSinceStartTime int
  827. LastCommitRound int
  828. }
  829. func (m *NewRoundStepMessage) String() string {
  830. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  831. m.Height, m.Round, m.Step, m.LastCommitRound)
  832. }
  833. //-------------------------------------
  834. type CommitStepMessage struct {
  835. Height int
  836. BlockPartsHeader types.PartSetHeader
  837. BlockParts *BitArray
  838. }
  839. func (m *CommitStepMessage) String() string {
  840. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  841. }
  842. //-------------------------------------
  843. type ProposalMessage struct {
  844. Proposal *types.Proposal
  845. }
  846. func (m *ProposalMessage) String() string {
  847. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  848. }
  849. //-------------------------------------
  850. type ProposalPOLMessage struct {
  851. Height int
  852. ProposalPOLRound int
  853. ProposalPOL *BitArray
  854. }
  855. func (m *ProposalPOLMessage) String() string {
  856. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  857. }
  858. //-------------------------------------
  859. type BlockPartMessage struct {
  860. Height int
  861. Round int
  862. Part *types.Part
  863. }
  864. func (m *BlockPartMessage) String() string {
  865. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  866. }
  867. //-------------------------------------
  868. type VoteMessage struct {
  869. ValidatorIndex int
  870. Vote *types.Vote
  871. }
  872. func (m *VoteMessage) String() string {
  873. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  874. }
  875. //-------------------------------------
  // HasVoteMessage announces that the sender has the vote of validator Index
  // for the given height, round and vote type.
  type HasVoteMessage struct {
  	Height int
  	Round  int
  	Type   byte
  	Index  int
  }

  // String returns a short description; the validator index appears both
  // before and after the height/round/type triple.
  func (m *HasVoteMessage) String() string {
  	const format = "[HasVote VI:%v V:{%v/%02d/%v} VI:%v]"
  	return fmt.Sprintf(format, m.Index, m.Height, m.Round, m.Type, m.Index)
  }
  884. }