You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1010 lines
29 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. bc "github.com/tendermint/tendermint/blockchain"
  10. . "github.com/tendermint/tendermint/common"
  11. "github.com/tendermint/tendermint/events"
  12. "github.com/tendermint/tendermint/p2p"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. "github.com/tendermint/tendermint/wire"
  16. )
  17. const (
  18. StateChannel = byte(0x20)
  19. DataChannel = byte(0x21)
  20. VoteChannel = byte(0x22)
  21. PeerStateKey = "ConsensusReactor.peerState"
  22. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  23. )
  24. //-----------------------------------------------------------------------------
// ConsensusReactor is the p2p reactor for consensus. It embeds
// p2p.BaseReactor and holds the consensus state, the block store and the
// fast-sync flag that the gossip and receive paths consult.
type ConsensusReactor struct {
	p2p.BaseReactor
	blockStore *bc.BlockStore  // used by the gossip routines to load block metas, parts and validations
	conS       *ConsensusState // the consensus state machine driven by this reactor
	fastSync   bool            // while true, Receive ignores data and vote channel messages
	evsw       events.Fireable // set by SetFireable
}
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. conS: consensusState,
  36. fastSync: fastSync,
  37. }
  38. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  39. return conR
  40. }
  41. func (conR *ConsensusReactor) OnStart() error {
  42. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  43. conR.BaseReactor.OnStart()
  44. if !conR.fastSync {
  45. _, err := conR.conS.Start()
  46. if err != nil {
  47. return err
  48. }
  49. }
  50. go conR.broadcastNewRoundStepRoutine()
  51. return nil
  52. }
// OnStop stops the base reactor and then the consensus state.
func (conR *ConsensusReactor) OnStop() {
	conR.BaseReactor.OnStop()
	conR.conS.Stop()
}
  57. // Switch from the fast_sync to the consensus:
  58. // reset the state, turn off fast_sync, start the consensus-state-machine
  59. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  60. log.Notice("SwitchToConsensus")
  61. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  62. // broadcast a NewRoundStepMessage.
  63. conR.conS.updateToState(state)
  64. conR.fastSync = false
  65. conR.conS.Start()
  66. }
  67. // Implements Reactor
  68. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  69. // TODO optimize
  70. return []*p2p.ChannelDescriptor{
  71. &p2p.ChannelDescriptor{
  72. ID: StateChannel,
  73. Priority: 5,
  74. SendQueueCapacity: 100,
  75. },
  76. &p2p.ChannelDescriptor{
  77. ID: DataChannel,
  78. Priority: 5,
  79. SendQueueCapacity: 2,
  80. },
  81. &p2p.ChannelDescriptor{
  82. ID: VoteChannel,
  83. Priority: 5,
  84. SendQueueCapacity: 40,
  85. },
  86. }
  87. }
  88. // Implements Reactor
  89. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  90. if !conR.IsRunning() {
  91. return
  92. }
  93. // Create peerState for peer
  94. peerState := NewPeerState(peer)
  95. peer.Data.Set(PeerStateKey, peerState)
  96. // Begin gossip routines for this peer.
  97. go conR.gossipDataRoutine(peer, peerState)
  98. go conR.gossipVotesRoutine(peer, peerState)
  99. // Send our state to peer.
  100. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  101. if !conR.fastSync {
  102. conR.sendNewRoundStepMessage(peer)
  103. }
  104. }
