You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

859 lines
24 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. . "github.com/tendermint/tendermint/binary"
  12. . "github.com/tendermint/tendermint/blocks"
  13. . "github.com/tendermint/tendermint/common"
  14. . "github.com/tendermint/tendermint/mempool"
  15. "github.com/tendermint/tendermint/p2p"
  16. . "github.com/tendermint/tendermint/state"
  17. )
  18. const (
  19. StateCh = byte(0x20)
  20. DataCh = byte(0x21)
  21. VoteCh = byte(0x22)
  22. peerStateKey = "ConsensusReactor.peerState"
  23. voteTypeNil = byte(0x00)
  24. voteTypeBlock = byte(0x01)
  25. roundDuration0 = 60 * time.Second // The first round is 60 seconds long.
  26. roundDurationDelta = 15 * time.Second // Each successive round lasts 15 seconds longer.
  27. roundDeadlineBare = float64(1.0 / 3.0) // When the bare vote is due.
  28. roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due.
  29. newBlockWaitDuration = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds.
  30. peerGossipSleepDuration = 50 * time.Millisecond // Time to sleep if there's nothing to send.
  31. hasVotesThreshold = 50 // After this many new votes we'll send a HasVotesMessage.
  32. )
  33. //-----------------------------------------------------------------------------
  34. // total duration of given round
  35. func calcRoundDuration(round uint16) time.Duration {
  36. return roundDuration0 + roundDurationDelta*time.Duration(round)
  37. }
  38. // startTime is when round zero started.
  39. func calcRoundStartTime(round uint16, startTime time.Time) time.Time {
  40. return startTime.Add(roundDuration0*time.Duration(round) +
  41. roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
  42. }
  43. // calculates the current round given startTime of round zero.
  44. // NOTE: round is zero if startTime is in the future.
  45. func calcRound(startTime time.Time) uint16 {
  46. now := time.Now()
  47. if now.Before(startTime) {
  48. return 0
  49. }
  50. // Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R.
  51. // D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0.
  52. // AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now).
  53. // R = Floor((-B + Sqrt(B^2 - 4AC))/2A)
  54. A := float64(roundDurationDelta)
  55. B := 2.0*float64(roundDuration0) - float64(roundDurationDelta)
  56. C := 2.0 * float64(startTime.Sub(now))
  57. R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)/(2*A)))
  58. if math.IsNaN(R) {
  59. panic("Could not calc round, should not happen")
  60. }
  61. if R > math.MaxInt16 {
  62. Panicf("Could not calc round, round overflow: %v", R)
  63. }
  64. if R < 0 {
  65. return 0
  66. }
  67. return uint16(R)
  68. }
  69. // convenience
  70. // NOTE: elapsedRatio can be negative if startTime is in the future.
  71. func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration,
  72. roundElapsed time.Duration, elapsedRatio float64) {
  73. round = calcRound(startTime)
  74. roundStartTime = calcRoundStartTime(round, startTime)
  75. roundDuration = calcRoundDuration(round)
  76. roundElapsed = time.Now().Sub(roundStartTime)
  77. elapsedRatio = float64(roundElapsed) / float64(roundDuration)
  78. return
  79. }
  80. //-----------------------------------------------------------------------------
  81. type ConsensusReactor struct {
  82. sw *p2p.Switch
  83. quit chan struct{}
  84. started uint32
  85. stopped uint32
  86. conS *ConsensusState
  87. doActionCh chan RoundAction
  88. }
  89. func NewConsensusReactor(sw *p2p.Switch, blockStore *BlockStore, mempool *Mempool, state *State) *ConsensusReactor {
  90. conS := NewConsensusState(state, blockStore, mempool)
  91. conR := &ConsensusReactor{
  92. sw: sw,
  93. quit: make(chan struct{}),
  94. conS: conS,
  95. doActionCh: make(chan RoundAction, 1),
  96. }
  97. return conR
  98. }
  99. // Sets our private validator account for signing votes.
  100. func (conR *ConsensusReactor) SetPrivValidator(priv *PrivValidator) {
  101. conR.conS.SetPrivValidator(priv)
  102. }
  103. func (conR *ConsensusReactor) Start() {
  104. if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
  105. log.Info("Starting ConsensusReactor")
  106. go conR.proposeAndVoteRoutine()
  107. }
  108. }
  109. func (conR *ConsensusReactor) Stop() {
  110. if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
  111. log.Info("Stopping ConsensusReactor")
  112. close(conR.quit)
  113. }
  114. }
  115. func (conR *ConsensusReactor) IsStopped() bool {
  116. return atomic.LoadUint32(&conR.stopped) == 1
  117. }
  118. // Implements Reactor
  119. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  120. // TODO optimize
  121. return []*p2p.ChannelDescriptor{
  122. &p2p.ChannelDescriptor{
  123. Id: StateCh,
  124. Priority: 5,
  125. },
  126. &p2p.ChannelDescriptor{
  127. Id: DataCh,
  128. Priority: 5,
  129. },
  130. &p2p.ChannelDescriptor{
  131. Id: VoteCh,
  132. Priority: 5,
  133. },
  134. }
  135. }
  136. // Implements Reactor
  137. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  138. // Create peerState for peer
  139. peerState := NewPeerState(peer)
  140. peer.Data.Set(peerStateKey, peerState)
  141. // Begin gossip routines for this peer.
  142. go conR.gossipDataRoutine(peer, peerState)
  143. go conR.gossipVotesRoutine(peer, peerState)
  144. }
  145. // Implements Reactor
  146. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  147. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  148. }
  149. // Implements Reactor
  150. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  151. // Get round state
  152. rs := conR.conS.GetRoundState()
  153. ps := peer.Data.Get(peerStateKey).(*PeerState)
  154. _, msg_ := decodeMessage(msgBytes)
  155. voteAddCounter := 0
  156. var err error = nil
  157. switch chId {
  158. case StateCh:
  159. switch msg_.(type) {
  160. case *NewRoundStepMessage:
  161. msg := msg_.(*NewRoundStepMessage)
  162. err = ps.ApplyNewRoundStepMessage(msg)
  163. case *HasVotesMessage:
  164. msg := msg_.(*HasVotesMessage)
  165. err = ps.ApplyHasVotesMessage(msg)
  166. default:
  167. // Ignore unknown message
  168. }
  169. case DataCh:
  170. switch msg_.(type) {
  171. case *Proposal:
  172. proposal := msg_.(*Proposal)
  173. ps.SetHasProposal(proposal.Height, proposal.Round)
  174. err = conR.conS.SetProposal(proposal)
  175. case *PartMessage:
  176. msg := msg_.(*PartMessage)
  177. if msg.Type == partTypeProposalBlock {
  178. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  179. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  180. } else if msg.Type == partTypeProposalPOL {
  181. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  182. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  183. } else {
  184. // Ignore unknown part type
  185. }
  186. default:
  187. // Ignore unknown message
  188. }
  189. case VoteCh:
  190. switch msg_.(type) {
  191. case *Vote:
  192. vote := msg_.(*Vote)
  193. // We can't deal with votes from another height,
  194. // as they have a different validator set.
  195. if vote.Height != rs.Height || vote.Height != ps.Height {
  196. return
  197. }
  198. index, val := rs.Validators.GetById(vote.SignerId)
  199. if val == nil {
  200. log.Warning("Peer gave us an invalid vote.")
  201. return
  202. }
  203. ps.SetHasVote(rs.Height, rs.Round, vote.Type, uint32(index))
  204. added, err := conR.conS.AddVote(vote)
  205. if err != nil {
  206. log.Warning("Error attempting to add vote: %v", err)
  207. }
  208. if added {
  209. // Maybe send HasVotesMessage
  210. voteAddCounter++
  211. if voteAddCounter%hasVotesThreshold == 0 {
  212. // TODO optimize.
  213. msg := &HasVotesMessage{
  214. Height: rs.Height,
  215. Round: rs.Round,
  216. Votes: rs.Votes.BitArray(),
  217. Precommits: rs.Precommits.BitArray(),
  218. Commits: rs.Commits.BitArray(),
  219. }
  220. conR.sw.Broadcast(StateCh, msg)
  221. }
  222. }
  223. default:
  224. // Ignore unknown message
  225. }
  226. default:
  227. // Ignore unknown channel
  228. }
  229. if err != nil {
  230. log.Warning("Error in Receive(): %v", err)
  231. }
  232. }
  233. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  234. OUTER_LOOP:
  235. for {
  236. // Manage disconnects from self or peer.
  237. if peer.IsStopped() || conR.IsStopped() {
  238. log.Info("Stopping gossipDataRoutine for %v.", peer)
  239. return
  240. }
  241. rs := conR.conS.GetRoundState()
  242. prs := ps.GetRoundState()
  243. // If height and round doesn't match, sleep.
  244. if rs.Height != prs.Height || rs.Round != prs.Round {
  245. time.Sleep(peerGossipSleepDuration)
  246. continue OUTER_LOOP
  247. }
  248. // Send proposal?
  249. if rs.Proposal != nil && !prs.Proposal {
  250. msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
  251. peer.Send(DataCh, msg)
  252. ps.SetHasProposal(rs.Height, rs.Round)
  253. continue OUTER_LOOP
  254. }
  255. // Send proposal block part?
  256. if index, ok := rs.ProposalBlockPartSet.BitArray().Sub(
  257. prs.ProposalBlockBitArray).PickRandom(); ok {
  258. msg := &PartMessage{
  259. Height: rs.Height,
  260. Round: rs.Round,
  261. Type: partTypeProposalBlock,
  262. Part: rs.ProposalBlockPartSet.GetPart(uint16(index)),
  263. }
  264. peer.Send(DataCh, msg)
  265. ps.SetHasProposalBlockPart(rs.Height, rs.Round, uint16(index))
  266. continue OUTER_LOOP
  267. }
  268. // Send proposal POL part?
  269. if index, ok := rs.ProposalPOLPartSet.BitArray().Sub(
  270. prs.ProposalPOLBitArray).PickRandom(); ok {
  271. msg := &PartMessage{
  272. Height: rs.Height,
  273. Round: rs.Round,
  274. Type: partTypeProposalPOL,
  275. Part: rs.ProposalPOLPartSet.GetPart(uint16(index)),
  276. }
  277. peer.Send(DataCh, msg)
  278. ps.SetHasProposalPOLPart(rs.Height, rs.Round, uint16(index))
  279. continue OUTER_LOOP
  280. }
  281. // Nothing to do. Sleep.
  282. time.Sleep(peerGossipSleepDuration)
  283. continue OUTER_LOOP
  284. }
  285. }
  286. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  287. OUTER_LOOP:
  288. for {
  289. // Manage disconnects from self or peer.
  290. if peer.IsStopped() || conR.IsStopped() {
  291. log.Info("Stopping gossipVotesRoutine for %v.", peer)
  292. return
  293. }
  294. rs := conR.conS.GetRoundState()
  295. prs := ps.GetRoundState()
  296. // If height doens't match, sleep.
  297. if rs.Height != prs.Height {
  298. time.Sleep(peerGossipSleepDuration)
  299. continue OUTER_LOOP
  300. }
  301. // If there are bare votes to send...
  302. if prs.Step <= RoundStepVote {
  303. index, ok := rs.Votes.BitArray().Sub(prs.Votes).PickRandom()
  304. if ok {
  305. valId, val := rs.Validators.GetByIndex(uint32(index))
  306. if val != nil {
  307. vote := rs.Votes.GetVote(valId)
  308. msg := p2p.TypedMessage{msgTypeVote, vote}
  309. peer.Send(VoteCh, msg)
  310. ps.SetHasVote(rs.Height, rs.Round, VoteTypeBare, uint32(index))
  311. continue OUTER_LOOP
  312. } else {
  313. log.Error("index is not a valid validator index")
  314. }
  315. }
  316. }
  317. // If there are precommits to send...
  318. if prs.Step <= RoundStepPrecommit {
  319. index, ok := rs.Precommits.BitArray().Sub(prs.Precommits).PickRandom()
  320. if ok {
  321. valId, val := rs.Validators.GetByIndex(uint32(index))
  322. if val != nil {
  323. vote := rs.Precommits.GetVote(valId)
  324. msg := p2p.TypedMessage{msgTypeVote, vote}
  325. peer.Send(VoteCh, msg)
  326. ps.SetHasVote(rs.Height, rs.Round, VoteTypePrecommit, uint32(index))
  327. continue OUTER_LOOP
  328. } else {
  329. log.Error("index is not a valid validator index")
  330. }
  331. }
  332. }
  333. // If there are any commits to send...
  334. index, ok := rs.Commits.BitArray().Sub(prs.Commits).PickRandom()
  335. if ok {
  336. valId, val := rs.Validators.GetByIndex(uint32(index))
  337. if val != nil {
  338. vote := rs.Commits.GetVote(valId)
  339. msg := p2p.TypedMessage{msgTypeVote, vote}
  340. peer.Send(VoteCh, msg)
  341. ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, uint32(index))
  342. continue OUTER_LOOP
  343. } else {
  344. log.Error("index is not a valid validator index")
  345. }
  346. }
  347. // We sent nothing. Sleep...
  348. time.Sleep(peerGossipSleepDuration)
  349. continue OUTER_LOOP
  350. }
  351. }
  352. // Signs a vote document and broadcasts it.
  353. func (conR *ConsensusReactor) signAndBroadcastVote(rs *RoundState, vote *Vote) {
  354. if rs.PrivValidator != nil {
  355. rs.PrivValidator.Sign(vote)
  356. conR.conS.AddVote(vote)
  357. msg := p2p.TypedMessage{msgTypeVote, vote}
  358. conR.sw.Broadcast(VoteCh, msg)
  359. }
  360. }
  361. //-------------------------------------
  362. func (conR *ConsensusReactor) runStepPropose(rs *RoundState) {
  363. conR.conS.MakeProposal()
  364. }
  365. func (conR *ConsensusReactor) runStepVote(rs *RoundState) {
  366. // If we have a locked block, we must vote for that.
  367. // NOTE: a locked block is already valid.
  368. if rs.LockedBlock != nil {
  369. conR.signAndBroadcastVote(rs, &Vote{
  370. Height: rs.Height,
  371. Round: rs.Round,
  372. Type: VoteTypeBare,
  373. BlockHash: rs.LockedBlock.Hash(),
  374. })
  375. }
  376. // Try staging proposed block.
  377. // If Block is nil, an error is returned.
  378. err := conR.conS.stageBlock(rs.ProposalBlock)
  379. if err != nil {
  380. // Vote nil
  381. conR.signAndBroadcastVote(rs, &Vote{
  382. Height: rs.Height,
  383. Round: rs.Round,
  384. Type: VoteTypeBare,
  385. BlockHash: nil,
  386. })
  387. } else {
  388. // Vote for block
  389. conR.signAndBroadcastVote(rs, &Vote{
  390. Height: rs.Height,
  391. Round: rs.Round,
  392. Type: VoteTypeBare,
  393. BlockHash: rs.ProposalBlock.Hash(),
  394. })
  395. }
  396. }
  397. func (conR *ConsensusReactor) runStepPrecommit(rs *RoundState) {
  398. // If we see a 2/3 majority of votes for a block, lock.
  399. hash := conR.conS.LockOrUnlock(rs.Height, rs.Round)
  400. if len(hash) > 0 {
  401. // Precommit
  402. conR.signAndBroadcastVote(rs, &Vote{
  403. Height: rs.Height,
  404. Round: rs.Round,
  405. Type: VoteTypePrecommit,
  406. BlockHash: hash,
  407. })
  408. }
  409. }
  410. func (conR *ConsensusReactor) runStepCommit(rs *RoundState) bool {
  411. // If we see a 2/3 majority of precommits for a block, commit.
  412. block := conR.conS.Commit(rs.Height, rs.Round)
  413. if block == nil {
  414. return false
  415. } else {
  416. conR.signAndBroadcastVote(rs, &Vote{
  417. Height: rs.Height,
  418. Round: rs.Round,
  419. Type: VoteTypePrecommit,
  420. BlockHash: block.Hash(),
  421. })
  422. return true
  423. }
  424. }
  425. //-------------------------------------
  426. type RoundAction struct {
  427. Height uint32 // The block height for which consensus is reaching for.
  428. Round uint16 // The round number at given height.
  429. XnToStep uint8 // Transition to this step. Action depends on this value.
  430. RoundElapsed time.Duration // Duration since round start.
  431. }
  432. // Source of all round state transitions and votes.
  433. // It can be preemptively woken up via a message to
  434. // doActionCh.
  435. func (conR *ConsensusReactor) proposeAndVoteRoutine() {
  436. // Figure out when to wake up next (in the absence of other events)
  437. setAlarm := func() {
  438. if len(conR.doActionCh) > 0 {
  439. return // Already going to wake up later.
  440. }
  441. // Figure out which height/round/step we're at,
  442. // then schedule an action for when it is due.
  443. rs := conR.conS.GetRoundState()
  444. _, _, roundDuration, roundElapsed, elapsedRatio := calcRoundInfo(rs.StartTime)
  445. switch rs.Step {
  446. case RoundStepStart:
  447. // It's a new RoundState.
  448. if elapsedRatio < 0 {
  449. // startTime is in the future.
  450. time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration)
  451. }
  452. conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPropose, roundElapsed}
  453. case RoundStepPropose:
  454. // Wake up when it's time to vote.
  455. time.Sleep(time.Duration(roundDeadlineBare-elapsedRatio) * roundDuration)
  456. conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepVote, roundElapsed}
  457. case RoundStepVote:
  458. // Wake up when it's time to precommit.
  459. time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration)
  460. conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommit, roundElapsed}
  461. case RoundStepPrecommit:
  462. // Wake up when the round is over.
  463. time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration)
  464. conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommit, roundElapsed}
  465. case RoundStepCommit:
  466. // This shouldn't happen.
  467. // Before setAlarm() got called,
  468. // logic should have created a new RoundState for the next round.
  469. panic("Should not happen")
  470. }
  471. }
  472. for {
  473. func() {
  474. roundAction := <-conR.doActionCh
  475. // Always set the alarm after any processing below.
  476. defer setAlarm()
  477. height := roundAction.Height
  478. round := roundAction.Round
  479. step := roundAction.XnToStep
  480. roundElapsed := roundAction.RoundElapsed
  481. rs := conR.conS.GetRoundState()
  482. if height != rs.Height || round != rs.Round {
  483. return // Action is not relevant
  484. }
  485. // Run step
  486. if step == RoundStepPropose && rs.Step == RoundStepStart {
  487. conR.runStepPropose(rs)
  488. } else if step == RoundStepVote && rs.Step <= RoundStepPropose {
  489. conR.runStepVote(rs)
  490. } else if step == RoundStepPrecommit && rs.Step <= RoundStepVote {
  491. conR.runStepPrecommit(rs)
  492. } else if step == RoundStepCommit && rs.Step <= RoundStepPrecommit {
  493. didCommit := conR.runStepCommit(rs)
  494. if didCommit {
  495. // We already set up ConsensusState for the next height
  496. // (it happens in the call to conR.runStepCommit).
  497. } else {
  498. // Prepare a new RoundState for the next state.
  499. conR.conS.SetupRound(rs.Round + 1)
  500. return // setAlarm() takes care of the rest.
  501. }
  502. } else {
  503. return // Action is not relevant.
  504. }
  505. // Transition to new step.
  506. conR.conS.SetStep(step)
  507. // Broadcast NewRoundStepMessage.
  508. msg := &NewRoundStepMessage{
  509. Height: height,
  510. Round: round,
  511. Step: step,
  512. SecondsSinceStartTime: uint32(roundElapsed.Seconds()),
  513. }
  514. conR.sw.Broadcast(StateCh, msg)
  515. }()
  516. }
  517. }
  518. //-----------------------------------------------------------------------------
  519. // Read only when returned by PeerState.GetRoundState().
  520. type PeerRoundState struct {
  521. Height uint32 // Height peer is at
  522. Round uint16 // Round peer is at
  523. Step uint8 // Step peer is at
  524. StartTime time.Time // Estimated start of round 0 at this height
  525. Proposal bool // True if peer has proposal for this round
  526. ProposalBlockHash []byte // Block parts merkle root
  527. ProposalBlockBitArray BitArray // Block parts bitarray
  528. ProposalPOLHash []byte // POL parts merkle root
  529. ProposalPOLBitArray BitArray // POL parts bitarray
  530. Votes BitArray // All votes peer has for this round
  531. Precommits BitArray // All precommits peer has for this round
  532. Commits BitArray // All commits peer has for this height
  533. }
  534. //-----------------------------------------------------------------------------
  535. var (
  536. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  537. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  538. )
  539. type PeerState struct {
  540. mtx sync.Mutex
  541. PeerRoundState
  542. }
  543. func NewPeerState(peer *p2p.Peer) *PeerState {
  544. return &PeerState{}
  545. }
  546. // Returns an atomic snapshot of the PeerRoundState.
  547. // There's no point in mutating it since it won't change PeerState.
  548. func (ps *PeerState) GetRoundState() *PeerRoundState {
  549. ps.mtx.Lock()
  550. defer ps.mtx.Unlock()
  551. prs := ps.PeerRoundState // copy
  552. return &prs
  553. }
  554. func (ps *PeerState) SetHasProposal(height uint32, round uint16) {
  555. ps.mtx.Lock()
  556. defer ps.mtx.Unlock()
  557. if ps.Height == height && ps.Round == round {
  558. ps.Proposal = true
  559. }
  560. }
  561. func (ps *PeerState) SetHasProposalBlockPart(height uint32, round uint16, index uint16) {
  562. ps.mtx.Lock()
  563. defer ps.mtx.Unlock()
  564. if ps.Height == height && ps.Round == round {
  565. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  566. }
  567. }
  568. func (ps *PeerState) SetHasProposalPOLPart(height uint32, round uint16, index uint16) {
  569. ps.mtx.Lock()
  570. defer ps.mtx.Unlock()
  571. if ps.Height == height && ps.Round == round {
  572. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  573. }
  574. }
  575. func (ps *PeerState) SetHasVote(height uint32, round uint16, type_ uint8, index uint32) {
  576. ps.mtx.Lock()
  577. defer ps.mtx.Unlock()
  578. if ps.Height == height && (ps.Round == round || type_ == VoteTypeCommit) {
  579. switch type_ {
  580. case VoteTypeBare:
  581. ps.Votes.SetIndex(uint(index), true)
  582. case VoteTypePrecommit:
  583. ps.Precommits.SetIndex(uint(index), true)
  584. case VoteTypeCommit:
  585. ps.Commits.SetIndex(uint(index), true)
  586. default:
  587. panic("Invalid vote type")
  588. }
  589. }
  590. }
  591. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) error {
  592. ps.mtx.Lock()
  593. defer ps.mtx.Unlock()
  594. // Set step state
  595. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  596. ps.Height = msg.Height
  597. ps.Round = msg.Round
  598. ps.Step = msg.Step
  599. ps.StartTime = startTime
  600. // Reset the rest
  601. ps.Proposal = false
  602. ps.ProposalBlockHash = nil
  603. ps.ProposalBlockBitArray = nil
  604. ps.ProposalPOLHash = nil
  605. ps.ProposalPOLBitArray = nil
  606. ps.Votes = nil
  607. ps.Precommits = nil
  608. if ps.Height != msg.Height {
  609. ps.Commits = nil
  610. }
  611. return nil
  612. }
  613. func (ps *PeerState) ApplyHasVotesMessage(msg *HasVotesMessage) error {
  614. ps.mtx.Lock()
  615. defer ps.mtx.Unlock()
  616. if ps.Height == msg.Height {
  617. ps.Commits = ps.Commits.Or(msg.Commits)
  618. if ps.Round == msg.Round {
  619. ps.Votes = ps.Votes.Or(msg.Votes)
  620. ps.Precommits = ps.Precommits.Or(msg.Precommits)
  621. } else {
  622. ps.Votes = msg.Votes
  623. ps.Precommits = msg.Precommits
  624. }
  625. }
  626. return nil
  627. }
  628. //-----------------------------------------------------------------------------
  629. // Messages
  630. const (
  631. msgTypeUnknown = byte(0x00)
  632. // Messages for communicating state changes
  633. msgTypeNewRoundStep = byte(0x01)
  634. msgTypeHasVotes = byte(0x02)
  635. // Messages of data
  636. msgTypeProposal = byte(0x11)
  637. msgTypePart = byte(0x12) // both block & POL
  638. msgTypeVote = byte(0x13)
  639. )
  640. // TODO: check for unnecessary extra bytes at the end.
  641. func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
  642. n, err := new(int64), new(error)
  643. // log.Debug("decoding msg bytes: %X", bz)
  644. msgType = bz[0]
  645. r := bytes.NewReader(bz[1:])
  646. switch msgType {
  647. // Messages for communicating state changes
  648. case msgTypeNewRoundStep:
  649. msg = readNewRoundStepMessage(r, n, err)
  650. case msgTypeHasVotes:
  651. msg = readHasVotesMessage(r, n, err)
  652. // Messages of data
  653. case msgTypeProposal:
  654. msg = ReadProposal(r, n, err)
  655. case msgTypePart:
  656. msg = readPartMessage(r, n, err)
  657. case msgTypeVote:
  658. msg = ReadVote(r, n, err)
  659. default:
  660. msg = nil
  661. }
  662. return
  663. }
  664. //-------------------------------------
  665. type NewRoundStepMessage struct {
  666. Height uint32
  667. Round uint16
  668. Step uint8
  669. SecondsSinceStartTime uint32
  670. }
  671. func readNewRoundStepMessage(r io.Reader, n *int64, err *error) *NewRoundStepMessage {
  672. return &NewRoundStepMessage{
  673. Height: ReadUInt32(r, n, err),
  674. Round: ReadUInt16(r, n, err),
  675. Step: ReadUInt8(r, n, err),
  676. SecondsSinceStartTime: ReadUInt32(r, n, err),
  677. }
  678. }
  679. func (m *NewRoundStepMessage) WriteTo(w io.Writer) (n int64, err error) {
  680. WriteByte(w, msgTypeNewRoundStep, &n, &err)
  681. WriteUInt32(w, m.Height, &n, &err)
  682. WriteUInt16(w, m.Round, &n, &err)
  683. WriteUInt8(w, m.Step, &n, &err)
  684. WriteUInt32(w, m.SecondsSinceStartTime, &n, &err)
  685. return
  686. }
  687. func (m *NewRoundStepMessage) String() string {
  688. return fmt.Sprintf("[NewRoundStepMessage H:%v R:%v]", m.Height, m.Round)
  689. }
  690. //-------------------------------------
  691. type HasVotesMessage struct {
  692. Height uint32
  693. Round uint16
  694. Votes BitArray
  695. Precommits BitArray
  696. Commits BitArray
  697. }
  698. func readHasVotesMessage(r io.Reader, n *int64, err *error) *HasVotesMessage {
  699. return &HasVotesMessage{
  700. Height: ReadUInt32(r, n, err),
  701. Round: ReadUInt16(r, n, err),
  702. Votes: ReadBitArray(r, n, err),
  703. Precommits: ReadBitArray(r, n, err),
  704. Commits: ReadBitArray(r, n, err),
  705. }
  706. }
  707. func (m *HasVotesMessage) WriteTo(w io.Writer) (n int64, err error) {
  708. WriteByte(w, msgTypeHasVotes, &n, &err)
  709. WriteUInt32(w, m.Height, &n, &err)
  710. WriteUInt16(w, m.Round, &n, &err)
  711. WriteBinary(w, m.Votes, &n, &err)
  712. WriteBinary(w, m.Precommits, &n, &err)
  713. WriteBinary(w, m.Commits, &n, &err)
  714. return
  715. }
  716. func (m *HasVotesMessage) String() string {
  717. return fmt.Sprintf("[HasVotesMessage H:%v R:%v]", m.Height, m.Round)
  718. }
  719. //-------------------------------------
  720. const (
  721. partTypeProposalBlock = byte(0x01)
  722. partTypeProposalPOL = byte(0x02)
  723. )
  724. type PartMessage struct {
  725. Height uint32
  726. Round uint16
  727. Type byte
  728. Part *Part
  729. }
  730. func readPartMessage(r io.Reader, n *int64, err *error) *PartMessage {
  731. return &PartMessage{
  732. Height: ReadUInt32(r, n, err),
  733. Round: ReadUInt16(r, n, err),
  734. Type: ReadByte(r, n, err),
  735. Part: ReadPart(r, n, err),
  736. }
  737. }
  738. func (m *PartMessage) WriteTo(w io.Writer) (n int64, err error) {
  739. WriteByte(w, msgTypePart, &n, &err)
  740. WriteUInt32(w, m.Height, &n, &err)
  741. WriteUInt16(w, m.Round, &n, &err)
  742. WriteByte(w, m.Type, &n, &err)
  743. WriteBinary(w, m.Part, &n, &err)
  744. return
  745. }
  746. func (m *PartMessage) String() string {
  747. return fmt.Sprintf("[PartMessage H:%v R:%v T:%X]", m.Height, m.Round, m.Type)
  748. }