You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

914 lines
26 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  // Channel ids on which the consensus reactor exchanges messages with peers.
  const (
  	StateChannel byte = 0x20
  	DataChannel  byte = 0x21
  	VoteChannel  byte = 0x22
  )

  const (
  	// PeerStateKey is the key under which a peer's *PeerState is kept in peer.Data.
  	PeerStateKey = "ConsensusReactor.peerState"

  	// Time to sleep if there's nothing to send.
  	peerGossipSleepDuration = 100 * time.Millisecond
  )
  26. //-----------------------------------------------------------------------------
  27. // The reactor's underlying ConsensusState may change state at any time.
  28. // We atomically copy the RoundState struct before using it.
  29. type ConsensusReactor struct {
  30. sw *p2p.Switch
  31. running uint32
  32. quit chan struct{}
  33. blockStore *bc.BlockStore
  34. conS *ConsensusState
  35. // if fast sync is running we don't really do anything
  36. syncing bool
  37. evsw events.Fireable
  38. }
  39. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
  40. conR := &ConsensusReactor{
  41. blockStore: blockStore,
  42. quit: make(chan struct{}),
  43. conS: consensusState,
  44. }
  45. return conR
  46. }
  47. // Implements Reactor
  48. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  49. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  50. log.Info("Starting ConsensusReactor")
  51. conR.sw = sw
  52. conR.conS.Start()
  53. go conR.broadcastNewRoundStepRoutine()
  54. }
  55. }
  56. // Implements Reactor
  57. func (conR *ConsensusReactor) Stop() {
  58. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  59. log.Info("Stopping ConsensusReactor")
  60. conR.conS.Stop()
  61. close(conR.quit)
  62. }
  63. }
  64. func (conR *ConsensusReactor) IsRunning() bool {
  65. return atomic.LoadUint32(&conR.running) == 1
  66. }
  67. // Implements Reactor
  68. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  69. // TODO optimize
  70. return []*p2p.ChannelDescriptor{
  71. &p2p.ChannelDescriptor{
  72. Id: StateChannel,
  73. Priority: 5,
  74. SendQueueCapacity: 100,
  75. },
  76. &p2p.ChannelDescriptor{
  77. Id: DataChannel,
  78. Priority: 5,
  79. SendQueueCapacity: 2,
  80. },
  81. &p2p.ChannelDescriptor{
  82. Id: VoteChannel,
  83. Priority: 5,
  84. SendQueueCapacity: 40,
  85. },
  86. }
  87. }
  88. // Implements Reactor
  89. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  90. if !conR.IsRunning() {
  91. return
  92. }
  93. // Create peerState for peer
  94. peerState := NewPeerState(peer)
  95. peer.Data.Set(PeerStateKey, peerState)
  96. // Begin gossip routines for this peer.
  97. go conR.gossipDataRoutine(peer, peerState)
  98. go conR.gossipVotesRoutine(peer, peerState)
  99. // Send our state to peer.
  100. conR.sendNewRoundStep(peer)
  101. }
  102. // Implements Reactor
  103. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  104. if !conR.IsRunning() {
  105. return
  106. }
  107. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  108. }
  109. // Implements Reactor
  110. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  111. if conR.syncing || !conR.IsRunning() {
  112. return
  113. }
  114. // Get round state
  115. rs := conR.conS.GetRoundState()
  116. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  117. _, msg_, err := DecodeMessage(msgBytes)
  118. if err != nil {
  119. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  120. return
  121. }
  122. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)
  123. switch chId {
  124. case StateChannel:
  125. switch msg := msg_.(type) {
  126. case *NewRoundStepMessage:
  127. ps.ApplyNewRoundStepMessage(msg, rs)
  128. case *CommitStepMessage:
  129. ps.ApplyCommitStepMessage(msg)
  130. case *HasVoteMessage:
  131. ps.ApplyHasVoteMessage(msg)
  132. default:
  133. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  134. }
  135. case DataChannel:
  136. switch msg := msg_.(type) {
  137. case *ProposalMessage:
  138. ps.SetHasProposal(msg.Proposal)
  139. err = conR.conS.SetProposal(msg.Proposal)
  140. case *PartMessage:
  141. if msg.Type == partTypeProposalBlock {
  142. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  143. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  144. } else if msg.Type == partTypeProposalPOL {
  145. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  146. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  147. } else {
  148. log.Warn(Fmt("Unknown part type %v", msg.Type))
  149. }
  150. default:
  151. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  152. }
  153. case VoteChannel:
  154. switch msg := msg_.(type) {
  155. case *VoteMessage:
  156. vote := msg.Vote
  157. if rs.Height != vote.Height {
  158. if rs.Height == vote.Height+1 {
  159. if rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypeCommit {
  160. goto VOTE_PASS // *ducks*
  161. }
  162. }
  163. return // Wrong height. Not necessarily a bad peer.
  164. }
  165. VOTE_PASS:
  166. validatorIndex := msg.ValidatorIndex
  167. address, _ := rs.Validators.GetByIndex(validatorIndex)
  168. added, index, err := conR.conS.AddVote(address, vote)
  169. if err != nil {
  170. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  171. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  172. log.Warn("Found conflicting vote. Publish evidence")
  173. evidenceTx := &types.DupeoutTx{
  174. Address: address,
  175. VoteA: *errDupe.VoteA,
  176. VoteB: *errDupe.VoteB,
  177. }
  178. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  179. } else {
  180. // Probably an invalid signature. Bad peer.
  181. log.Warn("Error attempting to add vote", "error", err)
  182. // TODO: punish peer
  183. }
  184. }
  185. // Initialize Prevotes/Precommits/Commits if needed
  186. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  187. ps.SetHasVote(vote, index)
  188. if added {
  189. msg := &HasVoteMessage{
  190. Height: vote.Height,
  191. Round: vote.Round,
  192. Type: vote.Type,
  193. Index: index,
  194. }
  195. conR.sw.Broadcast(StateChannel, msg)
  196. }
  197. default:
  198. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  199. }
  200. default:
  201. log.Warn(Fmt("Unknown channel %X", chId))
  202. }
  203. if err != nil {
  204. log.Warn("Error in Receive()", "error", err)
  205. }
  206. }
  207. // Sets whether or not we're using the blockchain reactor for syncing
  208. func (conR *ConsensusReactor) SetSyncing(syncing bool) {
  209. conR.syncing = syncing
  210. }
  211. // Sets our private validator account for signing votes.
  212. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  213. conR.conS.SetPrivValidator(priv)
  214. }
  215. // Reset to some state.
  216. func (conR *ConsensusReactor) ResetToState(state *sm.State) {
  217. conR.conS.updateToState(state, false)
  218. conR.conS.newStepCh <- conR.conS.getRoundState()
  219. }
  220. // implements events.Eventable
  221. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  222. conR.evsw = evsw
  223. conR.conS.SetFireable(evsw)
  224. }
  225. //--------------------------------------
  226. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  227. // Get seconds since beginning of height.
  228. timeElapsed := time.Now().Sub(rs.StartTime)
  229. // Broadcast NewRoundStepMessage
  230. nrsMsg = &NewRoundStepMessage{
  231. Height: rs.Height,
  232. Round: rs.Round,
  233. Step: rs.Step,
  234. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  235. }
  236. // If the step is commit, then also broadcast a CommitStepMessage.
  237. if rs.Step == RoundStepCommit {
  238. csMsg = &CommitStepMessage{
  239. Height: rs.Height,
  240. BlockParts: rs.ProposalBlockParts.Header(),
  241. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  242. }
  243. }
  244. return
  245. }
  246. // Listens for changes to the ConsensusState.Step by pulling
  247. // on conR.conS.NewStepCh().
  248. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  249. for {
  250. // Get RoundState with new Step or quit.
  251. var rs *RoundState
  252. select {
  253. case rs = <-conR.conS.NewStepCh():
  254. case <-conR.quit:
  255. return
  256. }
  257. nrsMsg, csMsg := makeRoundStepMessages(rs)
  258. if nrsMsg != nil {
  259. conR.sw.Broadcast(StateChannel, nrsMsg)
  260. }
  261. if csMsg != nil {
  262. conR.sw.Broadcast(StateChannel, csMsg)
  263. }
  264. }
  265. }
  266. func (conR *ConsensusReactor) sendNewRoundStep(peer *p2p.Peer) {
  267. rs := conR.conS.GetRoundState()
  268. nrsMsg, csMsg := makeRoundStepMessages(rs)
  269. if nrsMsg != nil {
  270. peer.Send(StateChannel, nrsMsg)
  271. }
  272. if csMsg != nil {
  273. peer.Send(StateChannel, csMsg)
  274. }
  275. }
  276. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  277. OUTER_LOOP:
  278. for {
  279. // Manage disconnects from self or peer.
  280. if !peer.IsRunning() || !conR.IsRunning() {
  281. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  282. return
  283. }
  284. rs := conR.conS.GetRoundState()
  285. prs := ps.GetRoundState()
  286. // Send proposal Block parts?
  287. // NOTE: if we or peer is at RoundStepCommit*, the round
  288. // won't necessarily match, but that's OK.
  289. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  290. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  291. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  292. part := rs.ProposalBlockParts.GetPart(index)
  293. msg := &PartMessage{
  294. Height: rs.Height,
  295. Round: rs.Round,
  296. Type: partTypeProposalBlock,
  297. Part: part,
  298. }
  299. peer.Send(DataChannel, msg)
  300. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  301. continue OUTER_LOOP
  302. }
  303. }
  304. // If the peer is on a previous height, help catch up.
  305. if 0 < prs.Height && prs.Height < rs.Height {
  306. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  307. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  308. // Ensure that the peer's PartSetHeader is correct
  309. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  310. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  311. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  312. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  313. time.Sleep(peerGossipSleepDuration)
  314. continue OUTER_LOOP
  315. }
  316. // Load the part
  317. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  318. if part == nil {
  319. log.Warn("Could not load part", "index", index,
  320. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  321. time.Sleep(peerGossipSleepDuration)
  322. continue OUTER_LOOP
  323. }
  324. // Send the part
  325. msg := &PartMessage{
  326. Height: prs.Height,
  327. Round: prs.Round,
  328. Type: partTypeProposalBlock,
  329. Part: part,
  330. }
  331. peer.Send(DataChannel, msg)
  332. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  333. continue OUTER_LOOP
  334. } else {
  335. //log.Debug("No parts to send in catch-up, sleeping")
  336. time.Sleep(peerGossipSleepDuration)
  337. continue OUTER_LOOP
  338. }
  339. }
  340. // If height and round don't match, sleep.
  341. if rs.Height != prs.Height || rs.Round != prs.Round {
  342. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  343. time.Sleep(peerGossipSleepDuration)
  344. continue OUTER_LOOP
  345. }
  346. // Send proposal?
  347. if rs.Proposal != nil && !prs.Proposal {
  348. msg := &ProposalMessage{Proposal: rs.Proposal}
  349. peer.Send(DataChannel, msg)
  350. ps.SetHasProposal(rs.Proposal)
  351. continue OUTER_LOOP
  352. }
  353. // Send proposal POL parts?
  354. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  355. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  356. msg := &PartMessage{
  357. Height: rs.Height,
  358. Round: rs.Round,
  359. Type: partTypeProposalPOL,
  360. Part: rs.ProposalPOLParts.GetPart(index),
  361. }
  362. peer.Send(DataChannel, msg)
  363. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  364. continue OUTER_LOOP
  365. }
  366. }
  367. // Nothing to do. Sleep.
  368. time.Sleep(peerGossipSleepDuration)
  369. continue OUTER_LOOP
  370. }
  371. }
  372. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  373. // Simple hack to throttle logs upon sleep.
  374. var sleeping = 0
  375. OUTER_LOOP:
  376. for {
  377. // Manage disconnects from self or peer.
  378. if !peer.IsRunning() || !conR.IsRunning() {
  379. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  380. return
  381. }
  382. rs := conR.conS.GetRoundState()
  383. prs := ps.GetRoundState()
  384. switch sleeping {
  385. case 1: // First sleep
  386. sleeping = 2
  387. case 2: // No more sleep
  388. sleeping = 0
  389. }
  390. // Returns true when useful work was done.
  391. trySendVote := func(height uint, voteSet *VoteSet, peerVoteSet *BitArray) (sent bool) {
  392. if voteSet == nil {
  393. return false
  394. } else if peerVoteSet == nil {
  395. ps.EnsureVoteBitArrays(height, voteSet.Size())
  396. return true
  397. }
  398. // TODO: give priority to our vote.
  399. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  400. vote := voteSet.GetByIndex(index)
  401. // NOTE: vote may be a commit.
  402. msg := &VoteMessage{index, vote}
  403. peer.Send(VoteChannel, msg)
  404. ps.SetHasVote(vote, index)
  405. return true
  406. }
  407. return false
  408. }
  409. // Returns true when useful work was done.
  410. trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet *BitArray) (sent bool) {
  411. if validation == nil {
  412. return false
  413. } else if peerVoteSet == nil {
  414. ps.EnsureVoteBitArrays(blockMeta.Header.Height, uint(len(validation.Commits)))
  415. return true
  416. }
  417. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  418. commit := validation.Commits[index]
  419. log.Debug("Picked commit to send", "index", index, "commit", commit)
  420. // Reconstruct vote.
  421. vote := &types.Vote{
  422. Height: prs.Height,
  423. Round: commit.Round,
  424. Type: types.VoteTypeCommit,
  425. BlockHash: blockMeta.Hash,
  426. BlockParts: blockMeta.Parts,
  427. Signature: commit.Signature,
  428. }
  429. msg := &VoteMessage{index, vote}
  430. peer.Send(VoteChannel, msg)
  431. ps.SetHasVote(vote, index)
  432. return true
  433. }
  434. return false
  435. }
  436. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  437. if rs.Height == prs.Height {
  438. // If there are lastcommits to send...
  439. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  440. if trySendVote(rs.Height-1, rs.LastCommits, prs.LastCommits) {
  441. continue OUTER_LOOP
  442. }
  443. }
  444. // If there are prevotes to send...
  445. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  446. if trySendVote(rs.Height, rs.Prevotes, prs.Prevotes) {
  447. continue OUTER_LOOP
  448. }
  449. }
  450. // If there are precommits to send...
  451. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  452. if trySendVote(rs.Height, rs.Precommits, prs.Precommits) {
  453. continue OUTER_LOOP
  454. }
  455. }
  456. // If there are any commits to send...
  457. if trySendVote(rs.Height, rs.Commits, prs.Commits) {
  458. continue OUTER_LOOP
  459. }
  460. }
  461. // Catchup logic
  462. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  463. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  464. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  465. // If there are lastcommits to send...
  466. if trySendVote(prs.Height, rs.LastCommits, prs.Commits) {
  467. continue OUTER_LOOP
  468. } else {
  469. ps.SetHasAllCatchupCommits(prs.Height)
  470. }
  471. }
  472. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  473. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  474. // Load the blockMeta for block at prs.Height
  475. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  476. // Load the seen validation for prs.Height
  477. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  478. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  479. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  480. continue OUTER_LOOP
  481. } else {
  482. ps.SetHasAllCatchupCommits(prs.Height)
  483. }
  484. }
  485. // If peer is lagging by more than 1, send Validation.
  486. if rs.Height >= prs.Height+2 {
  487. // Load the blockMeta for block at prs.Height
  488. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  489. // Load the block validation for prs.Height+1,
  490. // which contains commit signatures for prs.Height.
  491. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  492. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  493. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  494. continue OUTER_LOOP
  495. } else {
  496. ps.SetHasAllCatchupCommits(prs.Height)
  497. }
  498. }
  499. }
  500. if sleeping == 0 {
  501. // We sent nothing. Sleep...
  502. sleeping = 1
  503. log.Debug("No votes to send, sleeping", "peer", peer,
  504. "localPV", rs.Prevotes.BitArray(), "peerPV", prs.Prevotes,
  505. "localPC", rs.Precommits.BitArray(), "peerPC", prs.Precommits,
  506. "localCM", rs.Commits.BitArray(), "peerCM", prs.Commits)
  507. } else if sleeping == 2 {
  508. // Continued sleep...
  509. sleeping = 1
  510. }
  511. time.Sleep(peerGossipSleepDuration)
  512. continue OUTER_LOOP
  513. }
  514. }
  515. //-----------------------------------------------------------------------------
  516. // Read only when returned by PeerState.GetRoundState().
  517. type PeerRoundState struct {
  518. Height uint // Height peer is at
  519. Round uint // Round peer is at
  520. Step RoundStepType // Step peer is at
  521. StartTime time.Time // Estimated start of round 0 at this height
  522. Proposal bool // True if peer has proposal for this round
  523. ProposalBlockParts types.PartSetHeader //
  524. ProposalBlockBitArray *BitArray // True bit -> has part
  525. ProposalPOLParts types.PartSetHeader //
  526. ProposalPOLBitArray *BitArray // True bit -> has part
  527. Prevotes *BitArray // All votes peer has for this round
  528. Precommits *BitArray // All precommits peer has for this round
  529. Commits *BitArray // All commits peer has for this height
  530. LastCommits *BitArray // All commits peer has for last height
  531. HasAllCatchupCommits bool // Used for catch-up
  532. }
  //-----------------------------------------------------------------------------

  // Sentinel errors for invalid peer state updates.
  var ErrPeerStateHeightRegression = errors.New("Error peer state height regression")

  var ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  538. type PeerState struct {
  539. Key string
  540. mtx sync.Mutex
  541. PeerRoundState
  542. }
  543. func NewPeerState(peer *p2p.Peer) *PeerState {
  544. return &PeerState{Key: peer.Key}
  545. }
  546. // Returns an atomic snapshot of the PeerRoundState.
  547. // There's no point in mutating it since it won't change PeerState.
  548. func (ps *PeerState) GetRoundState() *PeerRoundState {
  549. ps.mtx.Lock()
  550. defer ps.mtx.Unlock()
  551. prs := ps.PeerRoundState // copy
  552. return &prs
  553. }
  554. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  555. ps.mtx.Lock()
  556. defer ps.mtx.Unlock()
  557. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  558. return
  559. }
  560. if ps.Proposal {
  561. return
  562. }
  563. ps.Proposal = true
  564. ps.ProposalBlockParts = proposal.BlockParts
  565. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  566. ps.ProposalPOLParts = proposal.POLParts
  567. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  568. }
  569. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  570. ps.mtx.Lock()
  571. defer ps.mtx.Unlock()
  572. if ps.Height != height || ps.Round != round {
  573. return
  574. }
  575. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  576. }
  577. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  578. ps.mtx.Lock()
  579. defer ps.mtx.Unlock()
  580. if ps.Height != height || ps.Round != round {
  581. return
  582. }
  583. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  584. }
  585. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  586. ps.mtx.Lock()
  587. defer ps.mtx.Unlock()
  588. if ps.Height == height {
  589. if ps.Prevotes == nil {
  590. ps.Prevotes = NewBitArray(numValidators)
  591. }
  592. if ps.Precommits == nil {
  593. ps.Precommits = NewBitArray(numValidators)
  594. }
  595. if ps.Commits == nil {
  596. ps.Commits = NewBitArray(numValidators)
  597. }
  598. } else if ps.Height == height+1 {
  599. if ps.LastCommits == nil {
  600. ps.LastCommits = NewBitArray(numValidators)
  601. }
  602. }
  603. }
  604. func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
  605. ps.mtx.Lock()
  606. defer ps.mtx.Unlock()
  607. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  608. }
  609. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  610. if ps.Height == height+1 && type_ == types.VoteTypeCommit {
  611. // Special case for LastCommits.
  612. ps.LastCommits.SetIndex(index, true)
  613. log.Debug("SetHasVote", "lastCommits", ps.LastCommits, "index", index)
  614. return
  615. } else if ps.Height != height {
  616. // Does not apply.
  617. return
  618. }
  619. switch type_ {
  620. case types.VoteTypePrevote:
  621. ps.Prevotes.SetIndex(index, true)
  622. log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index)
  623. case types.VoteTypePrecommit:
  624. ps.Precommits.SetIndex(index, true)
  625. log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index)
  626. case types.VoteTypeCommit:
  627. if round < ps.Round {
  628. ps.Prevotes.SetIndex(index, true)
  629. ps.Precommits.SetIndex(index, true)
  630. }
  631. ps.Commits.SetIndex(index, true)
  632. log.Debug("SetHasVote", "peer", ps.Key, "commits", ps.Commits, "index", index)
  633. default:
  634. panic("Invalid vote type")
  635. }
  636. }
  637. // When catching up, this helps keep track of whether
  638. // we should send more commit votes from the block (validation) store
  639. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  640. ps.mtx.Lock()
  641. defer ps.mtx.Unlock()
  642. if ps.Height == height {
  643. ps.HasAllCatchupCommits = true
  644. }
  645. }
  646. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  647. ps.mtx.Lock()
  648. defer ps.mtx.Unlock()
  649. // Just remember these values.
  650. psHeight := ps.Height
  651. psRound := ps.Round
  652. //psStep := ps.Step
  653. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  654. ps.Height = msg.Height
  655. ps.Round = msg.Round
  656. ps.Step = msg.Step
  657. ps.StartTime = startTime
  658. if psHeight != msg.Height || psRound != msg.Round {
  659. ps.Proposal = false
  660. ps.ProposalBlockParts = types.PartSetHeader{}
  661. ps.ProposalBlockBitArray = nil
  662. ps.ProposalPOLParts = types.PartSetHeader{}
  663. ps.ProposalPOLBitArray = nil
  664. // We'll update the BitArray capacity later.
  665. ps.Prevotes = nil
  666. ps.Precommits = nil
  667. }
  668. if psHeight != msg.Height {
  669. // Shift Commits to LastCommits
  670. if psHeight+1 == msg.Height {
  671. ps.LastCommits = ps.Commits
  672. } else {
  673. ps.LastCommits = nil
  674. }
  675. // We'll update the BitArray capacity later.
  676. ps.Commits = nil
  677. ps.HasAllCatchupCommits = false
  678. }
  679. }
  680. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  681. ps.mtx.Lock()
  682. defer ps.mtx.Unlock()
  683. if ps.Height != msg.Height {
  684. return
  685. }
  686. ps.ProposalBlockParts = msg.BlockParts
  687. ps.ProposalBlockBitArray = msg.BlockBitArray
  688. }
  689. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  690. ps.mtx.Lock()
  691. defer ps.mtx.Unlock()
  692. // Special case for LastCommits
  693. if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
  694. ps.LastCommits.SetIndex(msg.Index, true)
  695. return
  696. } else if ps.Height != msg.Height {
  697. return
  698. }
  699. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  700. }
  //-----------------------------------------------------------------------------
  // Messages

  // Type bytes under which the consensus message types are registered.
  const (
  	msgTypeNewRoundStep byte = 0x01
  	msgTypeCommitStep   byte = 0x02
  	msgTypeProposal     byte = 0x11
  	msgTypePart         byte = 0x12 // both block & POL
  	msgTypeVote         byte = 0x13
  	msgTypeHasVote      byte = 0x14
  )
  711. type ConsensusMessage interface{}
  712. var _ = binary.RegisterInterface(
  713. struct{ ConsensusMessage }{},
  714. binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  715. binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  716. binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  717. binary.ConcreteType{&PartMessage{}, msgTypePart},
  718. binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  719. binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  720. )
  721. // TODO: check for unnecessary extra bytes at the end.
  722. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  723. msgType = bz[0]
  724. n := new(int64)
  725. r := bytes.NewReader(bz)
  726. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  727. return
  728. }
  729. //-------------------------------------
  730. type NewRoundStepMessage struct {
  731. Height uint
  732. Round uint
  733. Step RoundStepType
  734. SecondsSinceStartTime uint
  735. }
  736. func (m *NewRoundStepMessage) String() string {
  737. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  738. }
  739. //-------------------------------------
  740. type CommitStepMessage struct {
  741. Height uint
  742. BlockParts types.PartSetHeader
  743. BlockBitArray *BitArray
  744. }
  745. func (m *CommitStepMessage) String() string {
  746. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  747. }
  748. //-------------------------------------
  749. type ProposalMessage struct {
  750. Proposal *Proposal
  751. }
  752. func (m *ProposalMessage) String() string {
  753. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  754. }
  //-------------------------------------

  // Values of PartMessage.Type: which part set a part belongs to.
  const (
  	partTypeProposalBlock byte = 0x01
  	partTypeProposalPOL   byte = 0x02
  )
  760. type PartMessage struct {
  761. Height uint
  762. Round uint
  763. Type byte
  764. Part *types.Part
  765. }
  766. func (m *PartMessage) String() string {
  767. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  768. }
  769. //-------------------------------------
  770. type VoteMessage struct {
  771. ValidatorIndex uint
  772. Vote *types.Vote
  773. }
  774. func (m *VoteMessage) String() string {
  775. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  776. }
  //-------------------------------------

  // HasVoteMessage tells peers that the sender holds the vote of the given
  // height, round and type from the validator at Index.
  type HasVoteMessage struct {
  	Height uint
  	Round  uint
  	Type   byte
  	Index  uint
  }

  // String returns a short description of the message. The validator index is
  // printed both before and after the height/round/type triple.
  func (m *HasVoteMessage) String() string {
  	idx := m.Index
  	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", idx, m.Height, m.Round, m.Type, idx)
  }
  786. }