// RemovePeer implements Reactor. It currently does nothing beyond the running
// check; peer-state teardown is still a TODO.
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	// TODO
	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}
  113. // Implements Reactor
  114. // NOTE: We process these messages even when we're fast_syncing.
  115. func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
  116. if !conR.IsRunning() {
  117. log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes)
  118. return
  119. }
  120. // Get peer states
  121. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  122. _, msg, err := DecodeMessage(msgBytes)
  123. if err != nil {
  124. log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
  125. // TODO punish peer?
  126. return
  127. }
  128. log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg)
  129. switch chID {
  130. case StateChannel:
  131. switch msg := msg.(type) {
  132. case *NewRoundStepMessage:
  133. ps.ApplyNewRoundStepMessage(msg)
  134. case *CommitStepMessage:
  135. ps.ApplyCommitStepMessage(msg)
  136. case *HasVoteMessage:
  137. ps.ApplyHasVoteMessage(msg)
  138. default:
  139. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  140. }
  141. case DataChannel:
  142. if conR.fastSync {
  143. log.Warn("Ignoring message received during fastSync", "msg", msg)
  144. return
  145. }
  146. switch msg := msg.(type) {
  147. case *ProposalMessage:
  148. ps.SetHasProposal(msg.Proposal)
  149. err = conR.conS.SetProposal(msg.Proposal)
  150. case *ProposalPOLMessage:
  151. ps.ApplyProposalPOLMessage(msg)
  152. case *BlockPartMessage:
  153. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  154. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  155. default:
  156. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  157. }
  158. case VoteChannel:
  159. if conR.fastSync {
  160. log.Warn("Ignoring message received during fastSync", "msg", msg)
  161. return
  162. }
  163. switch msg := msg.(type) {
  164. case *VoteMessage:
  165. vote, valIndex := msg.Vote, msg.ValidatorIndex
  166. // attempt to add the vote and dupeout the validator if its a duplicate signature
  167. added, err := conR.conS.TryAddVote(valIndex, vote, peer.Key)
  168. if err == ErrAddingVote {
  169. // TODO: punish peer
  170. } else if err != nil {
  171. return
  172. }
  173. cs := conR.conS
  174. cs.mtx.Lock()
  175. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  176. cs.mtx.Unlock()
  177. ps.EnsureVoteBitArrays(height, valSize)
  178. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  179. ps.SetHasVote(vote, valIndex)
  180. if added {
  181. // If rs.Height == vote.Height && rs.Round < vote.Round,
  182. // the peer is sending us CatchupCommit precommits.
  183. // We could make note of this and help filter in broadcastHasVoteMessage().
  184. conR.broadcastHasVoteMessage(vote, valIndex)
  185. }
  186. default:
  187. // don't punish (leave room for soft upgrades)
  188. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  189. }
  190. default:
  191. log.Warn(Fmt("Unknown channel %X", chID))
  192. }
  193. if err != nil {
  194. log.Warn("Error in Receive()", "error", err)
  195. }
  196. }
  197. // Broadcasts HasVoteMessage to peers that care.
  198. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  199. msg := &HasVoteMessage{
  200. Height: vote.Height,
  201. Round: vote.Round,
  202. Type: vote.Type,
  203. Index: index,
  204. }
  205. conR.Switch.Broadcast(StateChannel, msg)
  206. /*
  207. // TODO: Make this broadcast more selective.
  208. for _, peer := range conR.Switch.Peers().List() {
  209. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  210. prs := ps.GetRoundState()
  211. if prs.Height == vote.Height {
  212. // TODO: Also filter on round?
  213. peer.TrySend(StateChannel, msg)
  214. } else {
  215. // Height doesn't match
  216. // TODO: check a field, maybe CatchupCommitRound?
  217. // TODO: But that requires changing the struct field comment.
  218. }
  219. }
  220. */
  221. }
