You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

843 lines
23 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. . "github.com/tendermint/tendermint/common"
  11. . "github.com/tendermint/tendermint/consensus/types"
  12. "github.com/tendermint/tendermint/p2p"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. const (
  17. StateCh = byte(0x20)
  18. DataCh = byte(0x21)
  19. VoteCh = byte(0x22)
  20. peerStateKey = "ConsensusReactor.peerState"
  21. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  22. )
  23. //-----------------------------------------------------------------------------
  24. type ConsensusReactor struct {
  25. sw *p2p.Switch
  26. started uint32
  27. stopped uint32
  28. quit chan struct{}
  29. blockStore *types.BlockStore
  30. conS *ConsensusState
  31. }
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *types.BlockStore) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. quit: make(chan struct{}),
  36. conS: consensusState,
  37. }
  38. return conR
  39. }
  40. // Implements Reactor
  41. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  42. if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
  43. log.Info("Starting ConsensusReactor")
  44. conR.sw = sw
  45. conR.conS.Start()
  46. go conR.broadcastNewRoundStepRoutine()
  47. }
  48. }
  49. // Implements Reactor
  50. func (conR *ConsensusReactor) Stop() {
  51. if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
  52. log.Info("Stopping ConsensusReactor")
  53. conR.conS.Stop()
  54. close(conR.quit)
  55. }
  56. }
  57. func (conR *ConsensusReactor) IsStopped() bool {
  58. return atomic.LoadUint32(&conR.stopped) == 1
  59. }
  60. // Implements Reactor
  61. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  62. // TODO optimize
  63. return []*p2p.ChannelDescriptor{
  64. &p2p.ChannelDescriptor{
  65. Id: StateCh,
  66. Priority: 5,
  67. },
  68. &p2p.ChannelDescriptor{
  69. Id: DataCh,
  70. Priority: 5,
  71. },
  72. &p2p.ChannelDescriptor{
  73. Id: VoteCh,
  74. Priority: 5,
  75. },
  76. }
  77. }
  78. // Implements Reactor
  79. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  80. // Create peerState for peer
  81. peerState := NewPeerState(peer)
  82. peer.Data.Set(peerStateKey, peerState)
  83. // Begin gossip routines for this peer.
  84. go conR.gossipDataRoutine(peer, peerState)
  85. go conR.gossipVotesRoutine(peer, peerState)
  86. // Send our state to peer.
  87. conR.sendNewRoundStepRoutine(peer)
  88. }
  89. // Implements Reactor
  90. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  91. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  92. }
  93. // Implements Reactor
  94. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  95. // Get round state
  96. rs := conR.conS.GetRoundState()
  97. ps := peer.Data.Get(peerStateKey).(*PeerState)
  98. _, msg_, err := DecodeMessage(msgBytes)
  99. if err != nil {
  100. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  101. return
  102. }
  103. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
  104. switch chId {
  105. case StateCh:
  106. switch msg := msg_.(type) {
  107. case *NewRoundStepMessage:
  108. ps.ApplyNewRoundStepMessage(msg, rs)
  109. case *CommitStepMessage:
  110. ps.ApplyCommitStepMessage(msg)
  111. case *HasVoteMessage:
  112. ps.ApplyHasVoteMessage(msg)
  113. default:
  114. // Ignore unknown message
  115. }
  116. case DataCh:
  117. switch msg := msg_.(type) {
  118. case *Proposal:
  119. ps.SetHasProposal(msg)
  120. err = conR.conS.SetProposal(msg)
  121. case *PartMessage:
  122. if msg.Type == partTypeProposalBlock {
  123. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  124. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  125. } else if msg.Type == partTypeProposalPOL {
  126. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  127. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  128. } else {
  129. // Ignore unknown part type
  130. }
  131. default:
  132. // Ignore unknown message
  133. }
  134. case VoteCh:
  135. switch msg := msg_.(type) {
  136. case *VoteMessage:
  137. vote := msg.Vote
  138. if rs.Height != vote.Height {
  139. return // Wrong height. Not necessarily a bad peer.
  140. }
  141. validatorIndex := msg.ValidatorIndex
  142. address, _ := rs.Validators.GetByIndex(validatorIndex)
  143. added, index, err := conR.conS.AddVote(address, vote)
  144. if err != nil {
  145. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  146. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  147. log.Warn("Found conflicting vote. Publish evidence")
  148. evidenceTx := &types.DupeoutTx{
  149. Address: address,
  150. VoteA: *errDupe.VoteA,
  151. VoteB: *errDupe.VoteB,
  152. }
  153. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  154. } else {
  155. // Probably an invalid signature. Bad peer.
  156. log.Warn("Error attempting to add vote", "error", err)
  157. // TODO: punish peer
  158. }
  159. }
  160. // Initialize Prevotes/Precommits/Commits if needed
  161. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  162. ps.SetHasVote(vote, index)
  163. if added {
  164. msg := &HasVoteMessage{
  165. Height: vote.Height,
  166. Round: vote.Round,
  167. Type: vote.Type,
  168. Index: index,
  169. }
  170. conR.sw.Broadcast(StateCh, msg)
  171. }
  172. default:
  173. // Ignore unknown message
  174. }
  175. default:
  176. // Ignore unknown channel
  177. }
  178. if err != nil {
  179. log.Warn("Error in Receive()", "error", err)
  180. }
  181. }
  182. // Sets our private validator account for signing votes.
  183. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  184. conR.conS.SetPrivValidator(priv)
  185. }
  186. //--------------------------------------
  187. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  188. // Get seconds since beginning of height.
  189. timeElapsed := time.Now().Sub(rs.StartTime)
  190. // Broadcast NewRoundStepMessage
  191. nrsMsg = &NewRoundStepMessage{
  192. Height: rs.Height,
  193. Round: rs.Round,
  194. Step: rs.Step,
  195. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  196. }
  197. // If the step is commit, then also broadcast a CommitStepMessage.
  198. if rs.Step == RoundStepCommit {
  199. csMsg = &CommitStepMessage{
  200. Height: rs.Height,
  201. BlockParts: rs.ProposalBlockParts.Header(),
  202. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  203. }
  204. }
  205. return
  206. }
  207. // Listens for changes to the ConsensusState.Step by pulling
  208. // on conR.conS.NewStepCh().
  209. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  210. for {
  211. // Get RoundState with new Step or quit.
  212. var rs *RoundState
  213. select {
  214. case rs = <-conR.conS.NewStepCh():
  215. case <-conR.quit:
  216. return
  217. }
  218. nrsMsg, csMsg := makeRoundStepMessages(rs)
  219. if nrsMsg != nil {
  220. conR.sw.Broadcast(StateCh, nrsMsg)
  221. }
  222. if csMsg != nil {
  223. conR.sw.Broadcast(StateCh, csMsg)
  224. }
  225. }
  226. }
  227. func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) {
  228. rs := conR.conS.GetRoundState()
  229. nrsMsg, csMsg := makeRoundStepMessages(rs)
  230. if nrsMsg != nil {
  231. peer.Send(StateCh, nrsMsg)
  232. }
  233. if csMsg != nil {
  234. peer.Send(StateCh, nrsMsg)
  235. }
  236. }
  237. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  238. OUTER_LOOP:
  239. for {
  240. // Manage disconnects from self or peer.
  241. if peer.IsStopped() || conR.IsStopped() {
  242. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  243. return
  244. }
  245. rs := conR.conS.GetRoundState()
  246. prs := ps.GetRoundState()
  247. // Send proposal Block parts?
  248. // NOTE: if we or peer is at RoundStepCommit*, the round
  249. // won't necessarily match, but that's OK.
  250. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  251. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  252. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  253. part := rs.ProposalBlockParts.GetPart(index)
  254. msg := &PartMessage{
  255. Height: rs.Height,
  256. Round: rs.Round,
  257. Type: partTypeProposalBlock,
  258. Part: part,
  259. }
  260. peer.Send(DataCh, msg)
  261. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  262. continue OUTER_LOOP
  263. }
  264. }
  265. // If the peer is on a previous height, help catch up.
  266. if 0 < prs.Height && prs.Height < rs.Height {
  267. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  268. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  269. // Ensure that the peer's PartSetHeaeder is correct
  270. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  271. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  272. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  273. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  274. time.Sleep(peerGossipSleepDuration)
  275. continue OUTER_LOOP
  276. }
  277. // Load the part
  278. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  279. if part == nil {
  280. log.Warn("Could not load part", "index", index,
  281. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  282. time.Sleep(peerGossipSleepDuration)
  283. continue OUTER_LOOP
  284. }
  285. // Send the part
  286. msg := &PartMessage{
  287. Height: prs.Height,
  288. Round: prs.Round,
  289. Type: partTypeProposalBlock,
  290. Part: part,
  291. }
  292. peer.Send(DataCh, msg)
  293. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  294. continue OUTER_LOOP
  295. } else {
  296. //log.Debug("No parts to send in catch-up, sleeping")
  297. time.Sleep(peerGossipSleepDuration)
  298. continue OUTER_LOOP
  299. }
  300. }
  301. // If height and round don't match, sleep.
  302. if rs.Height != prs.Height || rs.Round != prs.Round {
  303. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  304. time.Sleep(peerGossipSleepDuration)
  305. continue OUTER_LOOP
  306. }
  307. // Send proposal?
  308. if rs.Proposal != nil && !prs.Proposal {
  309. msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
  310. peer.Send(DataCh, msg)
  311. ps.SetHasProposal(rs.Proposal)
  312. continue OUTER_LOOP
  313. }
  314. // Send proposal POL parts?
  315. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  316. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  317. msg := &PartMessage{
  318. Height: rs.Height,
  319. Round: rs.Round,
  320. Type: partTypeProposalPOL,
  321. Part: rs.ProposalPOLParts.GetPart(index),
  322. }
  323. peer.Send(DataCh, msg)
  324. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  325. continue OUTER_LOOP
  326. }
  327. }
  328. // Nothing to do. Sleep.
  329. time.Sleep(peerGossipSleepDuration)
  330. continue OUTER_LOOP
  331. }
  332. }
  333. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  334. OUTER_LOOP:
  335. for {
  336. // Manage disconnects from self or peer.
  337. if peer.IsStopped() || conR.IsStopped() {
  338. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  339. return
  340. }
  341. rs := conR.conS.GetRoundState()
  342. prs := ps.GetRoundState()
  343. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  344. if prs.Height == voteSet.Height() {
  345. // Initialize Prevotes/Precommits/Commits if needed
  346. ps.EnsureVoteBitArrays(prs.Height, voteSet.Size())
  347. }
  348. // TODO: give priority to our vote.
  349. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  350. vote := voteSet.GetByIndex(index)
  351. // NOTE: vote may be a commit.
  352. msg := &VoteMessage{index, vote}
  353. peer.Send(VoteCh, msg)
  354. ps.SetHasVote(vote, index)
  355. return true
  356. }
  357. return false
  358. }
  359. trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet BitArray) (sent bool) {
  360. // Initialize Commits if needed
  361. ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits)))
  362. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  363. commit := validation.Commits[index]
  364. log.Debug("Picked commit to send", "index", index, "commit", commit)
  365. // Reconstruct vote.
  366. vote := &types.Vote{
  367. Height: prs.Height,
  368. Round: commit.Round,
  369. Type: types.VoteTypeCommit,
  370. BlockHash: blockMeta.Hash,
  371. BlockParts: blockMeta.Parts,
  372. Signature: commit.Signature,
  373. }
  374. msg := &VoteMessage{index, vote}
  375. peer.Send(VoteCh, msg)
  376. ps.SetHasVote(vote, index)
  377. return true
  378. }
  379. return false
  380. }
  381. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  382. if rs.Height == prs.Height {
  383. // If there are lastcommits to send...
  384. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  385. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  386. if trySendVote(rs.LastCommits, prs.LastCommits) {
  387. continue OUTER_LOOP
  388. }
  389. }
  390. }
  391. // If there are prevotes to send...
  392. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  393. if trySendVote(rs.Prevotes, prs.Prevotes) {
  394. continue OUTER_LOOP
  395. }
  396. }
  397. // If there are precommits to send...
  398. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  399. if trySendVote(rs.Precommits, prs.Precommits) {
  400. continue OUTER_LOOP
  401. }
  402. }
  403. // If there are any commits to send...
  404. if trySendVote(rs.Commits, prs.Commits) {
  405. continue OUTER_LOOP
  406. }
  407. }
  408. // Catchup logic
  409. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  410. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  411. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  412. // If there are lastcommits to send...
  413. if trySendVote(rs.LastCommits, prs.Commits) {
  414. continue OUTER_LOOP
  415. } else {
  416. ps.SetHasAllCatchupCommits(prs.Height)
  417. }
  418. }
  419. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  420. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  421. // Load the blockMeta for block at prs.Height
  422. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  423. // Load the seen validation for prs.Height
  424. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  425. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  426. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  427. continue OUTER_LOOP
  428. } else {
  429. ps.SetHasAllCatchupCommits(prs.Height)
  430. }
  431. }
  432. // If peer is lagging by more than 1, send Validation.
  433. if rs.Height >= prs.Height+2 {
  434. // Load the blockMeta for block at prs.Height
  435. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  436. // Load the block validation for prs.Height+1,
  437. // which contains commit signatures for prs.Height.
  438. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  439. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  440. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  441. continue OUTER_LOOP
  442. } else {
  443. ps.SetHasAllCatchupCommits(prs.Height)
  444. }
  445. }
  446. }
  447. // We sent nothing. Sleep...
  448. time.Sleep(peerGossipSleepDuration)
  449. continue OUTER_LOOP
  450. }
  451. }
  452. //-----------------------------------------------------------------------------
  453. // Read only when returned by PeerState.GetRoundState().
  454. type PeerRoundState struct {
  455. Height uint // Height peer is at
  456. Round uint // Round peer is at
  457. Step RoundStep // Step peer is at
  458. StartTime time.Time // Estimated start of round 0 at this height
  459. Proposal bool // True if peer has proposal for this round
  460. ProposalBlockParts types.PartSetHeader //
  461. ProposalBlockBitArray BitArray // True bit -> has part
  462. ProposalPOLParts types.PartSetHeader //
  463. ProposalPOLBitArray BitArray // True bit -> has part
  464. Prevotes BitArray // All votes peer has for this round
  465. Precommits BitArray // All precommits peer has for this round
  466. Commits BitArray // All commits peer has for this height
  467. LastCommits BitArray // All commits peer has for last height
  468. HasAllCatchupCommits bool // Used for catch-up
  469. }
  470. //-----------------------------------------------------------------------------
  471. var (
  472. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  473. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  474. )
  475. type PeerState struct {
  476. mtx sync.Mutex
  477. PeerRoundState
  478. }
  479. func NewPeerState(peer *p2p.Peer) *PeerState {
  480. return &PeerState{}
  481. }
  482. // Returns an atomic snapshot of the PeerRoundState.
  483. // There's no point in mutating it since it won't change PeerState.
  484. func (ps *PeerState) GetRoundState() *PeerRoundState {
  485. ps.mtx.Lock()
  486. defer ps.mtx.Unlock()
  487. prs := ps.PeerRoundState // copy
  488. return &prs
  489. }
  490. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  491. ps.mtx.Lock()
  492. defer ps.mtx.Unlock()
  493. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  494. return
  495. }
  496. if ps.Proposal {
  497. return
  498. }
  499. ps.Proposal = true
  500. ps.ProposalBlockParts = proposal.BlockParts
  501. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  502. ps.ProposalPOLParts = proposal.POLParts
  503. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  504. }
  505. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  506. ps.mtx.Lock()
  507. defer ps.mtx.Unlock()
  508. if ps.Height != height || ps.Round != round {
  509. return
  510. }
  511. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  512. }
  513. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  514. ps.mtx.Lock()
  515. defer ps.mtx.Unlock()
  516. if ps.Height != height || ps.Round != round {
  517. return
  518. }
  519. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  520. }
  521. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  522. ps.mtx.Lock()
  523. defer ps.mtx.Unlock()
  524. if ps.Height != height {
  525. return
  526. }
  527. if ps.Prevotes.IsZero() {
  528. ps.Prevotes = NewBitArray(numValidators)
  529. }
  530. if ps.Precommits.IsZero() {
  531. ps.Precommits = NewBitArray(numValidators)
  532. }
  533. if ps.Commits.IsZero() {
  534. ps.Commits = NewBitArray(numValidators)
  535. }
  536. }
  537. func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
  538. ps.mtx.Lock()
  539. defer ps.mtx.Unlock()
  540. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  541. }
  542. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  543. if ps.Height == height+1 && type_ == types.VoteTypeCommit {
  544. // Special case for LastCommits.
  545. ps.LastCommits.SetIndex(index, true)
  546. return
  547. } else if ps.Height != height {
  548. // Does not apply.
  549. return
  550. }
  551. switch type_ {
  552. case types.VoteTypePrevote:
  553. ps.Prevotes.SetIndex(index, true)
  554. case types.VoteTypePrecommit:
  555. ps.Precommits.SetIndex(index, true)
  556. case types.VoteTypeCommit:
  557. if round < ps.Round {
  558. ps.Prevotes.SetIndex(index, true)
  559. ps.Precommits.SetIndex(index, true)
  560. }
  561. ps.Commits.SetIndex(index, true)
  562. default:
  563. panic("Invalid vote type")
  564. }
  565. }
  566. // When catching up, this helps keep track of whether
  567. // we should send more commit votes from the block (validation) store
  568. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  569. ps.mtx.Lock()
  570. defer ps.mtx.Unlock()
  571. if ps.Height == height {
  572. ps.HasAllCatchupCommits = true
  573. }
  574. }
  575. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  576. ps.mtx.Lock()
  577. defer ps.mtx.Unlock()
  578. // Just remember these values.
  579. psHeight := ps.Height
  580. psRound := ps.Round
  581. //psStep := ps.Step
  582. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  583. ps.Height = msg.Height
  584. ps.Round = msg.Round
  585. ps.Step = msg.Step
  586. ps.StartTime = startTime
  587. if psHeight != msg.Height || psRound != msg.Round {
  588. ps.Proposal = false
  589. ps.ProposalBlockParts = types.PartSetHeader{}
  590. ps.ProposalBlockBitArray = BitArray{}
  591. ps.ProposalPOLParts = types.PartSetHeader{}
  592. ps.ProposalPOLBitArray = BitArray{}
  593. // We'll update the BitArray capacity later.
  594. ps.Prevotes = BitArray{}
  595. ps.Precommits = BitArray{}
  596. }
  597. if psHeight != msg.Height {
  598. // Shift Commits to LastCommits
  599. if psHeight+1 == msg.Height {
  600. ps.LastCommits = ps.Commits
  601. } else {
  602. ps.LastCommits = BitArray{}
  603. }
  604. // We'll update the BitArray capacity later.
  605. ps.Commits = BitArray{}
  606. ps.HasAllCatchupCommits = false
  607. }
  608. }
  609. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  610. ps.mtx.Lock()
  611. defer ps.mtx.Unlock()
  612. if ps.Height != msg.Height {
  613. return
  614. }
  615. ps.ProposalBlockParts = msg.BlockParts
  616. ps.ProposalBlockBitArray = msg.BlockBitArray
  617. }
  618. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  619. ps.mtx.Lock()
  620. defer ps.mtx.Unlock()
  621. // Special case for LastCommits
  622. if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
  623. ps.LastCommits.SetIndex(msg.Index, true)
  624. return
  625. } else if ps.Height != msg.Height {
  626. return
  627. }
  628. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  629. }
  630. //-----------------------------------------------------------------------------
  631. // Messages
  632. const (
  633. msgTypeUnknown = byte(0x00)
  634. msgTypeNewRoundStep = byte(0x01)
  635. msgTypeCommitStep = byte(0x02)
  636. msgTypeProposal = byte(0x11)
  637. msgTypePart = byte(0x12) // both block & POL
  638. msgTypeVote = byte(0x13)
  639. msgTypeHasVote = byte(0x14)
  640. )
  641. // TODO: check for unnecessary extra bytes at the end.
  642. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  643. n := new(int64)
  644. // log.Debug(Fmt("decoding msg bytes: %X", bz))
  645. msgType = bz[0]
  646. r := bytes.NewReader(bz)
  647. switch msgType {
  648. // Messages for communicating state changes
  649. case msgTypeNewRoundStep:
  650. msg = binary.ReadBinary(&NewRoundStepMessage{}, r, n, &err)
  651. case msgTypeCommitStep:
  652. msg = binary.ReadBinary(&CommitStepMessage{}, r, n, &err)
  653. // Messages of data
  654. case msgTypeProposal:
  655. r.ReadByte() // Consume the byte
  656. msg = binary.ReadBinary(&Proposal{}, r, n, &err)
  657. case msgTypePart:
  658. msg = binary.ReadBinary(&PartMessage{}, r, n, &err)
  659. case msgTypeVote:
  660. msg = binary.ReadBinary(&VoteMessage{}, r, n, &err)
  661. case msgTypeHasVote:
  662. msg = binary.ReadBinary(&HasVoteMessage{}, r, n, &err)
  663. default:
  664. msg = nil
  665. }
  666. return
  667. }
  668. //-------------------------------------
  669. type NewRoundStepMessage struct {
  670. Height uint
  671. Round uint
  672. Step RoundStep
  673. SecondsSinceStartTime uint
  674. }
  675. func (m *NewRoundStepMessage) TypeByte() byte { return msgTypeNewRoundStep }
  676. func (m *NewRoundStepMessage) String() string {
  677. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  678. }
  679. //-------------------------------------
  680. type CommitStepMessage struct {
  681. Height uint
  682. BlockParts types.PartSetHeader
  683. BlockBitArray BitArray
  684. }
  685. func (m *CommitStepMessage) TypeByte() byte { return msgTypeCommitStep }
  686. func (m *CommitStepMessage) String() string {
  687. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  688. }
  689. //-------------------------------------
  690. const (
  691. partTypeProposalBlock = byte(0x01)
  692. partTypeProposalPOL = byte(0x02)
  693. )
  694. type PartMessage struct {
  695. Height uint
  696. Round uint
  697. Type byte
  698. Part *types.Part
  699. }
  700. func (m *PartMessage) TypeByte() byte { return msgTypePart }
  701. func (m *PartMessage) String() string {
  702. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  703. }
  704. //-------------------------------------
  705. type VoteMessage struct {
  706. ValidatorIndex uint
  707. Vote *types.Vote
  708. }
  709. func (m *VoteMessage) TypeByte() byte { return msgTypeVote }
  710. func (m *VoteMessage) String() string {
  711. return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
  712. }
  713. //-------------------------------------
  714. type HasVoteMessage struct {
  715. Height uint
  716. Round uint
  717. Type byte
  718. Index uint
  719. }
  720. func (m *HasVoteMessage) TypeByte() byte { return msgTypeHasVote }
  721. func (m *HasVoteMessage) String() string {
  722. return fmt.Sprintf("[HasVote %v/%v T:%X]", m.Height, m.Round, m.Type)
  723. }