You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

852 lines
23 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. blk "github.com/tendermint/tendermint/block"
  11. . "github.com/tendermint/tendermint/common"
  12. . "github.com/tendermint/tendermint/consensus/types"
  13. "github.com/tendermint/tendermint/p2p"
  14. sm "github.com/tendermint/tendermint/state"
  15. )
  16. const (
  17. StateCh = byte(0x20)
  18. DataCh = byte(0x21)
  19. VoteCh = byte(0x22)
  20. peerStateKey = "ConsensusReactor.peerState"
  21. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  22. )
  23. //-----------------------------------------------------------------------------
  24. type ConsensusReactor struct {
  25. sw *p2p.Switch
  26. started uint32
  27. stopped uint32
  28. quit chan struct{}
  29. blockStore *blk.BlockStore
  30. conS *ConsensusState
  31. }
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *blk.BlockStore) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. quit: make(chan struct{}),
  36. conS: consensusState,
  37. }
  38. return conR
  39. }
  40. // Implements Reactor
  41. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  42. if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
  43. log.Info("Starting ConsensusReactor")
  44. conR.sw = sw
  45. conR.conS.Start()
  46. go conR.broadcastNewRoundStepRoutine()
  47. }
  48. }
  49. // Implements Reactor
  50. func (conR *ConsensusReactor) Stop() {
  51. if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
  52. log.Info("Stopping ConsensusReactor")
  53. conR.conS.Stop()
  54. close(conR.quit)
  55. }
  56. }
  57. func (conR *ConsensusReactor) IsStopped() bool {
  58. return atomic.LoadUint32(&conR.stopped) == 1
  59. }
  60. // Implements Reactor
  61. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  62. // TODO optimize
  63. return []*p2p.ChannelDescriptor{
  64. &p2p.ChannelDescriptor{
  65. Id: StateCh,
  66. Priority: 5,
  67. },
  68. &p2p.ChannelDescriptor{
  69. Id: DataCh,
  70. Priority: 5,
  71. },
  72. &p2p.ChannelDescriptor{
  73. Id: VoteCh,
  74. Priority: 5,
  75. },
  76. }
  77. }
  78. // Implements Reactor
  79. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  80. // Create peerState for peer
  81. peerState := NewPeerState(peer)
  82. peer.Data.Set(peerStateKey, peerState)
  83. // Begin gossip routines for this peer.
  84. go conR.gossipDataRoutine(peer, peerState)
  85. go conR.gossipVotesRoutine(peer, peerState)
  86. // Send our state to peer.
  87. conR.sendNewRoundStepRoutine(peer)
  88. }
  89. // Implements Reactor
  90. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  91. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  92. }
  93. // Implements Reactor
  94. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  95. // Get round state
  96. rs := conR.conS.GetRoundState()
  97. ps := peer.Data.Get(peerStateKey).(*PeerState)
  98. _, msg_, err := DecodeMessage(msgBytes)
  99. if err != nil {
  100. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  101. return
  102. }
  103. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
  104. switch chId {
  105. case StateCh:
  106. switch msg_.(type) {
  107. case *NewRoundStepMessage:
  108. msg := msg_.(*NewRoundStepMessage)
  109. ps.ApplyNewRoundStepMessage(msg, rs)
  110. case *CommitStepMessage:
  111. msg := msg_.(*CommitStepMessage)
  112. ps.ApplyCommitStepMessage(msg)
  113. case *HasVoteMessage:
  114. msg := msg_.(*HasVoteMessage)
  115. ps.ApplyHasVoteMessage(msg)
  116. default:
  117. // Ignore unknown message
  118. }
  119. case DataCh:
  120. switch msg_.(type) {
  121. case *Proposal:
  122. proposal := msg_.(*Proposal)
  123. ps.SetHasProposal(proposal)
  124. err = conR.conS.SetProposal(proposal)
  125. case *PartMessage:
  126. msg := msg_.(*PartMessage)
  127. if msg.Type == partTypeProposalBlock {
  128. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  129. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  130. } else if msg.Type == partTypeProposalPOL {
  131. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  132. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  133. } else {
  134. // Ignore unknown part type
  135. }
  136. default:
  137. // Ignore unknown message
  138. }
  139. case VoteCh:
  140. switch msg_.(type) {
  141. case *VoteMessage:
  142. voteMessage := msg_.(*VoteMessage)
  143. vote := voteMessage.Vote
  144. if rs.Height != vote.Height {
  145. return // Wrong height. Not necessarily a bad peer.
  146. }
  147. validatorIndex := voteMessage.ValidatorIndex
  148. address, _ := rs.Validators.GetByIndex(validatorIndex)
  149. added, index, err := conR.conS.AddVote(address, vote)
  150. if err != nil {
  151. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  152. if errDupe, ok := err.(*blk.ErrVoteConflictingSignature); ok {
  153. log.Warn("Found conflicting vote. Publish evidence")
  154. evidenceTx := &blk.DupeoutTx{
  155. Address: address,
  156. VoteA: *errDupe.VoteA,
  157. VoteB: *errDupe.VoteB,
  158. }
  159. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  160. } else {
  161. // Probably an invalid signature. Bad peer.
  162. log.Warn("Error attempting to add vote", "error", err)
  163. // TODO: punish peer
  164. }
  165. }
  166. // Initialize Prevotes/Precommits/Commits if needed
  167. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  168. ps.SetHasVote(vote, index)
  169. if added {
  170. msg := &HasVoteMessage{
  171. Height: vote.Height,
  172. Round: vote.Round,
  173. Type: vote.Type,
  174. Index: index,
  175. }
  176. conR.sw.Broadcast(StateCh, msg)
  177. }
  178. default:
  179. // Ignore unknown message
  180. }
  181. default:
  182. // Ignore unknown channel
  183. }
  184. if err != nil {
  185. log.Warn("Error in Receive()", "error", err)
  186. }
  187. }
  188. // Sets our private validator account for signing votes.
  189. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  190. conR.conS.SetPrivValidator(priv)
  191. }
  192. //--------------------------------------
  193. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  194. // Get seconds since beginning of height.
  195. timeElapsed := time.Now().Sub(rs.StartTime)
  196. // Broadcast NewRoundStepMessage
  197. nrsMsg = &NewRoundStepMessage{
  198. Height: rs.Height,
  199. Round: rs.Round,
  200. Step: rs.Step,
  201. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  202. }
  203. // If the step is commit, then also broadcast a CommitStepMessage.
  204. if rs.Step == RoundStepCommit {
  205. csMsg = &CommitStepMessage{
  206. Height: rs.Height,
  207. BlockParts: rs.ProposalBlockParts.Header(),
  208. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  209. }
  210. }
  211. return
  212. }
  213. // Listens for changes to the ConsensusState.Step by pulling
  214. // on conR.conS.NewStepCh().
  215. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  216. for {
  217. // Get RoundState with new Step or quit.
  218. var rs *RoundState
  219. select {
  220. case rs = <-conR.conS.NewStepCh():
  221. case <-conR.quit:
  222. return
  223. }
  224. nrsMsg, csMsg := makeRoundStepMessages(rs)
  225. if nrsMsg != nil {
  226. conR.sw.Broadcast(StateCh, nrsMsg)
  227. }
  228. if csMsg != nil {
  229. conR.sw.Broadcast(StateCh, csMsg)
  230. }
  231. }
  232. }
  233. func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) {
  234. rs := conR.conS.GetRoundState()
  235. nrsMsg, csMsg := makeRoundStepMessages(rs)
  236. if nrsMsg != nil {
  237. peer.Send(StateCh, nrsMsg)
  238. }
  239. if csMsg != nil {
  240. peer.Send(StateCh, nrsMsg)
  241. }
  242. }
  243. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  244. OUTER_LOOP:
  245. for {
  246. // Manage disconnects from self or peer.
  247. if peer.IsStopped() || conR.IsStopped() {
  248. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  249. return
  250. }
  251. rs := conR.conS.GetRoundState()
  252. prs := ps.GetRoundState()
  253. // Send proposal Block parts?
  254. // NOTE: if we or peer is at RoundStepCommit*, the round
  255. // won't necessarily match, but that's OK.
  256. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  257. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  258. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  259. part := rs.ProposalBlockParts.GetPart(index)
  260. msg := &PartMessage{
  261. Height: rs.Height,
  262. Round: rs.Round,
  263. Type: partTypeProposalBlock,
  264. Part: part,
  265. }
  266. peer.Send(DataCh, msg)
  267. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  268. continue OUTER_LOOP
  269. }
  270. }
  271. // If the peer is on a previous height, help catch up.
  272. if 0 < prs.Height && prs.Height < rs.Height {
  273. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  274. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  275. // Ensure that the peer's PartSetHeaeder is correct
  276. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  277. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  278. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  279. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  280. time.Sleep(peerGossipSleepDuration)
  281. continue OUTER_LOOP
  282. }
  283. // Load the part
  284. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  285. if part == nil {
  286. log.Warn("Could not load part", "index", index,
  287. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  288. time.Sleep(peerGossipSleepDuration)
  289. continue OUTER_LOOP
  290. }
  291. // Send the part
  292. msg := &PartMessage{
  293. Height: prs.Height,
  294. Round: prs.Round,
  295. Type: partTypeProposalBlock,
  296. Part: part,
  297. }
  298. peer.Send(DataCh, msg)
  299. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  300. continue OUTER_LOOP
  301. } else {
  302. //log.Debug("No parts to send in catch-up, sleeping")
  303. time.Sleep(peerGossipSleepDuration)
  304. continue OUTER_LOOP
  305. }
  306. }
  307. // If height and round don't match, sleep.
  308. if rs.Height != prs.Height || rs.Round != prs.Round {
  309. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  310. time.Sleep(peerGossipSleepDuration)
  311. continue OUTER_LOOP
  312. }
  313. // Send proposal?
  314. if rs.Proposal != nil && !prs.Proposal {
  315. msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
  316. peer.Send(DataCh, msg)
  317. ps.SetHasProposal(rs.Proposal)
  318. continue OUTER_LOOP
  319. }
  320. // Send proposal POL parts?
  321. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  322. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  323. msg := &PartMessage{
  324. Height: rs.Height,
  325. Round: rs.Round,
  326. Type: partTypeProposalPOL,
  327. Part: rs.ProposalPOLParts.GetPart(index),
  328. }
  329. peer.Send(DataCh, msg)
  330. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  331. continue OUTER_LOOP
  332. }
  333. }
  334. // Nothing to do. Sleep.
  335. time.Sleep(peerGossipSleepDuration)
  336. continue OUTER_LOOP
  337. }
  338. }
  339. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  340. OUTER_LOOP:
  341. for {
  342. // Manage disconnects from self or peer.
  343. if peer.IsStopped() || conR.IsStopped() {
  344. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  345. return
  346. }
  347. rs := conR.conS.GetRoundState()
  348. prs := ps.GetRoundState()
  349. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  350. if prs.Height == voteSet.Height() {
  351. // Initialize Prevotes/Precommits/Commits if needed
  352. ps.EnsureVoteBitArrays(prs.Height, voteSet.Size())
  353. }
  354. // TODO: give priority to our vote.
  355. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  356. vote := voteSet.GetByIndex(index)
  357. // NOTE: vote may be a commit.
  358. msg := &VoteMessage{index, vote}
  359. peer.Send(VoteCh, msg)
  360. ps.SetHasVote(vote, index)
  361. return true
  362. }
  363. return false
  364. }
  365. trySendCommitFromValidation := func(blockMeta *blk.BlockMeta, validation *blk.Validation, peerVoteSet BitArray) (sent bool) {
  366. // Initialize Commits if needed
  367. ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits)))
  368. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  369. commit := validation.Commits[index]
  370. log.Debug("Picked commit to send", "index", index, "commit", commit)
  371. // Reconstruct vote.
  372. vote := &blk.Vote{
  373. Height: prs.Height,
  374. Round: commit.Round,
  375. Type: blk.VoteTypeCommit,
  376. BlockHash: blockMeta.Hash,
  377. BlockParts: blockMeta.Parts,
  378. Signature: commit.Signature,
  379. }
  380. msg := &VoteMessage{index, vote}
  381. peer.Send(VoteCh, msg)
  382. ps.SetHasVote(vote, index)
  383. return true
  384. }
  385. return false
  386. }
  387. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  388. if rs.Height == prs.Height {
  389. // If there are lastcommits to send...
  390. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  391. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  392. if trySendVote(rs.LastCommits, prs.LastCommits) {
  393. continue OUTER_LOOP
  394. }
  395. }
  396. }
  397. // If there are prevotes to send...
  398. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  399. if trySendVote(rs.Prevotes, prs.Prevotes) {
  400. continue OUTER_LOOP
  401. }
  402. }
  403. // If there are precommits to send...
  404. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  405. if trySendVote(rs.Precommits, prs.Precommits) {
  406. continue OUTER_LOOP
  407. }
  408. }
  409. // If there are any commits to send...
  410. if trySendVote(rs.Commits, prs.Commits) {
  411. continue OUTER_LOOP
  412. }
  413. }
  414. // Catchup logic
  415. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  416. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  417. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  418. // If there are lastcommits to send...
  419. if trySendVote(rs.LastCommits, prs.Commits) {
  420. continue OUTER_LOOP
  421. } else {
  422. ps.SetHasAllCatchupCommits(prs.Height)
  423. }
  424. }
  425. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  426. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  427. // Load the blockMeta for block at prs.Height
  428. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  429. // Load the seen validation for prs.Height
  430. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  431. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  432. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  433. continue OUTER_LOOP
  434. } else {
  435. ps.SetHasAllCatchupCommits(prs.Height)
  436. }
  437. }
  438. // If peer is lagging by more than 1, send Validation.
  439. if rs.Height >= prs.Height+2 {
  440. // Load the blockMeta for block at prs.Height
  441. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  442. // Load the block validation for prs.Height+1,
  443. // which contains commit signatures for prs.Height.
  444. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  445. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  446. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  447. continue OUTER_LOOP
  448. } else {
  449. ps.SetHasAllCatchupCommits(prs.Height)
  450. }
  451. }
  452. }
  453. // We sent nothing. Sleep...
  454. time.Sleep(peerGossipSleepDuration)
  455. continue OUTER_LOOP
  456. }
  457. }
  458. //-----------------------------------------------------------------------------
  459. // Read only when returned by PeerState.GetRoundState().
  460. type PeerRoundState struct {
  461. Height uint // Height peer is at
  462. Round uint // Round peer is at
  463. Step RoundStep // Step peer is at
  464. StartTime time.Time // Estimated start of round 0 at this height
  465. Proposal bool // True if peer has proposal for this round
  466. ProposalBlockParts blk.PartSetHeader //
  467. ProposalBlockBitArray BitArray // True bit -> has part
  468. ProposalPOLParts blk.PartSetHeader //
  469. ProposalPOLBitArray BitArray // True bit -> has part
  470. Prevotes BitArray // All votes peer has for this round
  471. Precommits BitArray // All precommits peer has for this round
  472. Commits BitArray // All commits peer has for this height
  473. LastCommits BitArray // All commits peer has for last height
  474. HasAllCatchupCommits bool // Used for catch-up
  475. }
  476. //-----------------------------------------------------------------------------
  477. var (
  478. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  479. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  480. )
  481. type PeerState struct {
  482. mtx sync.Mutex
  483. PeerRoundState
  484. }
  485. func NewPeerState(peer *p2p.Peer) *PeerState {
  486. return &PeerState{}
  487. }
  488. // Returns an atomic snapshot of the PeerRoundState.
  489. // There's no point in mutating it since it won't change PeerState.
  490. func (ps *PeerState) GetRoundState() *PeerRoundState {
  491. ps.mtx.Lock()
  492. defer ps.mtx.Unlock()
  493. prs := ps.PeerRoundState // copy
  494. return &prs
  495. }
  496. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  497. ps.mtx.Lock()
  498. defer ps.mtx.Unlock()
  499. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  500. return
  501. }
  502. if ps.Proposal {
  503. return
  504. }
  505. ps.Proposal = true
  506. ps.ProposalBlockParts = proposal.BlockParts
  507. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  508. ps.ProposalPOLParts = proposal.POLParts
  509. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  510. }
  511. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  512. ps.mtx.Lock()
  513. defer ps.mtx.Unlock()
  514. if ps.Height != height || ps.Round != round {
  515. return
  516. }
  517. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  518. }
  519. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  520. ps.mtx.Lock()
  521. defer ps.mtx.Unlock()
  522. if ps.Height != height || ps.Round != round {
  523. return
  524. }
  525. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  526. }
  527. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  528. ps.mtx.Lock()
  529. defer ps.mtx.Unlock()
  530. if ps.Height != height {
  531. return
  532. }
  533. if ps.Prevotes.IsZero() {
  534. ps.Prevotes = NewBitArray(numValidators)
  535. }
  536. if ps.Precommits.IsZero() {
  537. ps.Precommits = NewBitArray(numValidators)
  538. }
  539. if ps.Commits.IsZero() {
  540. ps.Commits = NewBitArray(numValidators)
  541. }
  542. }
  543. func (ps *PeerState) SetHasVote(vote *blk.Vote, index uint) {
  544. ps.mtx.Lock()
  545. defer ps.mtx.Unlock()
  546. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  547. }
  548. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  549. if ps.Height == height+1 && type_ == blk.VoteTypeCommit {
  550. // Special case for LastCommits.
  551. ps.LastCommits.SetIndex(index, true)
  552. return
  553. } else if ps.Height != height {
  554. // Does not apply.
  555. return
  556. }
  557. switch type_ {
  558. case blk.VoteTypePrevote:
  559. ps.Prevotes.SetIndex(index, true)
  560. case blk.VoteTypePrecommit:
  561. ps.Precommits.SetIndex(index, true)
  562. case blk.VoteTypeCommit:
  563. if round < ps.Round {
  564. ps.Prevotes.SetIndex(index, true)
  565. ps.Precommits.SetIndex(index, true)
  566. }
  567. ps.Commits.SetIndex(index, true)
  568. default:
  569. panic("Invalid vote type")
  570. }
  571. }
  572. // When catching up, this helps keep track of whether
  573. // we should send more commit votes from the block (validation) store
  574. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  575. ps.mtx.Lock()
  576. defer ps.mtx.Unlock()
  577. if ps.Height == height {
  578. ps.HasAllCatchupCommits = true
  579. }
  580. }
  581. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  582. ps.mtx.Lock()
  583. defer ps.mtx.Unlock()
  584. // Just remember these values.
  585. psHeight := ps.Height
  586. psRound := ps.Round
  587. //psStep := ps.Step
  588. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  589. ps.Height = msg.Height
  590. ps.Round = msg.Round
  591. ps.Step = msg.Step
  592. ps.StartTime = startTime
  593. if psHeight != msg.Height || psRound != msg.Round {
  594. ps.Proposal = false
  595. ps.ProposalBlockParts = blk.PartSetHeader{}
  596. ps.ProposalBlockBitArray = BitArray{}
  597. ps.ProposalPOLParts = blk.PartSetHeader{}
  598. ps.ProposalPOLBitArray = BitArray{}
  599. // We'll update the BitArray capacity later.
  600. ps.Prevotes = BitArray{}
  601. ps.Precommits = BitArray{}
  602. }
  603. if psHeight != msg.Height {
  604. // Shift Commits to LastCommits
  605. if psHeight+1 == msg.Height {
  606. ps.LastCommits = ps.Commits
  607. } else {
  608. ps.LastCommits = BitArray{}
  609. }
  610. // We'll update the BitArray capacity later.
  611. ps.Commits = BitArray{}
  612. ps.HasAllCatchupCommits = false
  613. }
  614. }
  615. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  616. ps.mtx.Lock()
  617. defer ps.mtx.Unlock()
  618. if ps.Height != msg.Height {
  619. return
  620. }
  621. ps.ProposalBlockParts = msg.BlockParts
  622. ps.ProposalBlockBitArray = msg.BlockBitArray
  623. }
  624. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  625. ps.mtx.Lock()
  626. defer ps.mtx.Unlock()
  627. // Special case for LastCommits
  628. if ps.Height == msg.Height+1 && msg.Type == blk.VoteTypeCommit {
  629. ps.LastCommits.SetIndex(msg.Index, true)
  630. return
  631. } else if ps.Height != msg.Height {
  632. return
  633. }
  634. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  635. }
  636. //-----------------------------------------------------------------------------
  637. // Messages
  638. const (
  639. msgTypeUnknown = byte(0x00)
  640. msgTypeNewRoundStep = byte(0x01)
  641. msgTypeCommitStep = byte(0x02)
  642. msgTypeProposal = byte(0x11)
  643. msgTypePart = byte(0x12) // both block & POL
  644. msgTypeVote = byte(0x13)
  645. msgTypeHasVote = byte(0x14)
  646. )
  647. // TODO: check for unnecessary extra bytes at the end.
  648. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  649. n := new(int64)
  650. // log.Debug(Fmt("decoding msg bytes: %X", bz))
  651. msgType = bz[0]
  652. r := bytes.NewReader(bz)
  653. switch msgType {
  654. // Messages for communicating state changes
  655. case msgTypeNewRoundStep:
  656. msg = binary.ReadBinary(&NewRoundStepMessage{}, r, n, &err)
  657. case msgTypeCommitStep:
  658. msg = binary.ReadBinary(&CommitStepMessage{}, r, n, &err)
  659. // Messages of data
  660. case msgTypeProposal:
  661. r.ReadByte() // Consume the byte
  662. msg = binary.ReadBinary(&Proposal{}, r, n, &err)
  663. case msgTypePart:
  664. msg = binary.ReadBinary(&PartMessage{}, r, n, &err)
  665. case msgTypeVote:
  666. msg = binary.ReadBinary(&VoteMessage{}, r, n, &err)
  667. case msgTypeHasVote:
  668. msg = binary.ReadBinary(&HasVoteMessage{}, r, n, &err)
  669. default:
  670. msg = nil
  671. }
  672. return
  673. }
  674. //-------------------------------------
  675. type NewRoundStepMessage struct {
  676. Height uint
  677. Round uint
  678. Step RoundStep
  679. SecondsSinceStartTime uint
  680. }
  681. func (m *NewRoundStepMessage) TypeByte() byte { return msgTypeNewRoundStep }
  682. func (m *NewRoundStepMessage) String() string {
  683. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  684. }
  685. //-------------------------------------
  686. type CommitStepMessage struct {
  687. Height uint
  688. BlockParts blk.PartSetHeader
  689. BlockBitArray BitArray
  690. }
  691. func (m *CommitStepMessage) TypeByte() byte { return msgTypeCommitStep }
  692. func (m *CommitStepMessage) String() string {
  693. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  694. }
  695. //-------------------------------------
  696. const (
  697. partTypeProposalBlock = byte(0x01)
  698. partTypeProposalPOL = byte(0x02)
  699. )
  700. type PartMessage struct {
  701. Height uint
  702. Round uint
  703. Type byte
  704. Part *blk.Part
  705. }
  706. func (m *PartMessage) TypeByte() byte { return msgTypePart }
  707. func (m *PartMessage) String() string {
  708. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  709. }
  710. //-------------------------------------
  711. type VoteMessage struct {
  712. ValidatorIndex uint
  713. Vote *blk.Vote
  714. }
  715. func (m *VoteMessage) TypeByte() byte { return msgTypeVote }
  716. func (m *VoteMessage) String() string {
  717. return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
  718. }
  719. //-------------------------------------
  720. type HasVoteMessage struct {
  721. Height uint
  722. Round uint
  723. Type byte
  724. Index uint
  725. }
  726. func (m *HasVoteMessage) TypeByte() byte { return msgTypeHasVote }
  727. func (m *HasVoteMessage) String() string {
  728. return fmt.Sprintf("[HasVote %v/%v T:%X]", m.Height, m.Round, m.Type)
  729. }