// SetPrivValidator sets our private validator account for signing votes by
// forwarding it to the consensus state.
func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
	conR.conS.SetPrivValidator(priv)
}
// SetFireable implements events.Eventable. It stores the event switch on the
// reactor and passes the same switch on to the consensus state.
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
	conR.evsw = evsw
	conR.conS.SetFireable(evsw)
}
  231. //--------------------------------------
  232. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  233. nrsMsg = &NewRoundStepMessage{
  234. Height: rs.Height,
  235. Round: rs.Round,
  236. Step: rs.Step,
  237. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  238. LastCommitRound: rs.LastCommit.Round(),
  239. }
  240. if rs.Step == RoundStepCommit {
  241. csMsg = &CommitStepMessage{
  242. Height: rs.Height,
  243. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  244. BlockParts: rs.ProposalBlockParts.BitArray(),
  245. }
  246. }
  247. return
  248. }
  249. // Listens for changes to the ConsensusState.Step by pulling
  250. // on conR.conS.NewStepCh().
  251. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  252. for {
  253. // Get RoundState with new Step or quit.
  254. var rs *RoundState
  255. select {
  256. case rs = <-conR.conS.NewStepCh():
  257. case <-conR.Quit:
  258. return
  259. }
  260. nrsMsg, csMsg := makeRoundStepMessages(rs)
  261. if nrsMsg != nil {
  262. conR.Switch.Broadcast(StateChannel, nrsMsg)
  263. }
  264. if csMsg != nil {
  265. conR.Switch.Broadcast(StateChannel, csMsg)
  266. }
  267. }
  268. }
  269. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  270. rs := conR.conS.GetRoundState()
  271. nrsMsg, csMsg := makeRoundStepMessages(rs)
  272. if nrsMsg != nil {
  273. peer.Send(StateChannel, nrsMsg)
  274. }
  275. if csMsg != nil {
  276. peer.Send(StateChannel, csMsg)
  277. }
  278. }
  279. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  280. log := log.New("peer", peer.Key)
  281. OUTER_LOOP:
  282. for {
  283. // Manage disconnects from self or peer.
  284. if !peer.IsRunning() || !conR.IsRunning() {
  285. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  286. return
  287. }
  288. rs := conR.conS.GetRoundState()
  289. prs := ps.GetRoundState()
  290. // Send proposal Block parts?
  291. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  292. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  293. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  294. part := rs.ProposalBlockParts.GetPart(index)
  295. msg := &BlockPartMessage{
  296. Height: rs.Height, // This tells peer that this part applies to us.
  297. Round: rs.Round, // This tells peer that this part applies to us.
  298. Part: part,
  299. }
  300. peer.Send(DataChannel, msg)
  301. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  302. continue OUTER_LOOP
  303. }
  304. }
  305. // If the peer is on a previous height, help catch up.
  306. if (0 < prs.Height) && (prs.Height < rs.Height) {
  307. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  308. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  309. // Ensure that the peer's PartSetHeader is correct
  310. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  311. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  312. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  313. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  314. time.Sleep(peerGossipSleepDuration)
  315. continue OUTER_LOOP
  316. }
  317. // Load the part
  318. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  319. if part == nil {
  320. log.Warn("Could not load part", "index", index,
  321. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  322. time.Sleep(peerGossipSleepDuration)
  323. continue OUTER_LOOP
  324. }
  325. // Send the part
  326. msg := &BlockPartMessage{
  327. Height: prs.Height, // Not our height, so it doesn't matter.
  328. Round: prs.Round, // Not our height, so it doesn't matter.
  329. Part: part,
  330. }
  331. peer.Send(DataChannel, msg)
  332. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  333. continue OUTER_LOOP
  334. } else {
  335. //log.Info("No parts to send in catch-up, sleeping")
  336. time.Sleep(peerGossipSleepDuration)
  337. continue OUTER_LOOP
  338. }
  339. }
  340. // If height and round don't match, sleep.
  341. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  342. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  343. time.Sleep(peerGossipSleepDuration)
  344. continue OUTER_LOOP
  345. }
  346. // By here, height and round match.
  347. // Proposal block parts were already matched and sent if any were wanted.
  348. // (These can match on hash so the round doesn't matter)
  349. // Now consider sending other things, like the Proposal itself.
  350. // Send Proposal && ProposalPOL BitArray?
  351. if rs.Proposal != nil && !prs.Proposal {
  352. // Proposal
  353. {
  354. msg := &ProposalMessage{Proposal: rs.Proposal}
  355. peer.Send(DataChannel, msg)
  356. ps.SetHasProposal(rs.Proposal)
  357. }
  358. // ProposalPOL.
  359. // Peer must receive ProposalMessage first.
  360. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  361. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  362. if 0 <= rs.Proposal.POLRound {
  363. msg := &ProposalPOLMessage{
  364. Height: rs.Height,
  365. ProposalPOLRound: rs.Proposal.POLRound,
  366. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  367. }
  368. peer.Send(DataChannel, msg)
  369. }
  370. continue OUTER_LOOP
  371. }
  372. // Nothing to do. Sleep.
  373. time.Sleep(peerGossipSleepDuration)
  374. continue OUTER_LOOP
  375. }
  376. }
  377. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  378. log := log.New("peer", peer.Key)
  379. // Simple hack to throttle logs upon sleep.
  380. var sleeping = 0
  381. OUTER_LOOP:
  382. for {
  383. // Manage disconnects from self or peer.
  384. if !peer.IsRunning() || !conR.IsRunning() {
  385. log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  386. return
  387. }
  388. rs := conR.conS.GetRoundState()
  389. prs := ps.GetRoundState()
  390. switch sleeping {
  391. case 1: // First sleep
  392. sleeping = 2
  393. case 2: // No more sleep
  394. sleeping = 0
  395. }
  396. log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  397. "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  398. // If height matches, then send LastCommit, Prevotes, Precommits.
  399. if rs.Height == prs.Height {
  400. // If there are lastCommits to send...
  401. if prs.Step == RoundStepNewHeight {
  402. if ps.PickSendVote(rs.LastCommit) {
  403. log.Info("Picked rs.LastCommit to send")
  404. continue OUTER_LOOP
  405. }
  406. }
  407. // If there are prevotes to send...
  408. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  409. if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
  410. log.Info("Picked rs.Prevotes(rs.Round) to send")
  411. continue OUTER_LOOP
  412. }
  413. }
  414. // If there are precommits to send...
  415. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  416. if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
  417. log.Info("Picked rs.Precommits(rs.Round) to send")
  418. continue OUTER_LOOP
  419. }
  420. }
  421. // If there are prevotes to send for the last round...
  422. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
  423. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  424. log.Info("Picked rs.Prevotes(prs.Round) to send")
  425. continue OUTER_LOOP
  426. }
  427. }
  428. // If there are precommits to send for the last round...
  429. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
  430. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  431. log.Info("Picked rs.Precommits(prs.Round) to send")
  432. continue OUTER_LOOP
  433. }
  434. }
  435. // If there are POLPrevotes to send...
  436. if 0 <= prs.ProposalPOLRound {
  437. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  438. if ps.PickSendVote(polPrevotes) {
  439. log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  440. continue OUTER_LOOP
  441. }
  442. }
  443. }
  444. }
  445. // Special catchup logic.
  446. // If peer is lagging by height 1, send LastCommit.
  447. if prs.Height != 0 && rs.Height == prs.Height+1 {
  448. if ps.PickSendVote(rs.LastCommit) {
  449. log.Info("Picked rs.LastCommit to send")
  450. continue OUTER_LOOP
  451. }
  452. }
  453. // Catchup logic
  454. // If peer is lagging by more than 1, send Validation.
  455. if prs.Height != 0 && rs.Height >= prs.Height+2 {
  456. // Load the block validation for prs.Height,
  457. // which contains precommit signatures for prs.Height.
  458. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  459. log.Info("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  460. if ps.PickSendVote(validation) {
  461. log.Info("Picked Catchup validation to send")
  462. continue OUTER_LOOP
  463. }
  464. }
  465. if sleeping == 0 {
  466. // We sent nothing. Sleep...
  467. sleeping = 1
  468. log.Info("No votes to send, sleeping", "peer", peer,
  469. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  470. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  471. } else if sleeping == 2 {
  472. // Continued sleep...
  473. sleeping = 1
  474. }
  475. time.Sleep(peerGossipSleepDuration)
  476. continue OUTER_LOOP
  477. }
  478. }
  479. //-----------------------------------------------------------------------------
