You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

890 lines
25 KiB

  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. . "github.com/tendermint/tendermint/binary"
  12. . "github.com/tendermint/tendermint/blocks"
  13. . "github.com/tendermint/tendermint/common"
  14. "github.com/tendermint/tendermint/p2p"
  15. . "github.com/tendermint/tendermint/state"
  16. )
  17. const (
  18. ProposalCh = byte(0x20)
  19. KnownPartsCh = byte(0x21)
  20. VoteCh = byte(0x22)
  21. voteTypeNil = byte(0x00)
  22. voteTypeBlock = byte(0x01)
  23. roundDuration0 = 60 * time.Second // The first round is 60 seconds long.
  24. roundDurationDelta = 15 * time.Second // Each successive round lasts 15 seconds longer.
  25. roundDeadlineBare = float64(1.0 / 3.0) // When the bare vote is due.
  26. roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due.
  27. )
  28. //-----------------------------------------------------------------------------
  29. // convenience
  30. func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration, roundElapsed time.Duration, elapsedRatio float64) {
  31. round = calcRound(startTime)
  32. roundStartTime = calcRoundStartTime(round, startTime)
  33. roundDuration = calcRoundDuration(round)
  34. roundElapsed = time.Now().Sub(roundStartTime)
  35. elapsedRatio = float64(roundElapsed) / float64(roundDuration)
  36. return
  37. }
  38. // total duration of given round
  39. func calcRoundDuration(round uint16) time.Duration {
  40. return roundDuration0 + roundDurationDelta*time.Duration(round)
  41. }
  42. // startTime is when round zero started.
  43. func calcRoundStartTime(round uint16, startTime time.Time) time.Time {
  44. return startTime.Add(roundDuration0*time.Duration(round) +
  45. roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
  46. }
  47. // calcs the current round given startTime of round zero.
  48. func calcRound(startTime time.Time) uint16 {
  49. now := time.Now()
  50. if now.Before(startTime) {
  51. Panicf("Cannot calc round when startTime is in the future: %v", startTime)
  52. }
  53. // Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R.
  54. // D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0.
  55. // AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now).
  56. // R = Floor((-B + Sqrt(B^2 - 4AC))/2A)
  57. A := float64(roundDurationDelta)
  58. B := 2.0*float64(roundDuration0) - float64(roundDurationDelta)
  59. C := 2.0 * float64(startTime.Sub(now))
  60. R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)/(2*A)))
  61. if math.IsNaN(R) {
  62. panic("Could not calc round, should not happen")
  63. }
  64. if R > math.MaxInt16 {
  65. Panicf("Could not calc round, round overflow: %v", R)
  66. }
  67. if R < 0 {
  68. return 0
  69. }
  70. return uint16(R)
  71. }
  72. //-----------------------------------------------------------------------------
  73. type ConsensusManager struct {
  74. sw *p2p.Switch
  75. swEvents chan interface{}
  76. quit chan struct{}
  77. started uint32
  78. stopped uint32
  79. cs *ConsensusState
  80. blockStore *BlockStore
  81. doActionCh chan RoundAction
  82. mtx sync.Mutex
  83. state *State
  84. privValidator *PrivValidator
  85. peerStates map[string]*PeerState
  86. stagedProposal *BlockPartSet
  87. stagedState *State
  88. }
  89. func NewConsensusManager(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusManager {
  90. swEvents := make(chan interface{})
  91. sw.AddEventListener("ConsensusManager.swEvents", swEvents)
  92. cs := NewConsensusState(state)
  93. cm := &ConsensusManager{
  94. sw: sw,
  95. swEvents: swEvents,
  96. quit: make(chan struct{}),
  97. cs: cs,
  98. blockStore: blockStore,
  99. doActionCh: make(chan RoundAction, 1),
  100. state: state,
  101. peerStates: make(map[string]*PeerState),
  102. }
  103. return cm
  104. }
  105. // Sets our private validator account for signing votes.
  106. func (cm *ConsensusManager) SetPrivValidator(priv *PrivValidator) {
  107. cm.mtx.Lock()
  108. defer cm.mtx.Unlock()
  109. cm.privValidator = priv
  110. }
  111. func (cm *ConsensusManager) Start() {
  112. if atomic.CompareAndSwapUint32(&cm.started, 0, 1) {
  113. log.Info("Starting ConsensusManager")
  114. go cm.switchEventsRoutine()
  115. go cm.gossipProposalRoutine()
  116. go cm.knownPartsRoutine()
  117. go cm.gossipVoteRoutine()
  118. go cm.proposeAndVoteRoutine()
  119. }
  120. }
  121. func (cm *ConsensusManager) Stop() {
  122. if atomic.CompareAndSwapUint32(&cm.stopped, 0, 1) {
  123. log.Info("Stopping ConsensusManager")
  124. close(cm.quit)
  125. close(cm.swEvents)
  126. }
  127. }
  128. // Handle peer new/done events
  129. func (cm *ConsensusManager) switchEventsRoutine() {
  130. for {
  131. swEvent, ok := <-cm.swEvents
  132. if !ok {
  133. break
  134. }
  135. switch swEvent.(type) {
  136. case p2p.SwitchEventNewPeer:
  137. event := swEvent.(p2p.SwitchEventNewPeer)
  138. // Create peerState for event.Peer
  139. cm.mtx.Lock()
  140. cm.peerStates[event.Peer.Key] = NewPeerState(event.Peer)
  141. cm.mtx.Unlock()
  142. // Share our state with event.Peer
  143. // By sending KnownBlockPartsMessage,
  144. // we send our height/round + startTime, and known block parts,
  145. // which is sufficient for the peer to begin interacting with us.
  146. event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage())
  147. case p2p.SwitchEventDonePeer:
  148. event := swEvent.(p2p.SwitchEventDonePeer)
  149. // Delete peerState for event.Peer
  150. cm.mtx.Lock()
  151. delete(cm.peerStates, event.Peer.Key)
  152. cm.mtx.Unlock()
  153. default:
  154. log.Warning("Unhandled switch event type")
  155. }
  156. }
  157. }
  158. // Like, how large is it and how often can we send it?
  159. func (cm *ConsensusManager) makeKnownBlockPartsMessage() *KnownBlockPartsMessage {
  160. rs := cm.cs.RoundState()
  161. return &KnownBlockPartsMessage{
  162. Height: rs.Height,
  163. SecondsSinceStartTime: uint32(time.Now().Sub(rs.StartTime).Seconds()),
  164. BlockPartsBitArray: rs.BlockPartSet.BitArray(),
  165. }
  166. }
  167. func (cm *ConsensusManager) getPeerState(peer *p2p.Peer) *PeerState {
  168. cm.mtx.Lock()
  169. defer cm.mtx.Unlock()
  170. peerState := cm.peerStates[peer.Key]
  171. if peerState == nil {
  172. log.Warning("Wanted peerState for %v but none exists", peer)
  173. }
  174. return peerState
  175. }
  176. func (cm *ConsensusManager) gossipProposalRoutine() {
  177. OUTER_LOOP:
  178. for {
  179. // Get round state
  180. rs := cm.cs.RoundState()
  181. // Receive incoming message on ProposalCh
  182. inMsg, ok := cm.sw.Receive(ProposalCh)
  183. if !ok {
  184. break OUTER_LOOP // Client has stopped
  185. }
  186. msg_ := decodeMessage(inMsg.Bytes)
  187. log.Info("gossipProposalRoutine received %v", msg_)
  188. switch msg_.(type) {
  189. case *BlockPartMessage:
  190. msg := msg_.(*BlockPartMessage)
  191. // Add the block part if the height matches.
  192. if msg.BlockPart.Height == rs.Height &&
  193. msg.BlockPart.Round == rs.Round {
  194. // TODO Continue if we've already voted, then no point processing the part.
  195. // Add and process the block part
  196. added, err := rs.BlockPartSet.AddBlockPart(msg.BlockPart)
  197. if err == ErrInvalidBlockPartConflict {
  198. // TODO: Bad validator
  199. } else if err == ErrInvalidBlockPartSignature {
  200. // TODO: Bad peer
  201. } else if err != nil {
  202. Panicf("Unexpected blockPartsSet error %v", err)
  203. }
  204. if added {
  205. // If peer wants this part, send peer the part
  206. // and our new blockParts state.
  207. kbpMsg := cm.makeKnownBlockPartsMessage()
  208. partMsg := &BlockPartMessage{BlockPart: msg.BlockPart}
  209. PEERS_LOOP:
  210. for _, peer := range cm.sw.Peers().List() {
  211. peerState := cm.getPeerState(peer)
  212. if peerState == nil {
  213. // Peer disconnected before we were able to process.
  214. continue PEERS_LOOP
  215. }
  216. if peerState.WantsBlockPart(msg.BlockPart) {
  217. peer.TrySend(KnownPartsCh, kbpMsg)
  218. peer.TrySend(ProposalCh, partMsg)
  219. }
  220. }
  221. } else {
  222. // We failed to process the block part.
  223. // Either an error, which we handled, or duplicate part.
  224. continue OUTER_LOOP
  225. }
  226. }
  227. default:
  228. // Ignore unknown message
  229. // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  230. }
  231. }
  232. // Cleanup
  233. }
  234. func (cm *ConsensusManager) knownPartsRoutine() {
  235. OUTER_LOOP:
  236. for {
  237. // Receive incoming message on ProposalCh
  238. inMsg, ok := cm.sw.Receive(KnownPartsCh)
  239. if !ok {
  240. break OUTER_LOOP // Client has stopped
  241. }
  242. msg_ := decodeMessage(inMsg.Bytes)
  243. log.Info("knownPartsRoutine received %v", msg_)
  244. msg, ok := msg_.(*KnownBlockPartsMessage)
  245. if !ok {
  246. // Ignore unknown message type
  247. // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  248. continue OUTER_LOOP
  249. }
  250. peerState := cm.getPeerState(inMsg.MConn.Peer)
  251. if peerState == nil {
  252. // Peer disconnected before we were able to process.
  253. continue OUTER_LOOP
  254. }
  255. peerState.ApplyKnownBlockPartsMessage(msg)
  256. }
  257. // Cleanup
  258. }
  259. // Signs a vote document and broadcasts it.
  260. // hash can be nil to vote "nil"
  261. func (cm *ConsensusManager) signAndVote(vote *Vote) error {
  262. if cm.privValidator != nil {
  263. err := cm.privValidator.SignVote(vote)
  264. if err != nil {
  265. return err
  266. }
  267. msg := p2p.TypedMessage{msgTypeVote, vote}
  268. cm.sw.Broadcast(VoteCh, msg)
  269. }
  270. return nil
  271. }
  272. func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error {
  273. // Already staged?
  274. cm.mtx.Lock()
  275. if cm.stagedProposal == proposal {
  276. cm.mtx.Unlock()
  277. return nil
  278. } else {
  279. cm.mtx.Unlock()
  280. }
  281. // Basic validation
  282. if !proposal.IsComplete() {
  283. return errors.New("Incomplete proposal BlockPartSet")
  284. }
  285. block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts()
  286. err := block.ValidateBasic()
  287. if err != nil {
  288. return err
  289. }
  290. // Create a copy of the state for staging
  291. cm.mtx.Lock()
  292. stateCopy := cm.state.Copy() // Deep copy the state before staging.
  293. cm.mtx.Unlock()
  294. // Commit block onto the copied state.
  295. err := stateCopy.CommitBlock(block, block.Header.Time) // NOTE: fake commit time.
  296. if err != nil {
  297. return err
  298. }
  299. // Looks good!
  300. cm.mtx.Lock()
  301. cm.stagedProposal = proposal
  302. cm.stagedState = state
  303. cm.mtx.Unlock()
  304. return nil
  305. }
  306. func (cm *ConsensusManager) constructProposal(rs *RoundState) (*Block, error) {
  307. // XXX implement
  308. return nil, nil
  309. }
  310. // Vote for (or against) the proposal for this round.
  311. // Call during transition from RoundStepProposal to RoundStepVote.
  312. // We may not have received a full proposal.
  313. func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
  314. // If we're locked, must vote that.
  315. locked := cm.cs.LockedProposal()
  316. if locked != nil {
  317. block := locked.Block()
  318. err := cm.signAndVote(&Vote{
  319. Height: rs.Height,
  320. Round: rs.Round,
  321. Type: VoteTypeBare,
  322. Hash: block.Hash(),
  323. })
  324. return err
  325. }
  326. // Stage proposal
  327. err := cm.stageProposal(rs.BlockPartSet)
  328. if err != nil {
  329. // Vote for nil, whatever the error.
  330. err := cm.signAndVote(&Vote{
  331. Height: rs.Height,
  332. Round: rs.Round,
  333. Type: VoteTypeBare,
  334. Hash: nil,
  335. })
  336. return err
  337. }
  338. // Vote for block.
  339. err := cm.signAndVote(&Vote{
  340. Height: rs.Height,
  341. Round: rs.Round,
  342. Type: VoteTypeBare,
  343. Hash: rs.BlockPartSet.Block().Hash(),
  344. })
  345. return err
  346. }
  347. // Precommit proposal if we see enough votes for it.
  348. // Call during transition from RoundStepVote to RoundStepPrecommit.
  349. func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
  350. // If we see a 2/3 majority for votes for a block, precommit.
  351. if hash, ok := rs.RoundBareVotes.TwoThirdsMajority(); ok {
  352. if len(hash) == 0 {
  353. // 2/3 majority voted for nil.
  354. return nil
  355. } else {
  356. // 2/3 majority voted for a block.
  357. // If proposal is invalid or unknown, do nothing.
  358. // See note on ZombieValidators to see why.
  359. if cm.stageProposal(rs.BlockPartSet) != nil {
  360. return nil
  361. }
  362. // Lock this proposal.
  363. // NOTE: we're unlocking any prior locks.
  364. cm.cs.LockProposal(rs.BlockPartSet)
  365. // Send precommit vote.
  366. err := cm.signAndVote(&Vote{
  367. Height: rs.Height,
  368. Round: rs.Round,
  369. Type: VoteTypePrecommit,
  370. Hash: hash,
  371. })
  372. return err
  373. }
  374. } else {
  375. // If we haven't seen enough votes, do nothing.
  376. return nil
  377. }
  378. }
  379. // Commit or unlock.
  380. // Call after RoundStepPrecommit, after round has expired.
  381. func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error {
  382. if hash, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok {
  383. // If there exists a 2/3 majority of precommits.
  384. // Validate the block and commit.
  385. // If the proposal is invalid or we don't have it,
  386. // do not commit.
  387. // TODO If we were just late to receive the block, when
  388. // do we actually get it? Document it.
  389. if cm.stageProposal(rs.BlockPartSet) != nil {
  390. return nil
  391. }
  392. // TODO: Remove?
  393. cm.cs.LockProposal(rs.BlockPartSet)
  394. // Vote commit.
  395. err := cm.signAndVote(&Vote{
  396. Height: rs.Height,
  397. Round: rs.Round,
  398. Type: VoteTypePrecommit,
  399. Hash: hash,
  400. })
  401. if err != nil {
  402. return err
  403. }
  404. // Commit block.
  405. // XXX use adjusted commit time.
  406. // If we just use time.Now() we're not converging
  407. // time differences between nodes, so nodes end up drifting
  408. // in time.
  409. commitTime := time.Now()
  410. cm.commitProposal(rs.BlockPartSet, commitTime)
  411. return nil
  412. } else {
  413. // Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock.
  414. locked := cm.cs.LockedProposal()
  415. if locked != nil {
  416. for _, hashOrNil := range rs.RoundPrecommits.OneThirdMajority() {
  417. if hashOrNil == nil {
  418. continue
  419. }
  420. hash := hashOrNil.([]byte)
  421. if !bytes.Equal(hash, locked.Block().Hash()) {
  422. // Unlock our lock.
  423. cm.cs.LockProposal(nil)
  424. }
  425. }
  426. }
  427. return nil
  428. }
  429. }
  430. func (cm *ConsensusManager) commitProposal(blockPartSet *BlockPartSet, commitTime time.Time) error {
  431. cm.mtx.Lock()
  432. defer cm.mtx.Unlock()
  433. if cm.stagedProposal != blockPartSet {
  434. panic("Unexpected stagedProposal.") // Shouldn't happen.
  435. }
  436. // Save to blockStore
  437. block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts()
  438. err := cm.blockStore.SaveBlockParts(block.Height, blockParts)
  439. if err != nil {
  440. return err
  441. }
  442. // What was staged becomes committed.
  443. cm.state = cm.stagedState
  444. cm.cs.Update(cm.state)
  445. cm.stagedProposal = nil
  446. cm.stagedState = nil
  447. return nil
  448. }
  449. func (cm *ConsensusManager) gossipVoteRoutine() {
  450. OUTER_LOOP:
  451. for {
  452. // Get round state
  453. rs := cm.cs.RoundState()
  454. // Receive incoming message on VoteCh
  455. inMsg, ok := cm.sw.Receive(VoteCh)
  456. if !ok {
  457. break // Client has stopped
  458. }
  459. msg_ := decodeMessage(inMsg.Bytes)
  460. log.Info("gossipVoteRoutine received %v", msg_)
  461. switch msg_.(type) {
  462. case *Vote:
  463. vote := msg_.(*Vote)
  464. if vote.Height != rs.Height || vote.Round != rs.Round {
  465. continue OUTER_LOOP
  466. }
  467. added, err := rs.AddVote(vote)
  468. if !added {
  469. log.Info("Error adding vote %v", err)
  470. }
  471. switch err {
  472. case ErrVoteInvalidAccount, ErrVoteInvalidSignature:
  473. // TODO: Handle bad peer.
  474. case ErrVoteConflictingSignature, ErrVoteInvalidHash:
  475. // TODO: Handle bad validator.
  476. case nil:
  477. break
  478. //case ErrVoteUnexpectedPhase: Shouldn't happen.
  479. default:
  480. Panicf("Unexpected error from .AddVote(): %v", err)
  481. }
  482. if !added {
  483. continue
  484. }
  485. // Gossip vote.
  486. PEERS_LOOP:
  487. for _, peer := range cm.sw.Peers().List() {
  488. peerState := cm.getPeerState(peer)
  489. if peerState == nil {
  490. // Peer disconnected before we were able to process.
  491. continue PEERS_LOOP
  492. }
  493. if peerState.WantsVote(vote) {
  494. msg := p2p.TypedMessage{msgTypeVote, vote}
  495. peer.TrySend(VoteCh, msg)
  496. }
  497. }
  498. default:
  499. // Ignore unknown message
  500. // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
  501. }
  502. }
  503. // Cleanup
  504. }
  505. type RoundAction struct {
  506. Height uint32 // The block height for which consensus is reaching for.
  507. Round uint16 // The round number at given height.
  508. XnToStep uint8 // Transition to this step. Action depends on this value.
  509. }
  510. // Source of all round state transitions and votes.
  511. // It can be preemptively woken up via amessage to
  512. // doActionCh.
  513. func (cm *ConsensusManager) proposeAndVoteRoutine() {
  514. // Figure out when to wake up next (in the absence of other events)
  515. setAlarm := func() {
  516. if len(cm.doActionCh) > 0 {
  517. return // Already going to wake up later.
  518. }
  519. // Figure out which height/round/step we're at,
  520. // then schedule an action for when it is due.
  521. rs := cm.cs.RoundState()
  522. _, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
  523. switch rs.Step() {
  524. case RoundStepStart:
  525. // It's a new RoundState, immediately wake up and xn to RoundStepProposal.
  526. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal}
  527. case RoundStepProposal:
  528. // Wake up when it's time to vote.
  529. time.Sleep(time.Duration(roundDeadlineBare-elapsedRatio) * roundDuration)
  530. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepBareVotes}
  531. case RoundStepBareVotes:
  532. // Wake up when it's time to precommit.
  533. time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration)
  534. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommits}
  535. case RoundStepPrecommits:
  536. // Wake up when the round is over.
  537. time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration)
  538. cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommitOrUnlock}
  539. case RoundStepCommitOrUnlock:
  540. // This shouldn't happen.
  541. // Before setAlarm() got called,
  542. // logic should have created a new RoundState for the next round.
  543. panic("Should not happen")
  544. }
  545. }
  546. for {
  547. func() {
  548. roundAction := <-cm.doActionCh
  549. // Always set the alarm after any processing below.
  550. defer setAlarm()
  551. // We only consider acting on given height and round.
  552. height := roundAction.Height
  553. round := roundAction.Round
  554. // We only consider transitioning to given step.
  555. step := roundAction.XnToStep
  556. // This is the current state.
  557. rs := cm.cs.RoundState()
  558. if height != rs.Height || round != rs.Round {
  559. return // Not relevant.
  560. }
  561. if step == RoundStepProposal && rs.Step() == RoundStepStart {
  562. // Propose a block if I am the proposer.
  563. if cm.privValidator != nil && rs.Proposer.Account.Id == cm.privValidator.Id {
  564. block, err := cm.constructProposal(rs)
  565. if err != nil {
  566. log.Error("Error attempting to construct a proposal: %v", err)
  567. }
  568. // XXX propose the block.
  569. log.Error("XXX use ", block)
  570. // XXX divide block into parts
  571. // XXX communicate parts.
  572. // XXX put this in another function.
  573. panic("Implement block proposal!")
  574. }
  575. } else if step == RoundStepBareVotes && rs.Step() <= RoundStepProposal {
  576. err := cm.voteProposal(rs)
  577. if err != nil {
  578. log.Info("Error attempting to vote for proposal: %v", err)
  579. }
  580. } else if step == RoundStepPrecommits && rs.Step() <= RoundStepBareVotes {
  581. err := cm.precommitProposal(rs)
  582. if err != nil {
  583. log.Info("Error attempting to precommit for proposal: %v", err)
  584. }
  585. } else if step == RoundStepCommitOrUnlock && rs.Step() <= RoundStepPrecommits {
  586. err := cm.commitOrUnlockProposal(rs)
  587. if err != nil {
  588. log.Info("Error attempting to commit or update for proposal: %v", err)
  589. }
  590. // Round is over. This is a special case.
  591. // Prepare a new RoundState for the next state.
  592. cm.cs.SetupRound(rs.Round + 1)
  593. return // setAlarm() takes care of the rest.
  594. } else {
  595. return // Action is not relevant.
  596. }
  597. // Transition to new step.
  598. rs.SetStep(step)
  599. }()
  600. }
  601. }
  602. //-----------------------------------------------------------------------------
  603. var (
  604. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  605. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  606. )
  607. type PeerState struct {
  608. mtx sync.Mutex
  609. peer *p2p.Peer
  610. height uint32
  611. startTime time.Time // Derived from offset seconds.
  612. blockPartsBitArray []byte
  613. votesWanted map[uint64]float32
  614. }
  615. func NewPeerState(peer *p2p.Peer) *PeerState {
  616. return &PeerState{
  617. peer: peer,
  618. height: 0,
  619. votesWanted: make(map[uint64]float32),
  620. }
  621. }
  622. func (ps *PeerState) WantsBlockPart(part *BlockPart) bool {
  623. ps.mtx.Lock()
  624. defer ps.mtx.Unlock()
  625. // Only wants the part if peer's current height and round matches.
  626. if ps.height == part.Height {
  627. round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime)
  628. if round == part.Round && elapsedRatio < roundDeadlineBare {
  629. // Only wants the part if it doesn't already have it.
  630. if ps.blockPartsBitArray[part.Index/8]&byte(1<<(part.Index%8)) == 0 {
  631. return true
  632. }
  633. }
  634. }
  635. return false
  636. }
  637. func (ps *PeerState) WantsVote(vote *Vote) bool {
  638. ps.mtx.Lock()
  639. defer ps.mtx.Unlock()
  640. // Only wants the vote if votesWanted says so
  641. if ps.votesWanted[vote.SignerId] <= 0 {
  642. // TODO: sometimes, send unsolicited votes to see if peer wants it.
  643. return false
  644. }
  645. // Only wants the vote if peer's current height and round matches.
  646. if ps.height == vote.Height {
  647. round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime)
  648. if round == vote.Round {
  649. if vote.Type == VoteTypeBare && elapsedRatio > roundDeadlineBare {
  650. return false
  651. }
  652. if vote.Type == VoteTypePrecommit && elapsedRatio > roundDeadlinePrecommit {
  653. return false
  654. }
  655. return true
  656. }
  657. }
  658. return false
  659. }
  660. func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) error {
  661. ps.mtx.Lock()
  662. defer ps.mtx.Unlock()
  663. // TODO: Sanity check len(BlockParts)
  664. if msg.Height < ps.height {
  665. return ErrPeerStateHeightRegression
  666. }
  667. if msg.Height == ps.height {
  668. if len(ps.blockPartsBitArray) == 0 {
  669. ps.blockPartsBitArray = msg.BlockPartsBitArray
  670. } else if len(msg.BlockPartsBitArray) > 0 {
  671. if len(ps.blockPartsBitArray) != len(msg.BlockPartsBitArray) {
  672. // TODO: If the peer received a part from
  673. // a proposer who signed a bad (or conflicting) part,
  674. // just about anything can happen with the new blockPartsBitArray.
  675. // In those cases it's alright to ignore the peer for the round,
  676. // and try to induce nil votes for that round.
  677. return nil
  678. } else {
  679. // TODO: Same as above. If previously known parts disappear,
  680. // something is fishy.
  681. // For now, just copy over known parts.
  682. for i, byt := range msg.BlockPartsBitArray {
  683. ps.blockPartsBitArray[i] |= byt
  684. }
  685. }
  686. }
  687. } else {
  688. // TODO: handle peer connection latency estimation.
  689. newStartTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  690. // Ensure that the new height's start time is sufficiently after the last startTime.
  691. // TODO: there should be some time between rounds.
  692. if !newStartTime.After(ps.startTime) {
  693. return ErrPeerStateInvalidStartTime
  694. }
  695. ps.startTime = newStartTime
  696. ps.height = msg.Height
  697. ps.blockPartsBitArray = msg.BlockPartsBitArray
  698. }
  699. return nil
  700. }
  701. func (ps *PeerState) ApplyVoteRankMessage(msg *VoteRankMessage) error {
  702. ps.mtx.Lock()
  703. defer ps.mtx.Unlock()
  704. // XXX IMPLEMENT
  705. return nil
  706. }
  707. //-----------------------------------------------------------------------------
  708. // Messages
  709. const (
  710. msgTypeUnknown = Byte(0x00)
  711. msgTypeBlockPart = Byte(0x10)
  712. msgTypeKnownBlockParts = Byte(0x11)
  713. msgTypeVote = Byte(0x20)
  714. msgTypeVoteRank = Byte(0x21)
  715. )
  716. // TODO: check for unnecessary extra bytes at the end.
  717. func decodeMessage(bz ByteSlice) (msg interface{}) {
  718. // log.Debug("decoding msg bytes: %X", bz)
  719. switch Byte(bz[0]) {
  720. case msgTypeBlockPart:
  721. return readBlockPartMessage(bytes.NewReader(bz[1:]))
  722. case msgTypeKnownBlockParts:
  723. return readKnownBlockPartsMessage(bytes.NewReader(bz[1:]))
  724. case msgTypeVote:
  725. return ReadVote(bytes.NewReader(bz[1:]))
  726. case msgTypeVoteRank:
  727. return readVoteRankMessage(bytes.NewReader(bz[1:]))
  728. default:
  729. return nil
  730. }
  731. }
  732. //-------------------------------------
  733. type BlockPartMessage struct {
  734. BlockPart *BlockPart
  735. }
  736. func readBlockPartMessage(r io.Reader) *BlockPartMessage {
  737. return &BlockPartMessage{
  738. BlockPart: ReadBlockPart(r),
  739. }
  740. }
  741. func (m *BlockPartMessage) WriteTo(w io.Writer) (n int64, err error) {
  742. n, err = WriteTo(msgTypeBlockPart, w, n, err)
  743. n, err = WriteTo(m.BlockPart, w, n, err)
  744. return
  745. }
  746. func (m *BlockPartMessage) String() string {
  747. return fmt.Sprintf("[BlockPartMessage %v]", m.BlockPart)
  748. }
  749. //-------------------------------------
  750. type KnownBlockPartsMessage struct {
  751. Height uint32
  752. SecondsSinceStartTime uint32
  753. BlockPartsBitArray ByteSlice
  754. }
  755. func readKnownBlockPartsMessage(r io.Reader) *KnownBlockPartsMessage {
  756. return &KnownBlockPartsMessage{
  757. Height: Readuint32(r),
  758. SecondsSinceStartTime: Readuint32(r),
  759. BlockPartsBitArray: ReadByteSlice(r),
  760. }
  761. }
  762. func (m *KnownBlockPartsMessage) WriteTo(w io.Writer) (n int64, err error) {
  763. n, err = WriteTo(msgTypeKnownBlockParts, w, n, err)
  764. n, err = WriteTo(UInt32(m.Height), w, n, err)
  765. n, err = WriteTo(UInt32(m.SecondsSinceStartTime), w, n, err)
  766. n, err = WriteTo(m.BlockPartsBitArray, w, n, err)
  767. return
  768. }
  769. func (m *KnownBlockPartsMessage) String() string {
  770. return fmt.Sprintf("[KnownBlockPartsMessage H:%v SSST:%v, BPBA:%X]",
  771. m.Height, m.SecondsSinceStartTime, m.BlockPartsBitArray)
  772. }
  773. //-------------------------------------
  774. // XXX use this.
  775. type VoteRankMessage struct {
  776. ValidatorId uint64
  777. Rank uint8
  778. }
  779. func readVoteRankMessage(r io.Reader) *VoteRankMessage {
  780. return &VoteRankMessage{
  781. ValidatorId: Readuint64(r),
  782. Rank: Readuint8(r),
  783. }
  784. }
  785. func (m *VoteRankMessage) WriteTo(w io.Writer) (n int64, err error) {
  786. n, err = WriteTo(msgTypeVoteRank, w, n, err)
  787. n, err = WriteTo(UInt64(m.ValidatorId), w, n, err)
  788. n, err = WriteTo(UInt8(m.Rank), w, n, err)
  789. return
  790. }
  791. func (m *VoteRankMessage) String() string {
  792. return fmt.Sprintf("[VoteRankMessage V:%v, R:%v]", m.ValidatorId, m.Rank)
  793. }