You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

915 lines
26 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. const (
  20. StateChannel = byte(0x20)
  21. DataChannel = byte(0x21)
  22. VoteChannel = byte(0x22)
  23. PeerStateKey = "ConsensusReactor.peerState"
  24. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  25. )
  26. //-----------------------------------------------------------------------------
  27. // The reactor's underlying ConsensusState may change state at any time.
  28. // We atomically copy the RoundState struct before using it.
  29. type ConsensusReactor struct {
  30. sw *p2p.Switch
  31. running uint32
  32. quit chan struct{}
  33. blockStore *bc.BlockStore
  34. conS *ConsensusState
  35. // if fast sync is running we don't really do anything
  36. sync bool
  37. evsw events.Fireable
  38. }
  39. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
  40. conR := &ConsensusReactor{
  41. blockStore: blockStore,
  42. quit: make(chan struct{}),
  43. conS: consensusState,
  44. sync: sync,
  45. }
  46. return conR
  47. }
  48. // Implements Reactor
  49. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  50. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  51. log.Info("Starting ConsensusReactor")
  52. conR.sw = sw
  53. if !conR.sync {
  54. conR.conS.Start()
  55. }
  56. go conR.broadcastNewRoundStepRoutine()
  57. }
  58. }
  59. // Implements Reactor
  60. func (conR *ConsensusReactor) Stop() {
  61. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  62. log.Info("Stopping ConsensusReactor")
  63. conR.conS.Stop()
  64. close(conR.quit)
  65. }
  66. }
  67. func (conR *ConsensusReactor) IsRunning() bool {
  68. return atomic.LoadUint32(&conR.running) == 1
  69. }
  70. // Implements Reactor
  71. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  72. // TODO optimize
  73. return []*p2p.ChannelDescriptor{
  74. &p2p.ChannelDescriptor{
  75. Id: StateChannel,
  76. Priority: 5,
  77. SendQueueCapacity: 100,
  78. },
  79. &p2p.ChannelDescriptor{
  80. Id: DataChannel,
  81. Priority: 5,
  82. SendQueueCapacity: 2,
  83. },
  84. &p2p.ChannelDescriptor{
  85. Id: VoteChannel,
  86. Priority: 5,
  87. SendQueueCapacity: 40,
  88. },
  89. }
  90. }
  91. // Implements Reactor
  92. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  93. if !conR.IsRunning() {
  94. return
  95. }
  96. // Create peerState for peer
  97. peerState := NewPeerState(peer)
  98. peer.Data.Set(PeerStateKey, peerState)
  99. // Begin gossip routines for this peer.
  100. go conR.gossipDataRoutine(peer, peerState)
  101. go conR.gossipVotesRoutine(peer, peerState)
  102. // Send our state to peer.
  103. conR.sendNewRoundStep(peer)
  104. }
  105. // Implements Reactor
  106. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  107. if !conR.IsRunning() {
  108. return
  109. }
  110. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  111. }
  112. // Implements Reactor
  113. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  114. if conR.sync || !conR.IsRunning() {
  115. return
  116. }
  117. // Get round state
  118. rs := conR.conS.GetRoundState()
  119. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  120. _, msg_, err := DecodeMessage(msgBytes)
  121. if err != nil {
  122. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  123. return
  124. }
  125. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)
  126. switch chId {
  127. case StateChannel:
  128. switch msg := msg_.(type) {
  129. case *NewRoundStepMessage:
  130. ps.ApplyNewRoundStepMessage(msg, rs)
  131. case *CommitStepMessage:
  132. ps.ApplyCommitStepMessage(msg)
  133. case *HasVoteMessage:
  134. ps.ApplyHasVoteMessage(msg)
  135. default:
  136. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  137. }
  138. case DataChannel:
  139. switch msg := msg_.(type) {
  140. case *ProposalMessage:
  141. ps.SetHasProposal(msg.Proposal)
  142. err = conR.conS.SetProposal(msg.Proposal)
  143. case *PartMessage:
  144. if msg.Type == partTypeProposalBlock {
  145. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  146. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  147. } else if msg.Type == partTypeProposalPOL {
  148. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  149. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  150. } else {
  151. log.Warn(Fmt("Unknown part type %v", msg.Type))
  152. }
  153. default:
  154. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  155. }
  156. case VoteChannel:
  157. switch msg := msg_.(type) {
  158. case *VoteMessage:
  159. vote := msg.Vote
  160. if rs.Height != vote.Height {
  161. if rs.Height == vote.Height+1 {
  162. if rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypeCommit {
  163. goto VOTE_PASS // *ducks*
  164. }
  165. }
  166. return // Wrong height. Not necessarily a bad peer.
  167. }
  168. VOTE_PASS:
  169. validatorIndex := msg.ValidatorIndex
  170. address, _ := rs.Validators.GetByIndex(validatorIndex)
  171. added, index, err := conR.conS.AddVote(address, vote)
  172. if err != nil {
  173. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  174. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  175. log.Warn("Found conflicting vote. Publish evidence")
  176. evidenceTx := &types.DupeoutTx{
  177. Address: address,
  178. VoteA: *errDupe.VoteA,
  179. VoteB: *errDupe.VoteB,
  180. }
  181. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  182. } else {
  183. // Probably an invalid signature. Bad peer.
  184. log.Warn("Error attempting to add vote", "error", err)
  185. // TODO: punish peer
  186. }
  187. }
  188. // Initialize Prevotes/Precommits/Commits if needed
  189. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  190. ps.SetHasVote(vote, index)
  191. if added {
  192. msg := &HasVoteMessage{
  193. Height: vote.Height,
  194. Round: vote.Round,
  195. Type: vote.Type,
  196. Index: index,
  197. }
  198. conR.sw.Broadcast(StateChannel, msg)
  199. }
  200. default:
  201. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  202. }
  203. default:
  204. log.Warn(Fmt("Unknown channel %X", chId))
  205. }
  206. if err != nil {
  207. log.Warn("Error in Receive()", "error", err)
  208. }
  209. }
  210. // Sets our private validator account for signing votes.
  211. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  212. conR.conS.SetPrivValidator(priv)
  213. }
  214. // Switch from the fast sync to the consensus:
  215. // reset the state, turn off fast sync, start the consensus-state-machine
  216. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  217. conR.conS.updateToState(state, false)
  218. conR.conS.newStepCh <- conR.conS.getRoundState()
  219. conR.sync = false
  220. conR.conS.Start()
  221. }
  222. // implements events.Eventable
  223. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  224. conR.evsw = evsw
  225. conR.conS.SetFireable(evsw)
  226. }
  227. //--------------------------------------
  228. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  229. // Get seconds since beginning of height.
  230. timeElapsed := time.Now().Sub(rs.StartTime)
  231. // Broadcast NewRoundStepMessage
  232. nrsMsg = &NewRoundStepMessage{
  233. Height: rs.Height,
  234. Round: rs.Round,
  235. Step: rs.Step,
  236. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  237. }
  238. // If the step is commit, then also broadcast a CommitStepMessage.
  239. if rs.Step == RoundStepCommit {
  240. csMsg = &CommitStepMessage{
  241. Height: rs.Height,
  242. BlockParts: rs.ProposalBlockParts.Header(),
  243. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  244. }
  245. }
  246. return
  247. }
  248. // Listens for changes to the ConsensusState.Step by pulling
  249. // on conR.conS.NewStepCh().
  250. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  251. for {
  252. // Get RoundState with new Step or quit.
  253. var rs *RoundState
  254. select {
  255. case rs = <-conR.conS.NewStepCh():
  256. case <-conR.quit:
  257. return
  258. }
  259. nrsMsg, csMsg := makeRoundStepMessages(rs)
  260. if nrsMsg != nil {
  261. conR.sw.Broadcast(StateChannel, nrsMsg)
  262. }
  263. if csMsg != nil {
  264. conR.sw.Broadcast(StateChannel, csMsg)
  265. }
  266. }
  267. }
  268. func (conR *ConsensusReactor) sendNewRoundStep(peer *p2p.Peer) {
  269. rs := conR.conS.GetRoundState()
  270. nrsMsg, csMsg := makeRoundStepMessages(rs)
  271. if nrsMsg != nil {
  272. peer.Send(StateChannel, nrsMsg)
  273. }
  274. if csMsg != nil {
  275. peer.Send(StateChannel, csMsg)
  276. }
  277. }
  278. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  279. OUTER_LOOP:
  280. for {
  281. // Manage disconnects from self or peer.
  282. if !peer.IsRunning() || !conR.IsRunning() {
  283. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  284. return
  285. }
  286. rs := conR.conS.GetRoundState()
  287. prs := ps.GetRoundState()
  288. // Send proposal Block parts?
  289. // NOTE: if we or peer is at RoundStepCommit*, the round
  290. // won't necessarily match, but that's OK.
  291. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  292. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  293. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  294. part := rs.ProposalBlockParts.GetPart(index)
  295. msg := &PartMessage{
  296. Height: rs.Height,
  297. Round: rs.Round,
  298. Type: partTypeProposalBlock,
  299. Part: part,
  300. }
  301. peer.Send(DataChannel, msg)
  302. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  303. continue OUTER_LOOP
  304. }
  305. }
  306. // If the peer is on a previous height, help catch up.
  307. if 0 < prs.Height && prs.Height < rs.Height {
  308. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  309. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  310. // Ensure that the peer's PartSetHeader is correct
  311. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  312. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  313. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  314. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  315. time.Sleep(peerGossipSleepDuration)
  316. continue OUTER_LOOP
  317. }
  318. // Load the part
  319. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  320. if part == nil {
  321. log.Warn("Could not load part", "index", index,
  322. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  323. time.Sleep(peerGossipSleepDuration)
  324. continue OUTER_LOOP
  325. }
  326. // Send the part
  327. msg := &PartMessage{
  328. Height: prs.Height,
  329. Round: prs.Round,
  330. Type: partTypeProposalBlock,
  331. Part: part,
  332. }
  333. peer.Send(DataChannel, msg)
  334. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  335. continue OUTER_LOOP
  336. } else {
  337. //log.Debug("No parts to send in catch-up, sleeping")
  338. time.Sleep(peerGossipSleepDuration)
  339. continue OUTER_LOOP
  340. }
  341. }
  342. // If height and round don't match, sleep.
  343. if rs.Height != prs.Height || rs.Round != prs.Round {
  344. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  345. time.Sleep(peerGossipSleepDuration)
  346. continue OUTER_LOOP
  347. }
  348. // Send proposal?
  349. if rs.Proposal != nil && !prs.Proposal {
  350. msg := &ProposalMessage{Proposal: rs.Proposal}
  351. peer.Send(DataChannel, msg)
  352. ps.SetHasProposal(rs.Proposal)
  353. continue OUTER_LOOP
  354. }
  355. // Send proposal POL parts?
  356. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  357. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  358. msg := &PartMessage{
  359. Height: rs.Height,
  360. Round: rs.Round,
  361. Type: partTypeProposalPOL,
  362. Part: rs.ProposalPOLParts.GetPart(index),
  363. }
  364. peer.Send(DataChannel, msg)
  365. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  366. continue OUTER_LOOP
  367. }
  368. }
  369. // Nothing to do. Sleep.
  370. time.Sleep(peerGossipSleepDuration)
  371. continue OUTER_LOOP
  372. }
  373. }
  374. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  375. // Simple hack to throttle logs upon sleep.
  376. var sleeping = 0
  377. OUTER_LOOP:
  378. for {
  379. // Manage disconnects from self or peer.
  380. if !peer.IsRunning() || !conR.IsRunning() {
  381. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  382. return
  383. }
  384. rs := conR.conS.GetRoundState()
  385. prs := ps.GetRoundState()
  386. switch sleeping {
  387. case 1: // First sleep
  388. sleeping = 2
  389. case 2: // No more sleep
  390. sleeping = 0
  391. }
  392. // Returns true when useful work was done.
  393. trySendVote := func(height uint, voteSet *VoteSet, peerVoteSet *BitArray) (sent bool) {
  394. if voteSet == nil {
  395. return false
  396. } else if peerVoteSet == nil {
  397. ps.EnsureVoteBitArrays(height, voteSet.Size())
  398. return true
  399. }
  400. // TODO: give priority to our vote.
  401. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  402. vote := voteSet.GetByIndex(index)
  403. // NOTE: vote may be a commit.
  404. msg := &VoteMessage{index, vote}
  405. peer.Send(VoteChannel, msg)
  406. ps.SetHasVote(vote, index)
  407. return true
  408. }
  409. return false
  410. }
  411. // Returns true when useful work was done.
  412. trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet *BitArray) (sent bool) {
  413. if validation == nil {
  414. return false
  415. } else if peerVoteSet == nil {
  416. ps.EnsureVoteBitArrays(blockMeta.Header.Height, uint(len(validation.Commits)))
  417. return true
  418. }
  419. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  420. commit := validation.Commits[index]
  421. log.Debug("Picked commit to send", "index", index, "commit", commit)
  422. // Reconstruct vote.
  423. vote := &types.Vote{
  424. Height: prs.Height,
  425. Round: commit.Round,
  426. Type: types.VoteTypeCommit,
  427. BlockHash: blockMeta.Hash,
  428. BlockParts: blockMeta.Parts,
  429. Signature: commit.Signature,
  430. }
  431. msg := &VoteMessage{index, vote}
  432. peer.Send(VoteChannel, msg)
  433. ps.SetHasVote(vote, index)
  434. return true
  435. }
  436. return false
  437. }
  438. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  439. if rs.Height == prs.Height {
  440. // If there are lastcommits to send...
  441. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  442. if trySendVote(rs.Height-1, rs.LastCommits, prs.LastCommits) {
  443. continue OUTER_LOOP
  444. }
  445. }
  446. // If there are prevotes to send...
  447. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  448. if trySendVote(rs.Height, rs.Prevotes, prs.Prevotes) {
  449. continue OUTER_LOOP
  450. }
  451. }
  452. // If there are precommits to send...
  453. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  454. if trySendVote(rs.Height, rs.Precommits, prs.Precommits) {
  455. continue OUTER_LOOP
  456. }
  457. }
  458. // If there are any commits to send...
  459. if trySendVote(rs.Height, rs.Commits, prs.Commits) {
  460. continue OUTER_LOOP
  461. }
  462. }
  463. // Catchup logic
  464. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  465. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  466. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  467. // If there are lastcommits to send...
  468. if trySendVote(prs.Height, rs.LastCommits, prs.Commits) {
  469. continue OUTER_LOOP
  470. } else {
  471. ps.SetHasAllCatchupCommits(prs.Height)
  472. }
  473. }
  474. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  475. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  476. // Load the blockMeta for block at prs.Height
  477. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  478. // Load the seen validation for prs.Height
  479. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  480. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  481. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  482. continue OUTER_LOOP
  483. } else {
  484. ps.SetHasAllCatchupCommits(prs.Height)
  485. }
  486. }
  487. // If peer is lagging by more than 1, send Validation.
  488. if rs.Height >= prs.Height+2 {
  489. // Load the blockMeta for block at prs.Height
  490. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  491. // Load the block validation for prs.Height+1,
  492. // which contains commit signatures for prs.Height.
  493. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  494. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  495. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  496. continue OUTER_LOOP
  497. } else {
  498. ps.SetHasAllCatchupCommits(prs.Height)
  499. }
  500. }
  501. }
  502. if sleeping == 0 {
  503. // We sent nothing. Sleep...
  504. sleeping = 1
  505. log.Debug("No votes to send, sleeping", "peer", peer,
  506. "localPV", rs.Prevotes.BitArray(), "peerPV", prs.Prevotes,
  507. "localPC", rs.Precommits.BitArray(), "peerPC", prs.Precommits,
  508. "localCM", rs.Commits.BitArray(), "peerCM", prs.Commits)
  509. } else if sleeping == 2 {
  510. // Continued sleep...
  511. sleeping = 1
  512. }
  513. time.Sleep(peerGossipSleepDuration)
  514. continue OUTER_LOOP
  515. }
  516. }
  517. //-----------------------------------------------------------------------------
  518. // Read only when returned by PeerState.GetRoundState().
  519. type PeerRoundState struct {
  520. Height uint // Height peer is at
  521. Round uint // Round peer is at
  522. Step RoundStepType // Step peer is at
  523. StartTime time.Time // Estimated start of round 0 at this height
  524. Proposal bool // True if peer has proposal for this round
  525. ProposalBlockParts types.PartSetHeader //
  526. ProposalBlockBitArray *BitArray // True bit -> has part
  527. ProposalPOLParts types.PartSetHeader //
  528. ProposalPOLBitArray *BitArray // True bit -> has part
  529. Prevotes *BitArray // All votes peer has for this round
  530. Precommits *BitArray // All precommits peer has for this round
  531. Commits *BitArray // All commits peer has for this height
  532. LastCommits *BitArray // All commits peer has for last height
  533. HasAllCatchupCommits bool // Used for catch-up
  534. }
  535. //-----------------------------------------------------------------------------
  536. var (
  537. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  538. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  539. )
  540. type PeerState struct {
  541. Key string
  542. mtx sync.Mutex
  543. PeerRoundState
  544. }
  545. func NewPeerState(peer *p2p.Peer) *PeerState {
  546. return &PeerState{Key: peer.Key}
  547. }
  548. // Returns an atomic snapshot of the PeerRoundState.
  549. // There's no point in mutating it since it won't change PeerState.
  550. func (ps *PeerState) GetRoundState() *PeerRoundState {
  551. ps.mtx.Lock()
  552. defer ps.mtx.Unlock()
  553. prs := ps.PeerRoundState // copy
  554. return &prs
  555. }
  556. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  557. ps.mtx.Lock()
  558. defer ps.mtx.Unlock()
  559. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  560. return
  561. }
  562. if ps.Proposal {
  563. return
  564. }
  565. ps.Proposal = true
  566. ps.ProposalBlockParts = proposal.BlockParts
  567. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  568. ps.ProposalPOLParts = proposal.POLParts
  569. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  570. }
  571. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  572. ps.mtx.Lock()
  573. defer ps.mtx.Unlock()
  574. if ps.Height != height || ps.Round != round {
  575. return
  576. }
  577. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  578. }
  579. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  580. ps.mtx.Lock()
  581. defer ps.mtx.Unlock()
  582. if ps.Height != height || ps.Round != round {
  583. return
  584. }
  585. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  586. }
  587. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  588. ps.mtx.Lock()
  589. defer ps.mtx.Unlock()
  590. if ps.Height == height {
  591. if ps.Prevotes == nil {
  592. ps.Prevotes = NewBitArray(numValidators)
  593. }
  594. if ps.Precommits == nil {
  595. ps.Precommits = NewBitArray(numValidators)
  596. }
  597. if ps.Commits == nil {
  598. ps.Commits = NewBitArray(numValidators)
  599. }
  600. } else if ps.Height == height+1 {
  601. if ps.LastCommits == nil {
  602. ps.LastCommits = NewBitArray(numValidators)
  603. }
  604. }
  605. }
  606. func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
  607. ps.mtx.Lock()
  608. defer ps.mtx.Unlock()
  609. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  610. }
  611. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  612. if ps.Height == height+1 && type_ == types.VoteTypeCommit {
  613. // Special case for LastCommits.
  614. ps.LastCommits.SetIndex(index, true)
  615. log.Debug("SetHasVote", "lastCommits", ps.LastCommits, "index", index)
  616. return
  617. } else if ps.Height != height {
  618. // Does not apply.
  619. return
  620. }
  621. switch type_ {
  622. case types.VoteTypePrevote:
  623. ps.Prevotes.SetIndex(index, true)
  624. log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index)
  625. case types.VoteTypePrecommit:
  626. ps.Precommits.SetIndex(index, true)
  627. log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index)
  628. case types.VoteTypeCommit:
  629. if round < ps.Round {
  630. ps.Prevotes.SetIndex(index, true)
  631. ps.Precommits.SetIndex(index, true)
  632. }
  633. ps.Commits.SetIndex(index, true)
  634. log.Debug("SetHasVote", "peer", ps.Key, "commits", ps.Commits, "index", index)
  635. default:
  636. panic("Invalid vote type")
  637. }
  638. }
  639. // When catching up, this helps keep track of whether
  640. // we should send more commit votes from the block (validation) store
  641. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  642. ps.mtx.Lock()
  643. defer ps.mtx.Unlock()
  644. if ps.Height == height {
  645. ps.HasAllCatchupCommits = true
  646. }
  647. }
  648. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  649. ps.mtx.Lock()
  650. defer ps.mtx.Unlock()
  651. // Just remember these values.
  652. psHeight := ps.Height
  653. psRound := ps.Round
  654. //psStep := ps.Step
  655. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  656. ps.Height = msg.Height
  657. ps.Round = msg.Round
  658. ps.Step = msg.Step
  659. ps.StartTime = startTime
  660. if psHeight != msg.Height || psRound != msg.Round {
  661. ps.Proposal = false
  662. ps.ProposalBlockParts = types.PartSetHeader{}
  663. ps.ProposalBlockBitArray = nil
  664. ps.ProposalPOLParts = types.PartSetHeader{}
  665. ps.ProposalPOLBitArray = nil
  666. // We'll update the BitArray capacity later.
  667. ps.Prevotes = nil
  668. ps.Precommits = nil
  669. }
  670. if psHeight != msg.Height {
  671. // Shift Commits to LastCommits
  672. if psHeight+1 == msg.Height {
  673. ps.LastCommits = ps.Commits
  674. } else {
  675. ps.LastCommits = nil
  676. }
  677. // We'll update the BitArray capacity later.
  678. ps.Commits = nil
  679. ps.HasAllCatchupCommits = false
  680. }
  681. }
  682. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  683. ps.mtx.Lock()
  684. defer ps.mtx.Unlock()
  685. if ps.Height != msg.Height {
  686. return
  687. }
  688. ps.ProposalBlockParts = msg.BlockParts
  689. ps.ProposalBlockBitArray = msg.BlockBitArray
  690. }
  691. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  692. ps.mtx.Lock()
  693. defer ps.mtx.Unlock()
  694. // Special case for LastCommits
  695. if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
  696. ps.LastCommits.SetIndex(msg.Index, true)
  697. return
  698. } else if ps.Height != msg.Height {
  699. return
  700. }
  701. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  702. }
  703. //-----------------------------------------------------------------------------
  704. // Messages
  705. const (
  706. msgTypeNewRoundStep = byte(0x01)
  707. msgTypeCommitStep = byte(0x02)
  708. msgTypeProposal = byte(0x11)
  709. msgTypePart = byte(0x12) // both block & POL
  710. msgTypeVote = byte(0x13)
  711. msgTypeHasVote = byte(0x14)
  712. )
  713. type ConsensusMessage interface{}
  714. var _ = binary.RegisterInterface(
  715. struct{ ConsensusMessage }{},
  716. binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  717. binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  718. binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  719. binary.ConcreteType{&PartMessage{}, msgTypePart},
  720. binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  721. binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  722. )
  723. // TODO: check for unnecessary extra bytes at the end.
  724. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  725. msgType = bz[0]
  726. n := new(int64)
  727. r := bytes.NewReader(bz)
  728. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  729. return
  730. }
  731. //-------------------------------------
  732. type NewRoundStepMessage struct {
  733. Height uint
  734. Round uint
  735. Step RoundStepType
  736. SecondsSinceStartTime uint
  737. }
  738. func (m *NewRoundStepMessage) String() string {
  739. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  740. }
  741. //-------------------------------------
  742. type CommitStepMessage struct {
  743. Height uint
  744. BlockParts types.PartSetHeader
  745. BlockBitArray *BitArray
  746. }
  747. func (m *CommitStepMessage) String() string {
  748. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  749. }
  750. //-------------------------------------
  751. type ProposalMessage struct {
  752. Proposal *Proposal
  753. }
  754. func (m *ProposalMessage) String() string {
  755. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  756. }
  757. //-------------------------------------
  758. const (
  759. partTypeProposalBlock = byte(0x01)
  760. partTypeProposalPOL = byte(0x02)
  761. )
  762. type PartMessage struct {
  763. Height uint
  764. Round uint
  765. Type byte
  766. Part *types.Part
  767. }
  768. func (m *PartMessage) String() string {
  769. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  770. }
  771. //-------------------------------------
  772. type VoteMessage struct {
  773. ValidatorIndex uint
  774. Vote *types.Vote
  775. }
  776. func (m *VoteMessage) String() string {
  777. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  778. }
  779. //-------------------------------------
  780. type HasVoteMessage struct {
  781. Height uint
  782. Round uint
  783. Type byte
  784. Index uint
  785. }
  786. func (m *HasVoteMessage) String() string {
  787. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  788. }