// PeerRoundState holds what we know about a peer's position in consensus and
// which proposal parts and votes it has.
// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
	Height                   int                 // Height peer is at
	Round                    int                 // Round peer is at, -1 if unknown.
	Step                     RoundStepType       // Step peer is at
	StartTime                time.Time           // Estimated start of round 0 at this height
	Proposal                 bool                // True if peer has proposal for this round
	ProposalBlockPartsHeader types.PartSetHeader // Part-set header of the proposal block, as set from a Proposal or CommitStepMessage
	ProposalBlockParts       *BitArray           // Proposal block parts the peer is marked as having
	ProposalPOLRound         int                 // Proposal's POL round. -1 if none.
	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
	Prevotes                 *BitArray           // All votes peer has for this round
	Precommits               *BitArray           // All precommits peer has for this round
	LastCommitRound          int                 // Round of commit for last height. -1 if none.
	LastCommit               *BitArray           // All commit precommits of commit for last height.
	CatchupCommitRound       int                 // Round that we believe commit round is. -1 if none.
	CatchupCommit            *BitArray           // All commit precommits peer has for this height
}
  498. //-----------------------------------------------------------------------------
  499. var (
  500. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  501. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  502. )
// PeerState pairs a peer with our view of its round state. The exported
// methods take mtx before reading or writing the embedded PeerRoundState.
type PeerState struct {
	Peer *p2p.Peer
	mtx  sync.Mutex // guards the embedded PeerRoundState
	PeerRoundState
}
  508. func NewPeerState(peer *p2p.Peer) *PeerState {
  509. return &PeerState{
  510. Peer: peer,
  511. PeerRoundState: PeerRoundState{
  512. Round: -1,
  513. ProposalPOLRound: -1,
  514. LastCommitRound: -1,
  515. CatchupCommitRound: -1,
  516. },
  517. }
  518. }
  519. // Returns an atomic snapshot of the PeerRoundState.
  520. // There's no point in mutating it since it won't change PeerState.
  521. func (ps *PeerState) GetRoundState() *PeerRoundState {
  522. ps.mtx.Lock()
  523. defer ps.mtx.Unlock()
  524. prs := ps.PeerRoundState // copy
  525. return &prs
  526. }
  527. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  528. ps.mtx.Lock()
  529. defer ps.mtx.Unlock()
  530. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  531. return
  532. }
  533. if ps.Proposal {
  534. return
  535. }
  536. ps.Proposal = true
  537. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  538. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  539. ps.ProposalPOLRound = proposal.POLRound
  540. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  541. }
  542. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  543. ps.mtx.Lock()
  544. defer ps.mtx.Unlock()
  545. if ps.Height != height || ps.Round != round {
  546. return
  547. }
  548. ps.ProposalBlockParts.SetIndex(index, true)
  549. }
  550. // Convenience function to send vote to peer.
  551. // Returns true if vote was sent.
  552. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  553. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  554. msg := &VoteMessage{index, vote}
  555. ps.Peer.Send(VoteChannel, msg)
  556. return true
  557. }
  558. return false
  559. }
  560. // votes: Must be the correct Size() for the Height().
  561. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  562. ps.mtx.Lock()
  563. defer ps.mtx.Unlock()
  564. if votes.Size() == 0 {
  565. return 0, nil, false
  566. }
  567. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  568. // Lazily set data using 'votes'.
  569. if votes.IsCommit() {
  570. ps.ensureCatchupCommitRound(height, round, size)
  571. }
  572. ps.ensureVoteBitArrays(height, size)
  573. psVotes := ps.getVoteBitArray(height, round, type_)
  574. if psVotes == nil {
  575. return 0, nil, false // Not something worth sending
  576. }
  577. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  578. ps.setHasVote(height, round, type_, index)
  579. return index, votes.GetByIndex(index), true
  580. }
  581. return 0, nil, false
  582. }
  583. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  584. if ps.Height == height {
  585. if ps.Round == round {
  586. switch type_ {
  587. case types.VoteTypePrevote:
  588. return ps.Prevotes
  589. case types.VoteTypePrecommit:
  590. return ps.Precommits
  591. default:
  592. PanicSanity(Fmt("Unexpected vote type %X", type_))
  593. }
  594. }
  595. if ps.CatchupCommitRound == round {
  596. switch type_ {
  597. case types.VoteTypePrevote:
  598. return nil
  599. case types.VoteTypePrecommit:
  600. return ps.CatchupCommit
  601. default:
  602. PanicSanity(Fmt("Unexpected vote type %X", type_))
  603. }
  604. }
  605. return nil
  606. }
  607. if ps.Height == height+1 {
  608. if ps.LastCommitRound == round {
  609. switch type_ {
  610. case types.VoteTypePrevote:
  611. return nil
  612. case types.VoteTypePrecommit:
  613. return ps.LastCommit
  614. default:
  615. PanicSanity(Fmt("Unexpected vote type %X", type_))
  616. }
  617. }
  618. return nil
  619. }
  620. return nil
  621. }
  622. // NOTE: 'round' is what we know to be the commit round for height.
  623. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  624. if ps.Height != height {
  625. return
  626. }
  627. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  628. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  629. }
  630. if ps.CatchupCommitRound == round {
  631. return // Nothing to do!
  632. }
  633. ps.CatchupCommitRound = round
  634. if round == ps.Round {
  635. ps.CatchupCommit = ps.Precommits
  636. } else {
  637. ps.CatchupCommit = NewBitArray(numValidators)
  638. }
  639. }
