You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

796 lines
22 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. . "github.com/tendermint/tendermint/binary"
  10. . "github.com/tendermint/tendermint/block"
  11. . "github.com/tendermint/tendermint/common"
  12. . "github.com/tendermint/tendermint/consensus/types"
  13. "github.com/tendermint/tendermint/mempool"
  14. "github.com/tendermint/tendermint/p2p"
  15. "github.com/tendermint/tendermint/state"
  16. )
  // Channel IDs used by the consensus reactor, plus per-peer gossip settings.
  const (
  	// StateCh carries NewRoundStepMessage, CommitStepMessage and HasVoteMessage.
  	StateCh byte = 0x20
  	// DataCh carries Proposal and PartMessage.
  	DataCh byte = 0x21
  	// VoteCh carries VoteMessage.
  	VoteCh byte = 0x22

  	// peerStateKey is the key under which a peer's *PeerState is kept in peer.Data.
  	peerStateKey = "ConsensusReactor.peerState"
  	// peerGossipSleepDuration is how long a gossip routine sleeps when there
  	// is nothing to send.
  	peerGossipSleepDuration = time.Second
  )
  24. //-----------------------------------------------------------------------------
  25. type ConsensusReactor struct {
  26. sw *p2p.Switch
  27. started uint32
  28. stopped uint32
  29. quit chan struct{}
  30. blockStore *BlockStore
  31. conS *ConsensusState
  32. }
  33. func NewConsensusReactor(blockStore *BlockStore, mempoolReactor *mempool.MempoolReactor, state *state.State) *ConsensusReactor {
  34. conS := NewConsensusState(state, blockStore, mempoolReactor)
  35. conR := &ConsensusReactor{
  36. blockStore: blockStore,
  37. quit: make(chan struct{}),
  38. conS: conS,
  39. }
  40. return conR
  41. }
  42. // Implements Reactor
  43. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  44. if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
  45. log.Info("Starting ConsensusReactor")
  46. conR.sw = sw
  47. conR.conS.Start()
  48. go conR.broadcastNewRoundStepRoutine()
  49. }
  50. }
  51. // Implements Reactor
  52. func (conR *ConsensusReactor) Stop() {
  53. if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
  54. log.Info("Stopping ConsensusReactor")
  55. conR.conS.Stop()
  56. close(conR.quit)
  57. }
  58. }
  59. func (conR *ConsensusReactor) IsStopped() bool {
  60. return atomic.LoadUint32(&conR.stopped) == 1
  61. }
  62. // Implements Reactor
  63. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  64. // TODO optimize
  65. return []*p2p.ChannelDescriptor{
  66. &p2p.ChannelDescriptor{
  67. Id: StateCh,
  68. Priority: 5,
  69. },
  70. &p2p.ChannelDescriptor{
  71. Id: DataCh,
  72. Priority: 5,
  73. },
  74. &p2p.ChannelDescriptor{
  75. Id: VoteCh,
  76. Priority: 5,
  77. },
  78. }
  79. }
  80. // Implements Reactor
  81. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  82. // Create peerState for peer
  83. peerState := NewPeerState(peer)
  84. peer.Data.Set(peerStateKey, peerState)
  85. // Begin gossip routines for this peer.
  86. go conR.gossipDataRoutine(peer, peerState)
  87. go conR.gossipVotesRoutine(peer, peerState)
  88. }
  89. // Implements Reactor
  90. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  91. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  92. }
  93. // Implements Reactor
  94. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  95. // Get round state
  96. rs := conR.conS.GetRoundState()
  97. ps := peer.Data.Get(peerStateKey).(*PeerState)
  98. _, msg_, err := DecodeMessage(msgBytes)
  99. if err != nil {
  100. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  101. return
  102. }
  103. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
  104. switch chId {
  105. case StateCh:
  106. switch msg_.(type) {
  107. case *NewRoundStepMessage:
  108. msg := msg_.(*NewRoundStepMessage)
  109. ps.ApplyNewRoundStepMessage(msg, rs)
  110. case *CommitStepMessage:
  111. msg := msg_.(*CommitStepMessage)
  112. ps.ApplyCommitStepMessage(msg)
  113. case *HasVoteMessage:
  114. msg := msg_.(*HasVoteMessage)
  115. ps.ApplyHasVoteMessage(msg)
  116. default:
  117. // Ignore unknown message
  118. }
  119. case DataCh:
  120. switch msg_.(type) {
  121. case *Proposal:
  122. proposal := msg_.(*Proposal)
  123. ps.SetHasProposal(proposal)
  124. err = conR.conS.SetProposal(proposal)
  125. case *PartMessage:
  126. msg := msg_.(*PartMessage)
  127. if msg.Type == partTypeProposalBlock {
  128. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  129. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  130. } else if msg.Type == partTypeProposalPOL {
  131. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  132. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  133. } else {
  134. // Ignore unknown part type
  135. }
  136. default:
  137. // Ignore unknown message
  138. }
  139. case VoteCh:
  140. switch msg_.(type) {
  141. case *VoteMessage:
  142. voteMessage := msg_.(*VoteMessage)
  143. vote := voteMessage.Vote
  144. if rs.Height != vote.Height {
  145. return // Wrong height. Not necessarily a bad peer.
  146. }
  147. validatorIndex := voteMessage.ValidatorIndex
  148. address, _ := rs.Validators.GetByIndex(validatorIndex)
  149. added, index, err := conR.conS.AddVote(address, vote)
  150. if err != nil {
  151. // Probably an invalid signature. Bad peer.
  152. log.Warn("Error attempting to add vote", "error", err)
  153. }
  154. // Initialize Prevotes/Precommits/Commits if needed
  155. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  156. ps.SetHasVote(vote, index)
  157. if added {
  158. msg := &HasVoteMessage{
  159. Height: vote.Height,
  160. Round: vote.Round,
  161. Type: vote.Type,
  162. Index: index,
  163. }
  164. conR.sw.Broadcast(StateCh, msg)
  165. }
  166. default:
  167. // Ignore unknown message
  168. }
  169. default:
  170. // Ignore unknown channel
  171. }
  172. if err != nil {
  173. log.Warn("Error in Receive()", "error", err)
  174. }
  175. }
  176. // Sets our private validator account for signing votes.
  177. func (conR *ConsensusReactor) SetPrivValidator(priv *state.PrivValidator) {
  178. conR.conS.SetPrivValidator(priv)
  179. }
  180. //--------------------------------------
  181. // Listens for changes to the ConsensusState.Step by pulling
  182. // on conR.conS.NewStepCh().
  183. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  184. for {
  185. // Get RoundState with new Step or quit.
  186. var rs *RoundState
  187. select {
  188. case rs = <-conR.conS.NewStepCh():
  189. case <-conR.quit:
  190. return
  191. }
  192. // Get seconds since beginning of height.
  193. // Due to the condition documented, this is safe.
  194. timeElapsed := time.Now().Sub(rs.StartTime)
  195. // Broadcast NewRoundStepMessage
  196. {
  197. msg := &NewRoundStepMessage{
  198. Height: rs.Height,
  199. Round: rs.Round,
  200. Step: rs.Step,
  201. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  202. }
  203. conR.sw.Broadcast(StateCh, msg)
  204. }
  205. // If the step is commit, then also broadcast a CommitStepMessage.
  206. if rs.Step == RoundStepCommit {
  207. msg := &CommitStepMessage{
  208. Height: rs.Height,
  209. BlockParts: rs.ProposalBlockParts.Header(),
  210. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  211. }
  212. conR.sw.Broadcast(StateCh, msg)
  213. }
  214. }
  215. }
  216. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  217. OUTER_LOOP:
  218. for {
  219. // Manage disconnects from self or peer.
  220. if peer.IsStopped() || conR.IsStopped() {
  221. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  222. return
  223. }
  224. rs := conR.conS.GetRoundState()
  225. prs := ps.GetRoundState()
  226. // Send proposal Block parts?
  227. // NOTE: if we or peer is at RoundStepCommit*, the round
  228. // won't necessarily match, but that's OK.
  229. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  230. log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  231. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  232. part := rs.ProposalBlockParts.GetPart(index)
  233. msg := &PartMessage{
  234. Height: rs.Height,
  235. Round: rs.Round,
  236. Type: partTypeProposalBlock,
  237. Part: part,
  238. }
  239. peer.Send(DataCh, msg)
  240. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  241. continue OUTER_LOOP
  242. }
  243. }
  244. // If the peer is on a previous height, help catch up.
  245. if 0 < prs.Height && prs.Height < rs.Height {
  246. log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height)
  247. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  248. // Ensure that the peer's PartSetHeaeder is correct
  249. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  250. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  251. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  252. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  253. time.Sleep(peerGossipSleepDuration)
  254. continue OUTER_LOOP
  255. }
  256. // Load the part
  257. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  258. if part == nil {
  259. log.Warn("Could not load part", "index", index,
  260. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  261. time.Sleep(peerGossipSleepDuration)
  262. continue OUTER_LOOP
  263. }
  264. // Send the part
  265. msg := &PartMessage{
  266. Height: prs.Height,
  267. Round: prs.Round,
  268. Type: partTypeProposalBlock,
  269. Part: part,
  270. }
  271. peer.Send(DataCh, msg)
  272. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  273. continue OUTER_LOOP
  274. } else {
  275. log.Debug("No parts to send in catch-up, sleeping")
  276. time.Sleep(peerGossipSleepDuration)
  277. continue OUTER_LOOP
  278. }
  279. }
  280. // If height and round doesn't match, sleep.
  281. if rs.Height != prs.Height || rs.Round != prs.Round {
  282. log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  283. time.Sleep(peerGossipSleepDuration)
  284. continue OUTER_LOOP
  285. }
  286. // Send proposal?
  287. if rs.Proposal != nil && !prs.Proposal {
  288. msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
  289. peer.Send(DataCh, msg)
  290. ps.SetHasProposal(rs.Proposal)
  291. continue OUTER_LOOP
  292. }
  293. // Send proposal POL parts?
  294. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  295. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  296. msg := &PartMessage{
  297. Height: rs.Height,
  298. Round: rs.Round,
  299. Type: partTypeProposalPOL,
  300. Part: rs.ProposalPOLParts.GetPart(index),
  301. }
  302. peer.Send(DataCh, msg)
  303. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  304. continue OUTER_LOOP
  305. }
  306. }
  307. // Nothing to do. Sleep.
  308. time.Sleep(peerGossipSleepDuration)
  309. continue OUTER_LOOP
  310. }
  311. }
  312. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  313. OUTER_LOOP:
  314. for {
  315. // Manage disconnects from self or peer.
  316. if peer.IsStopped() || conR.IsStopped() {
  317. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  318. return
  319. }
  320. rs := conR.conS.GetRoundState()
  321. prs := ps.GetRoundState()
  322. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  323. // TODO: give priority to our vote.
  324. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  325. vote := voteSet.GetByIndex(index)
  326. // NOTE: vote may be a commit.
  327. msg := &VoteMessage{index, vote}
  328. peer.Send(VoteCh, msg)
  329. ps.SetHasVote(vote, index)
  330. return true
  331. }
  332. return false
  333. }
  334. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  335. if rs.Height == prs.Height {
  336. // If there are lastcommits to send...
  337. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  338. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  339. if trySendVote(rs.LastCommits, prs.LastCommits) {
  340. continue OUTER_LOOP
  341. }
  342. }
  343. }
  344. // Initialize Prevotes/Precommits/Commits if needed
  345. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  346. // If there are prevotes to send...
  347. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  348. if trySendVote(rs.Prevotes, prs.Prevotes) {
  349. continue OUTER_LOOP
  350. }
  351. }
  352. // If there are precommits to send...
  353. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  354. if trySendVote(rs.Precommits, prs.Precommits) {
  355. continue OUTER_LOOP
  356. }
  357. }
  358. // If there are any commits to send...
  359. if trySendVote(rs.Commits, prs.Commits) {
  360. continue OUTER_LOOP
  361. }
  362. }
  363. // If peer is lagging by height 1, match our LastCommits to peer's Commits.
  364. if rs.Height == prs.Height+1 {
  365. // Initialize Commits if needed
  366. ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommits.Size())
  367. // If there are lastcommits to send...
  368. if trySendVote(rs.LastCommits, prs.Commits) {
  369. continue OUTER_LOOP
  370. }
  371. }
  372. // If peer is lagging by more than 1, load and send Validation and send Commits.
  373. if prs.Height != 0 && !prs.HasAllCatchupCommits && rs.Height >= prs.Height+2 {
  374. // Load the block header and validation for prs.Height+1,
  375. // which contains commit signatures for prs.Height.
  376. header, validation := conR.conS.LoadHeaderValidation(prs.Height + 1)
  377. size := uint(len(validation.Commits))
  378. log.Debug("Loaded HeaderValidation for catch-up", "height", prs.Height+1, "header", header, "validation", validation, "size", size)
  379. // Initialize Commits if needed
  380. ps.EnsureVoteBitArrays(prs.Height, size)
  381. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  382. commit := validation.Commits[index]
  383. log.Debug("Picked commit to send", "index", index, "commit", commit)
  384. // Reconstruct vote.
  385. vote := &Vote{
  386. Height: prs.Height,
  387. Round: commit.Round,
  388. Type: VoteTypeCommit,
  389. BlockHash: header.LastBlockHash,
  390. BlockParts: header.LastBlockParts,
  391. Signature: commit.Signature,
  392. }
  393. msg := &VoteMessage{index, vote}
  394. peer.Send(VoteCh, msg)
  395. ps.SetHasVote(vote, index)
  396. continue OUTER_LOOP
  397. } else {
  398. log.Debug("No commits to send", "ours", validation.BitArray(), "theirs", prs.Commits)
  399. ps.SetHasAllCatchupCommits(prs.Height)
  400. }
  401. }
  402. // We sent nothing. Sleep...
  403. time.Sleep(peerGossipSleepDuration)
  404. continue OUTER_LOOP
  405. }
  406. }
  407. //-----------------------------------------------------------------------------
  408. // Read only when returned by PeerState.GetRoundState().
  409. type PeerRoundState struct {
  410. Height uint // Height peer is at
  411. Round uint // Round peer is at
  412. Step RoundStep // Step peer is at
  413. StartTime time.Time // Estimated start of round 0 at this height
  414. Proposal bool // True if peer has proposal for this round
  415. ProposalBlockParts PartSetHeader //
  416. ProposalBlockBitArray BitArray // True bit -> has part
  417. ProposalPOLParts PartSetHeader //
  418. ProposalPOLBitArray BitArray // True bit -> has part
  419. Prevotes BitArray // All votes peer has for this round
  420. Precommits BitArray // All precommits peer has for this round
  421. Commits BitArray // All commits peer has for this height
  422. LastCommits BitArray // All commits peer has for last height
  423. HasAllCatchupCommits bool // Used for catch-up
  424. }
  425. //-----------------------------------------------------------------------------
  // Sentinel errors for invalid peer state updates.
  var (
  	// ErrPeerStateHeightRegression is the error for a peer state height regression.
  	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  	// ErrPeerStateInvalidStartTime is the error for an invalid peer state start time.
  	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  )
  430. type PeerState struct {
  431. mtx sync.Mutex
  432. PeerRoundState
  433. }
  434. func NewPeerState(peer *p2p.Peer) *PeerState {
  435. return &PeerState{}
  436. }
  437. // Returns an atomic snapshot of the PeerRoundState.
  438. // There's no point in mutating it since it won't change PeerState.
  439. func (ps *PeerState) GetRoundState() *PeerRoundState {
  440. ps.mtx.Lock()
  441. defer ps.mtx.Unlock()
  442. prs := ps.PeerRoundState // copy
  443. return &prs
  444. }
  445. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  446. ps.mtx.Lock()
  447. defer ps.mtx.Unlock()
  448. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  449. return
  450. }
  451. if ps.Proposal {
  452. return
  453. }
  454. ps.Proposal = true
  455. ps.ProposalBlockParts = proposal.BlockParts
  456. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  457. ps.ProposalPOLParts = proposal.POLParts
  458. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  459. }
  460. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  461. ps.mtx.Lock()
  462. defer ps.mtx.Unlock()
  463. if ps.Height != height || ps.Round != round {
  464. return
  465. }
  466. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  467. }
  468. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  469. ps.mtx.Lock()
  470. defer ps.mtx.Unlock()
  471. if ps.Height != height || ps.Round != round {
  472. return
  473. }
  474. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  475. }
  476. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  477. ps.mtx.Lock()
  478. defer ps.mtx.Unlock()
  479. if ps.Height != height {
  480. return
  481. }
  482. if ps.Prevotes.IsZero() {
  483. ps.Prevotes = NewBitArray(numValidators)
  484. }
  485. if ps.Precommits.IsZero() {
  486. ps.Precommits = NewBitArray(numValidators)
  487. }
  488. if ps.Commits.IsZero() {
  489. ps.Commits = NewBitArray(numValidators)
  490. }
  491. }
  492. func (ps *PeerState) SetHasVote(vote *Vote, index uint) {
  493. ps.mtx.Lock()
  494. defer ps.mtx.Unlock()
  495. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  496. }
  497. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  498. if ps.Height == height+1 && type_ == VoteTypeCommit {
  499. // Special case for LastCommits.
  500. ps.LastCommits.SetIndex(index, true)
  501. return
  502. } else if ps.Height != height {
  503. // Does not apply.
  504. return
  505. }
  506. switch type_ {
  507. case VoteTypePrevote:
  508. ps.Prevotes.SetIndex(index, true)
  509. case VoteTypePrecommit:
  510. ps.Precommits.SetIndex(index, true)
  511. case VoteTypeCommit:
  512. if round < ps.Round {
  513. ps.Prevotes.SetIndex(index, true)
  514. ps.Precommits.SetIndex(index, true)
  515. }
  516. ps.Commits.SetIndex(index, true)
  517. default:
  518. panic("Invalid vote type")
  519. }
  520. }
  521. // When catching up, this helps keep track of whether
  522. // we should send more commit votes from the block (validation) store
  523. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  524. ps.mtx.Lock()
  525. defer ps.mtx.Unlock()
  526. if ps.Height == height {
  527. ps.HasAllCatchupCommits = true
  528. }
  529. }
  530. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  531. ps.mtx.Lock()
  532. defer ps.mtx.Unlock()
  533. // Just remember these values.
  534. psHeight := ps.Height
  535. psRound := ps.Round
  536. //psStep := ps.Step
  537. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  538. ps.Height = msg.Height
  539. ps.Round = msg.Round
  540. ps.Step = msg.Step
  541. ps.StartTime = startTime
  542. if psHeight != msg.Height || psRound != msg.Round {
  543. ps.Proposal = false
  544. ps.ProposalBlockParts = PartSetHeader{}
  545. ps.ProposalBlockBitArray = BitArray{}
  546. ps.ProposalPOLParts = PartSetHeader{}
  547. ps.ProposalPOLBitArray = BitArray{}
  548. // We'll update the BitArray capacity later.
  549. ps.Prevotes = BitArray{}
  550. ps.Precommits = BitArray{}
  551. }
  552. if psHeight != msg.Height {
  553. // Shift Commits to LastCommits
  554. if psHeight+1 == msg.Height {
  555. ps.LastCommits = ps.Commits
  556. } else {
  557. ps.LastCommits = BitArray{}
  558. }
  559. // We'll update the BitArray capacity later.
  560. ps.Commits = BitArray{}
  561. ps.HasAllCatchupCommits = false
  562. }
  563. }
  564. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  565. ps.mtx.Lock()
  566. defer ps.mtx.Unlock()
  567. if ps.Height != msg.Height {
  568. return
  569. }
  570. ps.ProposalBlockParts = msg.BlockParts
  571. ps.ProposalBlockBitArray = msg.BlockBitArray
  572. }
  573. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  574. ps.mtx.Lock()
  575. defer ps.mtx.Unlock()
  576. // Special case for LastCommits
  577. if ps.Height == msg.Height+1 && msg.Type == VoteTypeCommit {
  578. ps.LastCommits.SetIndex(msg.Index, true)
  579. return
  580. } else if ps.Height != msg.Height {
  581. return
  582. }
  583. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  584. }
  585. //-----------------------------------------------------------------------------
  586. // Messages
  // Type bytes identifying each consensus message.
  const (
  	msgTypeUnknown byte = 0x00

  	// Messages for communicating state changes.
  	msgTypeNewRoundStep byte = 0x01
  	msgTypeCommitStep   byte = 0x02

  	// Messages of data.
  	msgTypeProposal byte = 0x11
  	msgTypePart     byte = 0x12 // both block & POL
  	msgTypeVote     byte = 0x13
  	msgTypeHasVote  byte = 0x14
  )
  596. // TODO: check for unnecessary extra bytes at the end.
  597. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  598. n := new(int64)
  599. // log.Debug(Fmt("decoding msg bytes: %X", bz))
  600. msgType = bz[0]
  601. r := bytes.NewReader(bz)
  602. switch msgType {
  603. // Messages for communicating state changes
  604. case msgTypeNewRoundStep:
  605. msg = ReadBinary(&NewRoundStepMessage{}, r, n, &err)
  606. case msgTypeCommitStep:
  607. msg = ReadBinary(&CommitStepMessage{}, r, n, &err)
  608. // Messages of data
  609. case msgTypeProposal:
  610. r.ReadByte() // Consume the byte
  611. msg = ReadBinary(&Proposal{}, r, n, &err)
  612. case msgTypePart:
  613. msg = ReadBinary(&PartMessage{}, r, n, &err)
  614. case msgTypeVote:
  615. msg = ReadBinary(&VoteMessage{}, r, n, &err)
  616. case msgTypeHasVote:
  617. msg = ReadBinary(&HasVoteMessage{}, r, n, &err)
  618. default:
  619. msg = nil
  620. }
  621. return
  622. }
  623. //-------------------------------------
  624. type NewRoundStepMessage struct {
  625. Height uint
  626. Round uint
  627. Step RoundStep
  628. SecondsSinceStartTime uint
  629. }
  630. func (m *NewRoundStepMessage) TypeByte() byte { return msgTypeNewRoundStep }
  631. func (m *NewRoundStepMessage) String() string {
  632. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  633. }
  634. //-------------------------------------
  635. type CommitStepMessage struct {
  636. Height uint
  637. BlockParts PartSetHeader
  638. BlockBitArray BitArray
  639. }
  640. func (m *CommitStepMessage) TypeByte() byte { return msgTypeCommitStep }
  641. func (m *CommitStepMessage) String() string {
  642. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  643. }
  644. //-------------------------------------
  // Values of PartMessage.Type, naming the part set a part belongs to.
  const (
  	partTypeProposalBlock byte = 0x01 // part of the proposal block
  	partTypeProposalPOL   byte = 0x02 // part of the proposal POL
  )
  649. type PartMessage struct {
  650. Height uint
  651. Round uint
  652. Type byte
  653. Part *Part
  654. }
  655. func (m *PartMessage) TypeByte() byte { return msgTypePart }
  656. func (m *PartMessage) String() string {
  657. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  658. }
  659. //-------------------------------------
  660. type VoteMessage struct {
  661. ValidatorIndex uint
  662. Vote *Vote
  663. }
  664. func (m *VoteMessage) TypeByte() byte { return msgTypeVote }
  665. func (m *VoteMessage) String() string {
  666. return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
  667. }
  668. //-------------------------------------
  669. type HasVoteMessage struct {
  670. Height uint
  671. Round uint
  672. Type byte
  673. Index uint
  674. }
  675. func (m *HasVoteMessage) TypeByte() byte { return msgTypeHasVote }
  676. func (m *HasVoteMessage) String() string {
  677. return fmt.Sprintf("[HasVote %v/%v T:%X]", m.Height, m.Round, m.Type)
  678. }