You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1094 lines
30 KiB

  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "math/rand"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. . "github.com/tendermint/tendermint/binary"
  13. . "github.com/tendermint/tendermint/blocks"
  14. . "github.com/tendermint/tendermint/common"
  15. "github.com/tendermint/tendermint/p2p"
  16. . "github.com/tendermint/tendermint/state"
  17. )
  // Channel ids, vote-type markers and round-timing parameters used by the
  // consensus manager.
  const (
  	// Channel ids used with the p2p switch in this file.
  	ProposalCh   byte = 0x20
  	KnownPartsCh byte = 0x21
  	VoteCh       byte = 0x22

  	voteTypeNil   byte = 0x00
  	voteTypeBlock byte = 0x01

  	// Round timing.
  	roundDuration0         time.Duration = 60 * time.Second   // The first round is 60 seconds long.
  	roundDurationDelta     time.Duration = 15 * time.Second   // Each successive round lasts 15 seconds longer.
  	roundDeadlineBare      float64       = 1.0 / 3.0          // Fraction of the round at which the bare vote is due.
  	roundDeadlinePrecommit float64       = 2.0 / 3.0          // Fraction of the round at which the precommit vote is due.
  	newBlockWaitDuration   time.Duration = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds.

  	// Vote gossip throttling.
  	voteRankCutoff      = 2    // Higher ranks --> do not send votes.
  	unsolicitedVoteRate = 0.01 // Probability of sending a high ranked vote.
  )
  32. //-----------------------------------------------------------------------------
  33. // total duration of given round
  34. func calcRoundDuration(round uint16) time.Duration {
  35. return roundDuration0 + roundDurationDelta*time.Duration(round)
  36. }
  37. // startTime is when round zero started.
  38. func calcRoundStartTime(round uint16, startTime time.Time) time.Time {
  39. return startTime.Add(roundDuration0*time.Duration(round) +
  40. roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
  41. }
  42. // calcs the current round given startTime of round zero.
  43. // NOTE: round is zero if startTime is in the future.
  44. func calcRound(startTime time.Time) uint16 {
  45. now := time.Now()
  46. if now.Before(startTime) {
  47. return 0
  48. }
  49. // Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R.
  50. // D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0.
  51. // AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now).
  52. // R = Floor((-B + Sqrt(B^2 - 4AC))/2A)
  53. A := float64(roundDurationDelta)
  54. B := 2.0*float64(roundDuration0) - float64(roundDurationDelta)
  55. C := 2.0 * float64(startTime.Sub(now))
  56. R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)/(2*A)))
  57. if math.IsNaN(R) {
  58. panic("Could not calc round, should not happen")
  59. }
  60. if R > math.MaxInt16 {
  61. Panicf("Could not calc round, round overflow: %v", R)
  62. }
  63. if R < 0 {
  64. return 0
  65. }
  66. return uint16(R)
  67. }
  68. // convenience
  69. // NOTE: elapsedRatio can be negative if startTime is in the future.
  70. func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration,
  71. roundElapsed time.Duration, elapsedRatio float64) {
  72. round = calcRound(startTime)
  73. roundStartTime = calcRoundStartTime(round, startTime)
  74. roundDuration = calcRoundDuration(round)
  75. roundElapsed = time.Now().Sub(roundStartTime)
  76. elapsedRatio = float64(roundElapsed) / float64(roundDuration)
  77. return
  78. }
  79. //-----------------------------------------------------------------------------
  80. type ConsensusManager struct {
  81. sw *p2p.Switch
  82. swEvents chan interface{}
  83. quit chan struct{}
  84. started uint32
  85. stopped uint32
  86. cs *ConsensusState
  87. blockStore *BlockStore
  88. doActionCh chan RoundAction
  89. mtx sync.Mutex
  90. state *State
  91. privValidator *PrivValidator
  92. peerStates map[string]*PeerState
  93. stagedProposal *BlockPartSet
  94. stagedState *State
  95. }
  96. func NewConsensusManager(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusManager {
  97. swEvents := make(chan interface{})
  98. sw.AddEventListener("ConsensusManager.swEvents", swEvents)
  99. cs := NewConsensusState(state)
  100. cm := &ConsensusManager{
  101. sw: sw,
  102. swEvents: swEvents,
  103. quit: make(chan struct{}),
  104. cs: cs,
  105. blockStore: blockStore,
  106. doActionCh: make(chan RoundAction, 1),
  107. state: state,
  108. peerStates: make(map[string]*PeerState),
  109. }
  110. return cm
  111. }
  112. // Sets our private validator account for signing votes.
  113. func (cm *ConsensusManager) SetPrivValidator(priv *PrivValidator) {
  114. cm.mtx.Lock()
  115. defer cm.mtx.Unlock()
  116. cm.privValidator = priv
  117. }
  118. func (cm *ConsensusManager) PrivValidator() *PrivValidator {
  119. cm.mtx.Lock()
  120. defer cm.mtx.Unlock()
  121. return cm.privValidator
  122. }
  123. func (cm *ConsensusManager) Start() {
  124. if atomic.CompareAndSwapUint32(&cm.started, 0, 1) {
  125. log.Info("Starting ConsensusManager")
  126. go cm.switchEventsRoutine()
  127. go cm.gossipProposalRoutine()
  128. go cm.knownPartsRoutine()
  129. go cm.gossipVoteRoutine()
  130. go cm.proposeAndVoteRoutine()
  131. }
  132. }
  133. func (cm *ConsensusManager) Stop() {
  134. if atomic.CompareAndSwapUint32(&cm.stopped, 0, 1) {
  135. log.Info("Stopping ConsensusManager")
  136. close(cm.quit)
  137. close(cm.swEvents)
  138. }
  139. }
  140. // Handle peer new/done events
  141. func (cm *ConsensusManager) switchEventsRoutine() {
  142. for {
  143. swEvent, ok := <-cm.swEvents
  144. if !ok {
  145. break
  146. }
  147. switch swEvent.(type) {
  148. case p2p.SwitchEventNewPeer:
  149. event := swEvent.(p2p.SwitchEventNewPeer)
  150. // Create peerState for event.Peer
  151. cm.mtx.Lock()
  152. cm.peerStates[event.Peer.Key] = NewPeerState(event.Peer)
  153. cm.mtx.Unlock()
  154. // Share our state with event.Peer
  155. // By sending KnownBlockPartsMessage,
  156. // we send our height/round + startTime, and known block parts,
  157. // which is sufficient for the peer to begin interacting with us.
  158. event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage(cm.cs.RoundState()))
  159. case p2p.SwitchEventDonePeer:
  160. event := swEvent.(p2p.SwitchEventDonePeer)
  161. // Delete peerState for event.Peer
  162. cm.mtx.Lock()
  163. peerState := cm.peerStates[event.Peer.Key]
  164. if peerState != nil {
  165. peerState.Disconnect()
  166. delete(cm.peerStates, event.Peer.Key)
  167. }
  168. cm.mtx.Unlock()
  169. default:
  170. log.Warning("Unhandled switch event type")
  171. }
  172. }
  173. }
  174. // Like, how large is it and how often can we send it?
  175. func (cm *ConsensusManager) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlockPartsMessage {
  176. return &KnownBlockPartsMessage{
  177. Height: rs.Height,
  178. SecondsSinceStartTime: uint32(time.Now().Sub(rs.StartTime).Seconds()),
  179. BlockPartsBitArray: rs.Proposal.BitArray(),
  180. }
  181. }
  182. // NOTE: may return nil, but (nil).Wants*() returns false.
  183. func (cm *ConsensusManager) getPeerState(peer *p2p.Peer) *PeerState {
  184. cm.mtx.Lock()
  185. defer cm.mtx.Unlock()
  186. return cm.peerStates[peer.Key]
  187. }
  188. func (cm *ConsensusManager) gossipProposalRoutine() {
  189. OUTER_LOOP:
  190. for {
  191. // Get round state
  192. rs := cm.cs.RoundState()
  193. // Receive incoming message on ProposalCh
  194. inMsg, ok := cm.sw.Receive(ProposalCh)
  195. if !ok {
  196. break OUTER_LOOP // Client has stopped
  197. }
  198. _, msg_ := decodeMessage(inMsg.Bytes)
  199. log.Info("gossipProposalRoutine received %v", msg_)
  200. switch msg_.(type) {
  201. case *BlockPartMessage:
  202. msg := msg_.(*BlockPartMessage)
  203. // Add the block part if the height matches.
  204. if msg.BlockPart.Height == rs.Height &&
  205. msg.BlockPart.Round == rs.Round {
  206. // TODO Continue if we've already voted, then no point processing the part.
  207. // Check that the signature is valid and from proposer.
  208. if rs.Proposer.Verify(msg.BlockPart.Hash(), msg.BlockPart.Signature) {
  209. // TODO handle bad peer.
  210. continue OUTER_LOOP
  211. }
  212. // If we are the proposer, then don't do anything else.
  213. // We're already sending peers our proposal on another routine.
  214. privValidator := cm.PrivValidator()
  215. if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id {
  216. continue OUTER_LOOP
  217. }
  218. // Add and process the block part
  219. added, err := rs.Proposal.AddBlockPart(msg.BlockPart)
  220. if err == ErrInvalidBlockPartConflict {
  221. // TODO: Bad validator
  222. } else if err != nil {
  223. Panicf("Unexpected blockPartsSet error %v", err)
  224. }
  225. if added {
  226. // If peer wants this part, send peer the part
  227. // and our new blockParts state.
  228. kbpMsg := cm.makeKnownBlockPartsMessage(rs)
  229. partMsg := &BlockPartMessage{BlockPart: msg.BlockPart}
  230. for _, peer := range cm.sw.Peers().List() {
  231. peerState := cm.getPeerState(peer)
  232. if peerState.WantsBlockPart(msg.BlockPart) {
  233. peer.TrySend(KnownPartsCh, kbpMsg)
  234. peer.TrySend(ProposalCh, partMsg)
  235. }
  236. }
  237. } else {
  238. // We failed to process the block part.
  239. // Either an error, which we handled, or duplicate part.
  240. continue OUTER_LOOP
  241. }
  242. }
  243. default:
  244. // Ignore unknown message
  245. // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  246. }
  247. }
  248. // Cleanup
  249. }
  250. func (cm *ConsensusManager) knownPartsRoutine() {
  251. OUTER_LOOP:
  252. for {
  253. // Receive incoming message on ProposalCh
  254. inMsg, ok := cm.sw.Receive(KnownPartsCh)
  255. if !ok {
  256. break OUTER_LOOP // Client has stopped
  257. }
  258. _, msg_ := decodeMessage(inMsg.Bytes)
  259. log.Info("knownPartsRoutine received %v", msg_)
  260. msg, ok := msg_.(*KnownBlockPartsMessage)
  261. if !ok {
  262. // Ignore unknown message type
  263. // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  264. continue OUTER_LOOP
  265. }
  266. peerState := cm.getPeerState(inMsg.MConn.Peer)
  267. if !peerState.IsConnected() {
  268. // Peer disconnected before we were able to process.
  269. continue OUTER_LOOP
  270. }
  271. peerState.ApplyKnownBlockPartsMessage(msg)
  272. }
  273. // Cleanup
  274. }
  275. // Signs a vote document and broadcasts it.
  276. // hash can be nil to vote "nil"
  277. func (cm *ConsensusManager) signAndVote(vote *Vote) error {
  278. privValidator := cm.PrivValidator()
  279. if privValidator != nil {
  280. err := privValidator.SignVote(vote)
  281. if err != nil {
  282. return err
  283. }
  284. msg := p2p.TypedMessage{msgTypeVote, vote}
  285. cm.sw.Broadcast(VoteCh, msg)
  286. }
  287. return nil
  288. }
  289. func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error {
  290. // Already staged?
  291. cm.mtx.Lock()
  292. if cm.stagedProposal == proposal {
  293. cm.mtx.Unlock()
  294. return nil
  295. } else {
  296. cm.mtx.Unlock()
  297. }
  298. // Basic validation
  299. if !proposal.IsComplete() {
  300. return errors.New("Incomplete proposal BlockPartSet")
  301. }
  302. block := proposal.Block()
  303. err := block.ValidateBasic()
  304. if err != nil {
  305. return err
  306. }
  307. // Create a copy of the state for staging
  308. cm.mtx.Lock()
  309. stateCopy := cm.state.Copy() // Deep copy the state before staging.
  310. cm.mtx.Unlock()
  311. // Commit block onto the copied state.
  312. err = stateCopy.CommitBlock(block)
  313. if err != nil {
  314. return err
  315. }
  316. // Looks good!
  317. cm.mtx.Lock()
  318. cm.stagedProposal = proposal
  319. cm.stagedState = stateCopy
  320. cm.mtx.Unlock()
  321. return nil
  322. }
  323. // Constructs an unsigned proposal
  324. func (cm *ConsensusManager) constructProposal(rs *RoundState) (*BlockPartSet, error) {
  325. // XXX implement, first implement mempool
  326. // proposal := block.ToBlockPartSet()
  327. return nil, nil
  328. }
  329. // Vote for (or against) the proposal for this round.
  330. // Call during transition from RoundStepProposal to RoundStepVote.
  331. // We may not have received a full proposal.
  332. func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
  333. // If we're locked, must vote that.
  334. locked := cm.cs.LockedProposal()
  335. if locked != nil {
  336. block := locked.Block()
  337. err := cm.signAndVote(&Vote{
  338. Height: rs.Height,
  339. Round: rs.Round,
  340. Type: VoteTypeBare,
  341. Hash: block.Hash(),
  342. })
  343. return err
  344. }
  345. // Stage proposal
  346. err := cm.stageProposal(rs.Proposal)
  347. if err != nil {
  348. // Vote for nil, whatever the error.
  349. err := cm.signAndVote(&Vote{
  350. Height: rs.Height,
  351. Round: rs.Round,
  352. Type: VoteTypeBare,
  353. Hash: nil,
  354. })
  355. return err
  356. }
  357. // Vote for block.
  358. err = cm.signAndVote(&Vote{
  359. Height: rs.Height,
  360. Round: rs.Round,
  361. Type: VoteTypeBare,
  362. Hash: rs.Proposal.Block().Hash(),
  363. })
  364. return err
  365. }
  366. // Precommit proposal if we see enough votes for it.
  367. // Call during transition from RoundStepVote to RoundStepPrecommit.
  368. func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
  369. // If we see a 2/3 majority for votes for a block, precommit.
  370. // TODO: maybe could use commitTime here and avg it with later commitTime?
  371. if hash, _, ok := rs.RoundBareVotes.TwoThirdsMajority(); ok {
  372. if len(hash) == 0 {
  373. // 2/3 majority voted for nil.
  374. return nil
  375. } else {
  376. // 2/3 majority voted for a block.
  377. // If proposal is invalid or unknown, do nothing.
  378. // See note on ZombieValidators to see why.
  379. if cm.stageProposal(rs.Proposal) != nil {
  380. return nil
  381. }
  382. // Lock this proposal.
  383. // NOTE: we're unlocking any prior locks.
  384. cm.cs.LockProposal(rs.Proposal)
  385. // Send precommit vote.
  386. err := cm.signAndVote(&Vote{
  387. Height: rs.Height,
  388. Round: rs.Round,
  389. Type: VoteTypePrecommit,
  390. Hash: hash,
  391. })
  392. return err
  393. }
  394. } else {
  395. // If we haven't seen enough votes, do nothing.
  396. return nil
  397. }
  398. }
  399. // Commit or unlock.
  400. // Call after RoundStepPrecommit, after round has completely expired.
  401. func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime time.Time, err error) {
  402. // If there exists a 2/3 majority of precommits.
  403. // Validate the block and commit.
  404. if hash, commitTime, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok {
  405. // If the proposal is invalid or we don't have it,
  406. // do not commit.
  407. // TODO If we were just late to receive the block, when
  408. // do we actually get it? Document it.
  409. if cm.stageProposal(rs.Proposal) != nil {
  410. return time.Time{}, nil
  411. }
  412. // TODO: Remove?
  413. cm.cs.LockProposal(rs.Proposal)
  414. // Vote commit.
  415. err := cm.signAndVote(&Vote{
  416. Height: rs.Height,
  417. Round: rs.Round,
  418. Type: VoteTypePrecommit,
  419. Hash: hash,
  420. })
  421. if err != nil {
  422. return time.Time{}, err
  423. }
  424. // Commit block.
  425. cm.commitProposal(rs.Proposal, commitTime)
  426. return commitTime, nil
  427. } else {
  428. // Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock.
  429. locked := cm.cs.LockedProposal()
  430. if locked != nil {
  431. for _, hashOrNil := range rs.RoundPrecommits.OneThirdMajority() {
  432. if hashOrNil == nil {
  433. continue
  434. }
  435. if !bytes.Equal(hashOrNil, locked.Block().Hash()) {
  436. // Unlock our lock.
  437. cm.cs.LockProposal(nil)
  438. }
  439. }
  440. }
  441. return time.Time{}, nil
  442. }
  443. }
  444. func (cm *ConsensusManager) commitProposal(proposal *BlockPartSet, commitTime time.Time) error {
  445. cm.mtx.Lock()
  446. defer cm.mtx.Unlock()
  447. if cm.stagedProposal != proposal {
  448. panic("Unexpected stagedProposal.") // Shouldn't happen.
  449. }
  450. // Save to blockStore
  451. block, blockParts := proposal.Block(), proposal.BlockParts()
  452. err := cm.blockStore.SaveBlockParts(block.Height, blockParts)
  453. if err != nil {
  454. return err
  455. }
  456. // What was staged becomes committed.
  457. cm.state = cm.stagedState
  458. cm.state.Save(commitTime)
  459. cm.cs.Update(cm.state)
  460. cm.stagedProposal = nil
  461. cm.stagedState = nil
  462. return nil
  463. }
  464. // Given a RoundState where we are the proposer,
  465. // broadcast rs.proposal to all the peers.
  466. func (cm *ConsensusManager) shareProposal(rs *RoundState) {
  467. privValidator := cm.PrivValidator()
  468. proposal := rs.Proposal
  469. if privValidator == nil || proposal == nil {
  470. return
  471. }
  472. privValidator.SignProposal(rs.Round, proposal)
  473. blockParts := proposal.BlockParts()
  474. peers := cm.sw.Peers().List()
  475. if len(peers) == 0 {
  476. log.Warning("Could not propose: no peers")
  477. return
  478. }
  479. numBlockParts := uint16(len(blockParts))
  480. kbpMsg := cm.makeKnownBlockPartsMessage(rs)
  481. for i, peer := range peers {
  482. peerState := cm.getPeerState(peer)
  483. if !peerState.IsConnected() {
  484. continue // Peer was disconnected.
  485. }
  486. startIndex := uint16((i * len(blockParts)) / len(peers))
  487. // Create a function that when called,
  488. // starts sending block parts to peer.
  489. cb := func(peer *p2p.Peer, startIndex uint16) func() {
  490. return func() {
  491. // TODO: if the clocks are off a bit,
  492. // peer may receive this before the round flips.
  493. peer.Send(KnownPartsCh, kbpMsg)
  494. for i := uint16(0); i < numBlockParts; i++ {
  495. part := blockParts[(startIndex+i)%numBlockParts]
  496. // Ensure round hasn't expired on our end.
  497. currentRS := cm.cs.RoundState()
  498. if currentRS != rs {
  499. return
  500. }
  501. // If peer wants the block:
  502. if peerState.WantsBlockPart(part) {
  503. partMsg := &BlockPartMessage{BlockPart: part}
  504. peer.Send(ProposalCh, partMsg)
  505. }
  506. }
  507. }
  508. }(peer, startIndex)
  509. // Call immediately or schedule cb for when peer is ready.
  510. peerState.SetRoundCallback(rs.Height, rs.Round, cb)
  511. }
  512. }
  513. func (cm *ConsensusManager) gossipVoteRoutine() {
  514. OUTER_LOOP:
  515. for {
  516. // Get round state
  517. rs := cm.cs.RoundState()
  518. // Receive incoming message on VoteCh
  519. inMsg, ok := cm.sw.Receive(VoteCh)
  520. if !ok {
  521. break // Client has stopped
  522. }
  523. type_, msg_ := decodeMessage(inMsg.Bytes)
  524. log.Info("gossipVoteRoutine received %v", msg_)
  525. switch msg_.(type) {
  526. case *Vote:
  527. vote := msg_.(*Vote)
  528. if vote.Height != rs.Height || vote.Round != rs.Round {
  529. continue OUTER_LOOP
  530. }
  531. added, rank, err := rs.AddVote(vote, inMsg.MConn.Peer.Key)
  532. // Send peer VoteRankMessage if needed
  533. if type_ == msgTypeVoteAskRank {
  534. msg := &VoteRankMessage{
  535. ValidatorId: vote.SignerId,
  536. Rank: rank,
  537. }
  538. inMsg.MConn.Peer.TrySend(VoteCh, msg)
  539. }
  540. // Process vote
  541. if !added {
  542. log.Info("Error adding vote %v", err)
  543. }
  544. switch err {
  545. case ErrVoteInvalidAccount, ErrVoteInvalidSignature:
  546. // TODO: Handle bad peer.
  547. case ErrVoteConflictingSignature, ErrVoteInvalidHash:
  548. // TODO: Handle bad validator.
  549. case nil:
  550. break
  551. //case ErrVoteUnexpectedPhase: Shouldn't happen.
  552. default:
  553. Panicf("Unexpected error from .AddVote(): %v", err)
  554. }
  555. if !added {
  556. continue
  557. }
  558. // Gossip vote.
  559. for _, peer := range cm.sw.Peers().List() {
  560. peerState := cm.getPeerState(peer)
  561. wantsVote, unsolicited := peerState.WantsVote(vote)
  562. if wantsVote {
  563. if unsolicited {
  564. // If we're sending an unsolicited vote,
  565. // ask for the rank so we know whether it's good.
  566. msg := p2p.TypedMessage{msgTypeVoteAskRank, vote}
  567. peer.TrySend(VoteCh, msg)
  568. } else {
  569. msg := p2p.TypedMessage{msgTypeVote, vote}
  570. peer.TrySend(VoteCh, msg)
  571. }
  572. }
  573. }
  574. case *VoteRankMessage:
  575. msg := msg_.(*VoteRankMessage)
  576. peerState := cm.getPeerState(inMsg.MConn.Peer)
  577. if !peerState.IsConnected() {
  578. // Peer disconnected before we were able to process.
  579. continue OUTER_LOOP
  580. }
  581. peerState.ApplyVoteRankMessage(msg)
  582. default:
  583. // Ignore unknown message
  584. // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  585. }
  586. }
  587. // Cleanup
  588. }
  // RoundAction asks proposeAndVoteRoutine to move a particular height and
  // round on to a particular step.
  type RoundAction struct {
  	// The block height for which consensus is reaching for.
  	Height uint32
  	// The round number at given height.
  	Round uint16
  	// Transition to this step. Action depends on this value.
  	XnToStep uint8
  }
  594. // Source of all round state transitions and votes.
  595. // It can be preemptively woken up via amessage to
  596. // doActionCh.
  597. func (cm *ConsensusManager) proposeAndVoteRoutine() {
  598. // Figure out when to wake up next (in the absence of other events)
  599. setAlarm := func() {
  600. if len(cm.doActionCh) > 0 {
  601. return // Already going to wake up later.
  602. }
  603. // Figure out which height/round/step we're at,
  604. // then schedule an action for when it is due.
  605. rs := cm.cs.RoundState()
  606. _, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
  607. switch rs.Step() {
  608. case RoundStepStart:
  609. // It's a new RoundState.
  610. if elapsedRatio < 0 {
  611. // startTime is in the future.
  612. time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration)
  613. }
  614. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal}
  615. case RoundStepProposal:
  616. // Wake up when it's time to vote.
  617. time.Sleep(time.Duration(roundDeadlineBare-elapsedRatio) * roundDuration)
  618. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepBareVotes}
  619. case RoundStepBareVotes:
  620. // Wake up when it's time to precommit.
  621. time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration)
  622. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommits}
  623. case RoundStepPrecommits:
  624. // Wake up when the round is over.
  625. time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration)
  626. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommitOrUnlock}
  627. case RoundStepCommitOrUnlock:
  628. // This shouldn't happen.
  629. // Before setAlarm() got called,
  630. // logic should have created a new RoundState for the next round.
  631. panic("Should not happen")
  632. }
  633. }
  634. for {
  635. func() {
  636. roundAction := <-cm.doActionCh
  637. // Always set the alarm after any processing below.
  638. defer setAlarm()
  639. // We only consider acting on given height and round.
  640. height := roundAction.Height
  641. round := roundAction.Round
  642. // We only consider transitioning to given step.
  643. step := roundAction.XnToStep
  644. // This is the current state.
  645. rs := cm.cs.RoundState()
  646. if height != rs.Height || round != rs.Round {
  647. return // Not relevant.
  648. }
  649. if step == RoundStepProposal && rs.Step() == RoundStepStart {
  650. // Propose a block if I am the proposer.
  651. privValidator := cm.PrivValidator()
  652. if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id {
  653. // If we're already locked on a proposal, use that.
  654. proposal := cm.cs.LockedProposal()
  655. if proposal != nil {
  656. // Otherwise, construct a new proposal.
  657. var err error
  658. proposal, err = cm.constructProposal(rs)
  659. if err != nil {
  660. log.Error("Error attempting to construct a proposal: %v", err)
  661. return // Pretend like we weren't the proposer. Shrug.
  662. }
  663. }
  664. // Set proposal for roundState, so we vote correctly subsequently.
  665. rs.Proposal = proposal
  666. // Share the parts.
  667. // We send all parts to all of our peers, but everyone receives parts
  668. // starting at a different index, wrapping around back to 0.
  669. cm.shareProposal(rs)
  670. }
  671. } else if step == RoundStepBareVotes && rs.Step() <= RoundStepProposal {
  672. err := cm.voteProposal(rs)
  673. if err != nil {
  674. log.Info("Error attempting to vote for proposal: %v", err)
  675. }
  676. } else if step == RoundStepPrecommits && rs.Step() <= RoundStepBareVotes {
  677. err := cm.precommitProposal(rs)
  678. if err != nil {
  679. log.Info("Error attempting to precommit for proposal: %v", err)
  680. }
  681. } else if step == RoundStepCommitOrUnlock && rs.Step() <= RoundStepPrecommits {
  682. commitTime, err := cm.commitOrUnlockProposal(rs)
  683. if err != nil {
  684. log.Info("Error attempting to commit or update for proposal: %v", err)
  685. }
  686. if !commitTime.IsZero() {
  687. // We already set up ConsensusState for the next height
  688. // (it happens in the call to cm.commitProposal).
  689. } else {
  690. // Round is over. This is a special case.
  691. // Prepare a new RoundState for the next state.
  692. cm.cs.SetupRound(rs.Round + 1)
  693. return // setAlarm() takes care of the rest.
  694. }
  695. } else {
  696. return // Action is not relevant.
  697. }
  698. // Transition to new step.
  699. rs.SetStep(step)
  700. }()
  701. }
  702. }
  703. //-----------------------------------------------------------------------------
  // ErrPeerStateHeightRegression is returned when a peer reports a height
  // lower than the one already recorded for it.
  var ErrPeerStateHeightRegression = errors.New("Error peer state height regression")

  // ErrPeerStateInvalidStartTime is returned when a peer's start time for a
  // new height is not after its previously recorded start time.
  var ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  708. // TODO: voteRanks should purge bygone validators.
  709. type PeerState struct {
  710. mtx sync.Mutex
  711. connected bool
  712. peer *p2p.Peer
  713. height uint32
  714. startTime time.Time // Derived from offset seconds.
  715. blockPartsBitArray []byte
  716. voteRanks map[uint64]uint8
  717. cbHeight uint32
  718. cbRound uint16
  719. cbFunc func()
  720. }
  721. func NewPeerState(peer *p2p.Peer) *PeerState {
  722. return &PeerState{
  723. connected: true,
  724. peer: peer,
  725. height: 0,
  726. voteRanks: make(map[uint64]uint8),
  727. }
  728. }
  729. func (ps *PeerState) IsConnected() bool {
  730. if ps == nil {
  731. return false
  732. }
  733. ps.mtx.Lock()
  734. defer ps.mtx.Unlock()
  735. return ps.connected
  736. }
  737. func (ps *PeerState) Disconnect() {
  738. ps.mtx.Lock()
  739. defer ps.mtx.Unlock()
  740. ps.connected = false
  741. }
  742. func (ps *PeerState) WantsBlockPart(part *BlockPart) bool {
  743. if ps == nil {
  744. return false
  745. }
  746. ps.mtx.Lock()
  747. defer ps.mtx.Unlock()
  748. if !ps.connected {
  749. return false
  750. }
  751. // Only wants the part if peer's current height and round matches.
  752. if ps.height == part.Height {
  753. round := calcRound(ps.startTime)
  754. // NOTE: validators want to receive remaining block parts
  755. // even after it had voted bare or precommit.
  756. // Ergo, we do not check for which step the peer is in.
  757. if round == part.Round {
  758. // Only wants the part if it doesn't already have it.
  759. if ps.blockPartsBitArray[part.Index/8]&byte(1<<(part.Index%8)) == 0 {
  760. return true
  761. }
  762. }
  763. }
  764. return false
  765. }
  766. func (ps *PeerState) WantsVote(vote *Vote) (wants bool, unsolicited bool) {
  767. if ps == nil {
  768. return false, false
  769. }
  770. ps.mtx.Lock()
  771. defer ps.mtx.Unlock()
  772. if !ps.connected {
  773. return false, false
  774. }
  775. // Only wants the vote if peer's current height and round matches.
  776. if ps.height == vote.Height {
  777. round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime)
  778. if round == vote.Round {
  779. if vote.Type == VoteTypeBare && elapsedRatio > roundDeadlineBare {
  780. return false, false
  781. }
  782. if vote.Type == VoteTypePrecommit && elapsedRatio > roundDeadlinePrecommit {
  783. return false, false
  784. } else {
  785. // continue on ...
  786. }
  787. } else {
  788. return false, false
  789. }
  790. } else {
  791. return false, false
  792. }
  793. // Only wants the vote if voteRank is low.
  794. if ps.voteRanks[vote.SignerId] > voteRankCutoff {
  795. // Sometimes, send unsolicited votes to see if peer wants it.
  796. if rand.Float32() < unsolicitedVoteRate {
  797. return true, true
  798. } else {
  799. // Rank too high. Do not send vote.
  800. return false, false
  801. }
  802. }
  803. return true, false
  804. }
  805. func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) error {
  806. ps.mtx.Lock()
  807. defer ps.mtx.Unlock()
  808. // TODO: Sanity check len(BlockParts)
  809. if msg.Height < ps.height {
  810. return ErrPeerStateHeightRegression
  811. }
  812. if msg.Height == ps.height {
  813. if len(ps.blockPartsBitArray) == 0 {
  814. ps.blockPartsBitArray = msg.BlockPartsBitArray
  815. } else if len(msg.BlockPartsBitArray) > 0 {
  816. if len(ps.blockPartsBitArray) != len(msg.BlockPartsBitArray) {
  817. // TODO: If the peer received a part from
  818. // a proposer who signed a bad (or conflicting) part,
  819. // just about anything can happen with the new blockPartsBitArray.
  820. // In those cases it's alright to ignore the peer for the round,
  821. // and try to induce nil votes for that round.
  822. return nil
  823. } else {
  824. // TODO: Same as above. If previously known parts disappear,
  825. // something is fishy.
  826. // For now, just copy over known parts.
  827. for i, byt := range msg.BlockPartsBitArray {
  828. ps.blockPartsBitArray[i] |= byt
  829. }
  830. }
  831. }
  832. } else {
  833. // TODO: handle peer connection latency estimation.
  834. newStartTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  835. // Ensure that the new height's start time is sufficiently after the last startTime.
  836. // TODO: there should be some time between rounds.
  837. if !newStartTime.After(ps.startTime) {
  838. return ErrPeerStateInvalidStartTime
  839. }
  840. ps.startTime = newStartTime
  841. ps.height = msg.Height
  842. ps.blockPartsBitArray = msg.BlockPartsBitArray
  843. // Call callback if height+round matches.
  844. peerRound := calcRound(ps.startTime)
  845. if ps.cbFunc != nil && ps.cbHeight == ps.height && ps.cbRound == peerRound {
  846. go ps.cbFunc()
  847. ps.cbFunc = nil
  848. }
  849. }
  850. return nil
  851. }
  852. func (ps *PeerState) ApplyVoteRankMessage(msg *VoteRankMessage) error {
  853. ps.mtx.Lock()
  854. defer ps.mtx.Unlock()
  855. ps.voteRanks[msg.ValidatorId] = msg.Rank
  856. return nil
  857. }
  858. // Sets a single round callback, to be called when the height+round comes around.
  859. // If the height+round is current, calls "go f()" immediately.
  860. // Otherwise, does nothing.
  861. func (ps *PeerState) SetRoundCallback(height uint32, round uint16, f func()) {
  862. ps.mtx.Lock()
  863. defer ps.mtx.Unlock()
  864. if ps.height < height {
  865. ps.cbHeight = height
  866. ps.cbRound = round
  867. ps.cbFunc = f
  868. // Wait until the height of the peerState changes.
  869. // We'll call cbFunc then.
  870. return
  871. } else if ps.height == height {
  872. peerRound := calcRound(ps.startTime)
  873. if peerRound < round {
  874. // Set a timer to call the cbFunc when the time comes.
  875. go func() {
  876. roundStart := calcRoundStartTime(round, ps.startTime)
  877. time.Sleep(roundStart.Sub(time.Now()))
  878. // If peer height is still good
  879. ps.mtx.Lock()
  880. peerHeight := ps.height
  881. ps.mtx.Unlock()
  882. if peerHeight == height {
  883. f()
  884. }
  885. }()
  886. } else if peerRound == round {
  887. go f()
  888. } else {
  889. return
  890. }
  891. } else {
  892. return
  893. }
  894. }
  895. //-----------------------------------------------------------------------------
  896. // Messages
  // Message type bytes: the first byte of every encoded message.
  const (
  	msgTypeUnknown byte = 0x00

  	// Block part gossip.
  	msgTypeBlockPart       byte = 0x10
  	msgTypeKnownBlockParts byte = 0x11

  	// Vote gossip.
  	msgTypeVote        byte = 0x20
  	msgTypeVoteAskRank byte = 0x21
  	msgTypeVoteRank    byte = 0x22
  )
  905. // TODO: check for unnecessary extra bytes at the end.
  906. func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
  907. n, err := new(int64), new(error)
  908. // log.Debug("decoding msg bytes: %X", bz)
  909. msgType = bz[0]
  910. switch msgType {
  911. case msgTypeBlockPart:
  912. msg = readBlockPartMessage(bytes.NewReader(bz[1:]), n, err)
  913. case msgTypeKnownBlockParts:
  914. msg = readKnownBlockPartsMessage(bytes.NewReader(bz[1:]), n, err)
  915. case msgTypeVote:
  916. msg = ReadVote(bytes.NewReader(bz[1:]), n, err)
  917. case msgTypeVoteAskRank:
  918. msg = ReadVote(bytes.NewReader(bz[1:]), n, err)
  919. case msgTypeVoteRank:
  920. msg = readVoteRankMessage(bytes.NewReader(bz[1:]), n, err)
  921. default:
  922. msg = nil
  923. }
  924. return
  925. }
  926. //-------------------------------------
  927. type BlockPartMessage struct {
  928. BlockPart *BlockPart
  929. }
  930. func readBlockPartMessage(r io.Reader, n *int64, err *error) *BlockPartMessage {
  931. return &BlockPartMessage{
  932. BlockPart: ReadBlockPart(r, n, err),
  933. }
  934. }
  935. func (m *BlockPartMessage) WriteTo(w io.Writer) (n int64, err error) {
  936. WriteByte(w, msgTypeBlockPart, &n, &err)
  937. WriteBinary(w, m.BlockPart, &n, &err)
  938. return
  939. }
  940. func (m *BlockPartMessage) String() string {
  941. return fmt.Sprintf("[BlockPartMessage %v]", m.BlockPart)
  942. }
  943. //-------------------------------------
  944. type KnownBlockPartsMessage struct {
  945. Height uint32
  946. SecondsSinceStartTime uint32
  947. BlockPartsBitArray []byte
  948. }
  949. func readKnownBlockPartsMessage(r io.Reader, n *int64, err *error) *KnownBlockPartsMessage {
  950. return &KnownBlockPartsMessage{
  951. Height: ReadUInt32(r, n, err),
  952. SecondsSinceStartTime: ReadUInt32(r, n, err),
  953. BlockPartsBitArray: ReadByteSlice(r, n, err),
  954. }
  955. }
  956. func (m *KnownBlockPartsMessage) WriteTo(w io.Writer) (n int64, err error) {
  957. WriteByte(w, msgTypeKnownBlockParts, &n, &err)
  958. WriteUInt32(w, m.Height, &n, &err)
  959. WriteUInt32(w, m.SecondsSinceStartTime, &n, &err)
  960. WriteByteSlice(w, m.BlockPartsBitArray, &n, &err)
  961. return
  962. }
  963. func (m *KnownBlockPartsMessage) String() string {
  964. return fmt.Sprintf("[KnownBlockPartsMessage H:%v SSST:%v, BPBA:%X]",
  965. m.Height, m.SecondsSinceStartTime, m.BlockPartsBitArray)
  966. }
  967. //-------------------------------------
  968. type VoteRankMessage struct {
  969. ValidatorId uint64
  970. Rank uint8
  971. }
  972. func readVoteRankMessage(r io.Reader, n *int64, err *error) *VoteRankMessage {
  973. return &VoteRankMessage{
  974. ValidatorId: ReadUInt64(r, n, err),
  975. Rank: ReadUInt8(r, n, err),
  976. }
  977. }
  978. func (m *VoteRankMessage) WriteTo(w io.Writer) (n int64, err error) {
  979. WriteByte(w, msgTypeVoteRank, &n, &err)
  980. WriteUInt64(w, m.ValidatorId, &n, &err)
  981. WriteUInt8(w, m.Rank, &n, &err)
  982. return
  983. }
  984. func (m *VoteRankMessage) String() string {
  985. return fmt.Sprintf("[VoteRankMessage V:%v, R:%v]", m.ValidatorId, m.Rank)
  986. }