// EnsureVoteBitArrays takes the mutex and calls ensureVoteBitArrays, which
// allocates any vote bit arrays for height that are still nil.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.ensureVoteBitArrays(height, numValidators)
}
  647. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  648. if ps.Height == height {
  649. if ps.Prevotes == nil {
  650. ps.Prevotes = NewBitArray(numValidators)
  651. }
  652. if ps.Precommits == nil {
  653. ps.Precommits = NewBitArray(numValidators)
  654. }
  655. if ps.CatchupCommit == nil {
  656. ps.CatchupCommit = NewBitArray(numValidators)
  657. }
  658. if ps.ProposalPOL == nil {
  659. ps.ProposalPOL = NewBitArray(numValidators)
  660. }
  661. } else if ps.Height == height+1 {
  662. if ps.LastCommit == nil {
  663. ps.LastCommit = NewBitArray(numValidators)
  664. }
  665. }
  666. }
// SetHasVote takes the mutex and marks the peer as having the given vote,
// identified by the vote's height, round and type plus the validator index.
func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
}
  672. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  673. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  674. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  675. PanicSanity("Invalid vote type")
  676. }
  677. if ps.Height == height {
  678. if ps.Round == round {
  679. switch type_ {
  680. case types.VoteTypePrevote:
  681. ps.Prevotes.SetIndex(index, true)
  682. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  683. case types.VoteTypePrecommit:
  684. ps.Precommits.SetIndex(index, true)
  685. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  686. }
  687. } else if ps.CatchupCommitRound == round {
  688. switch type_ {
  689. case types.VoteTypePrevote:
  690. case types.VoteTypePrecommit:
  691. ps.CatchupCommit.SetIndex(index, true)
  692. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  693. }
  694. } else if ps.ProposalPOLRound == round {
  695. switch type_ {
  696. case types.VoteTypePrevote:
  697. ps.ProposalPOL.SetIndex(index, true)
  698. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  699. case types.VoteTypePrecommit:
  700. }
  701. }
  702. } else if ps.Height == height+1 {
  703. if ps.LastCommitRound == round {
  704. switch type_ {
  705. case types.VoteTypePrevote:
  706. case types.VoteTypePrecommit:
  707. ps.LastCommit.SetIndex(index, true)
  708. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  709. }
  710. }
  711. } else {
  712. // Does not apply.
  713. }
  714. }
  715. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  716. ps.mtx.Lock()
  717. defer ps.mtx.Unlock()
  718. // Ignore duplicate messages.
  719. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  720. return
  721. }
  722. // Just remember these values.
  723. psHeight := ps.Height
  724. psRound := ps.Round
  725. //psStep := ps.Step
  726. psCatchupCommitRound := ps.CatchupCommitRound
  727. psCatchupCommit := ps.CatchupCommit
  728. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  729. ps.Height = msg.Height
  730. ps.Round = msg.Round
  731. ps.Step = msg.Step
  732. ps.StartTime = startTime
  733. if psHeight != msg.Height || psRound != msg.Round {
  734. ps.Proposal = false
  735. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  736. ps.ProposalBlockParts = nil
  737. ps.ProposalPOLRound = -1
  738. ps.ProposalPOL = nil
  739. // We'll update the BitArray capacity later.
  740. ps.Prevotes = nil
  741. ps.Precommits = nil
  742. }
  743. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  744. // Peer caught up to CatchupCommitRound.
  745. // Preserve psCatchupCommit!
  746. // NOTE: We prefer to use prs.Precommits if
  747. // pr.Round matches pr.CatchupCommitRound.
  748. ps.Precommits = psCatchupCommit
  749. }
  750. if psHeight != msg.Height {
  751. // Shift Precommits to LastCommit.
  752. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  753. ps.LastCommitRound = msg.LastCommitRound
  754. ps.LastCommit = ps.Precommits
  755. } else {
  756. ps.LastCommitRound = msg.LastCommitRound
  757. ps.LastCommit = nil
  758. }
  759. // We'll update the BitArray capacity later.
  760. ps.CatchupCommitRound = -1
  761. ps.CatchupCommit = nil
  762. }
  763. }
  764. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  765. ps.mtx.Lock()
  766. defer ps.mtx.Unlock()
  767. if ps.Height != msg.Height {
  768. return
  769. }
  770. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  771. ps.ProposalBlockParts = msg.BlockParts
  772. }
  773. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  774. ps.mtx.Lock()
  775. defer ps.mtx.Unlock()
  776. if ps.Height != msg.Height {
  777. return
  778. }
  779. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  780. }
  781. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  782. ps.mtx.Lock()
  783. defer ps.mtx.Unlock()
  784. if ps.Height != msg.Height {
  785. return
  786. }
  787. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  788. return
  789. }
  790. // TODO: Merge onto existing ps.ProposalPOL?
  791. // We might have sent some prevotes in the meantime.
  792. ps.ProposalPOL = msg.ProposalPOL
  793. }
  794. //-----------------------------------------------------------------------------
  795. // Messages
  796. const (
  797. msgTypeNewRoundStep = byte(0x01)
  798. msgTypeCommitStep = byte(0x02)
  799. msgTypeProposal = byte(0x11)
  800. msgTypeProposalPOL = byte(0x12)
  801. msgTypeBlockPart = byte(0x13) // both block & POL
  802. msgTypeVote = byte(0x14)
  803. msgTypeHasVote = byte(0x15)
  804. )
// ConsensusMessage is the empty interface under which the concrete consensus
// message types below are registered with the wire package.
type ConsensusMessage interface{}

// Register every concrete message type together with its type byte.
var _ = wire.RegisterInterface(
	struct{ ConsensusMessage }{},
	wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
	wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
	wire.ConcreteType{&VoteMessage{}, msgTypeVote},
	wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
)
  816. // TODO: check for unnecessary extra bytes at the end.
  817. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  818. msgType = bz[0]
  819. n := new(int64)
  820. r := bytes.NewReader(bz)
  821. msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  822. return
  823. }
  824. //-------------------------------------
  825. // For every height/round/step transition
  826. type NewRoundStepMessage struct {
  827. Height int
  828. Round int
  829. Step RoundStepType
  830. SecondsSinceStartTime int
  831. LastCommitRound int
  832. }
  833. func (m *NewRoundStepMessage) String() string {
  834. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  835. m.Height, m.Round, m.Step, m.LastCommitRound)
  836. }
  837. //-------------------------------------
  838. type CommitStepMessage struct {
  839. Height int
  840. BlockPartsHeader types.PartSetHeader
  841. BlockParts *BitArray
  842. }
  843. func (m *CommitStepMessage) String() string {
  844. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  845. }
  846. //-------------------------------------
  847. type ProposalMessage struct {
  848. Proposal *types.Proposal
  849. }
  850. func (m *ProposalMessage) String() string {
  851. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  852. }
  853. //-------------------------------------
  854. type ProposalPOLMessage struct {
  855. Height int
  856. ProposalPOLRound int
  857. ProposalPOL *BitArray
  858. }
  859. func (m *ProposalPOLMessage) String() string {
  860. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  861. }
  862. //-------------------------------------
  863. type BlockPartMessage struct {
  864. Height int
  865. Round int
  866. Part *types.Part
  867. }
  868. func (m *BlockPartMessage) String() string {
  869. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  870. }
  871. //-------------------------------------
  872. type VoteMessage struct {
  873. ValidatorIndex int
  874. Vote *types.Vote
  875. }
  876. func (m *VoteMessage) String() string {
  877. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  878. }
  879. //-------------------------------------
  880. type HasVoteMessage struct {
  881. Height int
  882. Round int
  883. Type byte
  884. Index int
  885. }
  886. func (m *HasVoteMessage) String() string {
  887. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  888. }