You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

876 lines
24 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. const (
  20. StateChannel = byte(0x20)
  21. DataChannel = byte(0x21)
  22. VoteChannel = byte(0x22)
  23. PeerStateKey = "ConsensusReactor.peerState"
  24. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  25. )
  26. //-----------------------------------------------------------------------------
// ConsensusReactor connects a ConsensusState to the p2p switch.
//
// The reactor's underlying ConsensusState may change state at any time.
// We atomically copy the RoundState struct before using it.
type ConsensusReactor struct {
	sw      *p2p.Switch   // assigned in Start; used for broadcasts
	running uint32        // 1 while running, 0 otherwise; accessed through sync/atomic
	quit    chan struct{} // closed by Stop to end broadcastNewRoundStepRoutine

	blockStore *bc.BlockStore
	conS       *ConsensusState

	// if fast sync is running we don't really do anything
	syncing bool

	evsw events.Fireable
}
  39. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
  40. conR := &ConsensusReactor{
  41. blockStore: blockStore,
  42. quit: make(chan struct{}),
  43. conS: consensusState,
  44. }
  45. return conR
  46. }
  47. // Implements Reactor
  48. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  49. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  50. log.Info("Starting ConsensusReactor")
  51. conR.sw = sw
  52. conR.conS.Start()
  53. go conR.broadcastNewRoundStepRoutine()
  54. }
  55. }
  56. // Implements Reactor
  57. func (conR *ConsensusReactor) Stop() {
  58. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  59. log.Info("Stopping ConsensusReactor")
  60. conR.conS.Stop()
  61. close(conR.quit)
  62. }
  63. }
  64. func (conR *ConsensusReactor) IsRunning() bool {
  65. return atomic.LoadUint32(&conR.running) == 1
  66. }
  67. // Implements Reactor
  68. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  69. // TODO optimize
  70. return []*p2p.ChannelDescriptor{
  71. &p2p.ChannelDescriptor{
  72. Id: StateChannel,
  73. Priority: 5,
  74. },
  75. &p2p.ChannelDescriptor{
  76. Id: DataChannel,
  77. Priority: 5,
  78. },
  79. &p2p.ChannelDescriptor{
  80. Id: VoteChannel,
  81. Priority: 5,
  82. },
  83. }
  84. }
  85. // Implements Reactor
  86. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  87. if !conR.IsRunning() {
  88. return
  89. }
  90. // Create peerState for peer
  91. peerState := NewPeerState(peer)
  92. peer.Data.Set(PeerStateKey, peerState)
  93. // Begin gossip routines for this peer.
  94. go conR.gossipDataRoutine(peer, peerState)
  95. go conR.gossipVotesRoutine(peer, peerState)
  96. // Send our state to peer.
  97. conR.sendNewRoundStep(peer)
  98. }
// RemovePeer implements Reactor.
// It currently performs no cleanup: after the running check the only remaining
// statement is a commented-out Disconnect call on the peer's PeerState.
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}
  106. // Implements Reactor
  107. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  108. if conR.syncing || !conR.IsRunning() {
  109. return
  110. }
  111. // Get round state
  112. rs := conR.conS.GetRoundState()
  113. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  114. _, msg_, err := DecodeMessage(msgBytes)
  115. if err != nil {
  116. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  117. return
  118. }
  119. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)
  120. switch chId {
  121. case StateChannel:
  122. switch msg := msg_.(type) {
  123. case *NewRoundStepMessage:
  124. ps.ApplyNewRoundStepMessage(msg, rs)
  125. case *CommitStepMessage:
  126. ps.ApplyCommitStepMessage(msg)
  127. case *HasVoteMessage:
  128. ps.ApplyHasVoteMessage(msg)
  129. default:
  130. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  131. }
  132. case DataChannel:
  133. switch msg := msg_.(type) {
  134. case *ProposalMessage:
  135. ps.SetHasProposal(msg.Proposal)
  136. err = conR.conS.SetProposal(msg.Proposal)
  137. case *PartMessage:
  138. if msg.Type == partTypeProposalBlock {
  139. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  140. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  141. } else if msg.Type == partTypeProposalPOL {
  142. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  143. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  144. } else {
  145. log.Warn(Fmt("Unknown part type %v", msg.Type))
  146. }
  147. default:
  148. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  149. }
  150. case VoteChannel:
  151. switch msg := msg_.(type) {
  152. case *VoteMessage:
  153. vote := msg.Vote
  154. if rs.Height != vote.Height {
  155. if rs.Height == vote.Height+1 {
  156. if rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypeCommit {
  157. goto VOTE_PASS // *ducks*
  158. }
  159. }
  160. return // Wrong height. Not necessarily a bad peer.
  161. }
  162. VOTE_PASS:
  163. validatorIndex := msg.ValidatorIndex
  164. address, _ := rs.Validators.GetByIndex(validatorIndex)
  165. added, index, err := conR.conS.AddVote(address, vote)
  166. if err != nil {
  167. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  168. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  169. log.Warn("Found conflicting vote. Publish evidence")
  170. evidenceTx := &types.DupeoutTx{
  171. Address: address,
  172. VoteA: *errDupe.VoteA,
  173. VoteB: *errDupe.VoteB,
  174. }
  175. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  176. } else {
  177. // Probably an invalid signature. Bad peer.
  178. log.Warn("Error attempting to add vote", "error", err)
  179. // TODO: punish peer
  180. }
  181. }
  182. // Initialize Prevotes/Precommits/Commits if needed
  183. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  184. ps.SetHasVote(vote, index)
  185. if added {
  186. msg := &HasVoteMessage{
  187. Height: vote.Height,
  188. Round: vote.Round,
  189. Type: vote.Type,
  190. Index: index,
  191. }
  192. conR.sw.Broadcast(StateChannel, msg)
  193. }
  194. default:
  195. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  196. }
  197. default:
  198. log.Warn(Fmt("Unknown channel %X", chId))
  199. }
  200. if err != nil {
  201. log.Warn("Error in Receive()", "error", err)
  202. }
  203. }
// SetSyncing sets whether or not we're using the blockchain reactor for
// syncing. While syncing is true, Receive drops every incoming message.
func (conR *ConsensusReactor) SetSyncing(syncing bool) {
	conR.syncing = syncing
}
// SetPrivValidator sets our private validator account for signing votes by
// forwarding priv to the underlying ConsensusState.
func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
	conR.conS.SetPrivValidator(priv)
}
// ResetToState resets the underlying ConsensusState to state by calling its
// updateToState with a false second argument.
func (conR *ConsensusReactor) ResetToState(state *sm.State) {
	conR.conS.updateToState(state, false)
}
// SetFireable implements events.Eventable.
// It stores evsw on the reactor and passes it on to the ConsensusState.
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
	conR.evsw = evsw
	conR.conS.SetFireable(evsw)
}
  221. //--------------------------------------
  222. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  223. // Get seconds since beginning of height.
  224. timeElapsed := time.Now().Sub(rs.StartTime)
  225. // Broadcast NewRoundStepMessage
  226. nrsMsg = &NewRoundStepMessage{
  227. Height: rs.Height,
  228. Round: rs.Round,
  229. Step: rs.Step,
  230. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  231. }
  232. // If the step is commit, then also broadcast a CommitStepMessage.
  233. if rs.Step == RoundStepCommit {
  234. csMsg = &CommitStepMessage{
  235. Height: rs.Height,
  236. BlockParts: rs.ProposalBlockParts.Header(),
  237. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  238. }
  239. }
  240. return
  241. }
  242. // Listens for changes to the ConsensusState.Step by pulling
  243. // on conR.conS.NewStepCh().
  244. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  245. for {
  246. // Get RoundState with new Step or quit.
  247. var rs *RoundState
  248. select {
  249. case rs = <-conR.conS.NewStepCh():
  250. case <-conR.quit:
  251. return
  252. }
  253. nrsMsg, csMsg := makeRoundStepMessages(rs)
  254. if nrsMsg != nil {
  255. conR.sw.Broadcast(StateChannel, nrsMsg)
  256. }
  257. if csMsg != nil {
  258. conR.sw.Broadcast(StateChannel, csMsg)
  259. }
  260. }
  261. }
  262. func (conR *ConsensusReactor) sendNewRoundStep(peer *p2p.Peer) {
  263. rs := conR.conS.GetRoundState()
  264. nrsMsg, csMsg := makeRoundStepMessages(rs)
  265. if nrsMsg != nil {
  266. peer.Send(StateChannel, nrsMsg)
  267. }
  268. if csMsg != nil {
  269. peer.Send(StateChannel, csMsg)
  270. }
  271. }
  272. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  273. OUTER_LOOP:
  274. for {
  275. // Manage disconnects from self or peer.
  276. if !peer.IsRunning() || !conR.IsRunning() {
  277. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  278. return
  279. }
  280. rs := conR.conS.GetRoundState()
  281. prs := ps.GetRoundState()
  282. // Send proposal Block parts?
  283. // NOTE: if we or peer is at RoundStepCommit*, the round
  284. // won't necessarily match, but that's OK.
  285. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  286. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  287. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  288. part := rs.ProposalBlockParts.GetPart(index)
  289. msg := &PartMessage{
  290. Height: rs.Height,
  291. Round: rs.Round,
  292. Type: partTypeProposalBlock,
  293. Part: part,
  294. }
  295. peer.Send(DataChannel, msg)
  296. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  297. continue OUTER_LOOP
  298. }
  299. }
  300. // If the peer is on a previous height, help catch up.
  301. if 0 < prs.Height && prs.Height < rs.Height {
  302. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  303. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  304. // Ensure that the peer's PartSetHeader is correct
  305. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  306. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  307. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  308. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  309. time.Sleep(peerGossipSleepDuration)
  310. continue OUTER_LOOP
  311. }
  312. // Load the part
  313. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  314. if part == nil {
  315. log.Warn("Could not load part", "index", index,
  316. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  317. time.Sleep(peerGossipSleepDuration)
  318. continue OUTER_LOOP
  319. }
  320. // Send the part
  321. msg := &PartMessage{
  322. Height: prs.Height,
  323. Round: prs.Round,
  324. Type: partTypeProposalBlock,
  325. Part: part,
  326. }
  327. peer.Send(DataChannel, msg)
  328. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  329. continue OUTER_LOOP
  330. } else {
  331. //log.Debug("No parts to send in catch-up, sleeping")
  332. time.Sleep(peerGossipSleepDuration)
  333. continue OUTER_LOOP
  334. }
  335. }
  336. // If height and round don't match, sleep.
  337. if rs.Height != prs.Height || rs.Round != prs.Round {
  338. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  339. time.Sleep(peerGossipSleepDuration)
  340. continue OUTER_LOOP
  341. }
  342. // Send proposal?
  343. if rs.Proposal != nil && !prs.Proposal {
  344. msg := &ProposalMessage{Proposal: rs.Proposal}
  345. peer.Send(DataChannel, msg)
  346. ps.SetHasProposal(rs.Proposal)
  347. continue OUTER_LOOP
  348. }
  349. // Send proposal POL parts?
  350. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  351. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  352. msg := &PartMessage{
  353. Height: rs.Height,
  354. Round: rs.Round,
  355. Type: partTypeProposalPOL,
  356. Part: rs.ProposalPOLParts.GetPart(index),
  357. }
  358. peer.Send(DataChannel, msg)
  359. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  360. continue OUTER_LOOP
  361. }
  362. }
  363. // Nothing to do. Sleep.
  364. time.Sleep(peerGossipSleepDuration)
  365. continue OUTER_LOOP
  366. }
  367. }
  368. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  369. OUTER_LOOP:
  370. for {
  371. // Manage disconnects from self or peer.
  372. if !peer.IsRunning() || !conR.IsRunning() {
  373. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  374. return
  375. }
  376. rs := conR.conS.GetRoundState()
  377. prs := ps.GetRoundState()
  378. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  379. if prs.Height == voteSet.Height() {
  380. // Initialize Prevotes/Precommits/Commits if needed
  381. ps.EnsureVoteBitArrays(prs.Height, voteSet.Size())
  382. }
  383. // TODO: give priority to our vote.
  384. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  385. vote := voteSet.GetByIndex(index)
  386. // NOTE: vote may be a commit.
  387. msg := &VoteMessage{index, vote}
  388. peer.Send(VoteChannel, msg)
  389. ps.SetHasVote(vote, index)
  390. return true
  391. }
  392. return false
  393. }
  394. trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet BitArray) (sent bool) {
  395. // Initialize Commits if needed
  396. ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits)))
  397. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  398. commit := validation.Commits[index]
  399. log.Debug("Picked commit to send", "index", index, "commit", commit)
  400. // Reconstruct vote.
  401. vote := &types.Vote{
  402. Height: prs.Height,
  403. Round: commit.Round,
  404. Type: types.VoteTypeCommit,
  405. BlockHash: blockMeta.Hash,
  406. BlockParts: blockMeta.Parts,
  407. Signature: commit.Signature,
  408. }
  409. msg := &VoteMessage{index, vote}
  410. peer.Send(VoteChannel, msg)
  411. ps.SetHasVote(vote, index)
  412. return true
  413. }
  414. return false
  415. }
  416. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  417. if rs.Height == prs.Height {
  418. // If there are lastcommits to send...
  419. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  420. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  421. if trySendVote(rs.LastCommits, prs.LastCommits) {
  422. continue OUTER_LOOP
  423. }
  424. }
  425. }
  426. // If there are prevotes to send...
  427. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  428. if trySendVote(rs.Prevotes, prs.Prevotes) {
  429. continue OUTER_LOOP
  430. }
  431. }
  432. // If there are precommits to send...
  433. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  434. if trySendVote(rs.Precommits, prs.Precommits) {
  435. continue OUTER_LOOP
  436. }
  437. }
  438. // If there are any commits to send...
  439. if trySendVote(rs.Commits, prs.Commits) {
  440. continue OUTER_LOOP
  441. }
  442. }
  443. // Catchup logic
  444. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  445. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  446. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  447. // If there are lastcommits to send...
  448. if trySendVote(rs.LastCommits, prs.Commits) {
  449. continue OUTER_LOOP
  450. } else {
  451. ps.SetHasAllCatchupCommits(prs.Height)
  452. }
  453. }
  454. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  455. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  456. // Load the blockMeta for block at prs.Height
  457. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  458. // Load the seen validation for prs.Height
  459. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  460. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  461. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  462. continue OUTER_LOOP
  463. } else {
  464. ps.SetHasAllCatchupCommits(prs.Height)
  465. }
  466. }
  467. // If peer is lagging by more than 1, send Validation.
  468. if rs.Height >= prs.Height+2 {
  469. // Load the blockMeta for block at prs.Height
  470. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  471. // Load the block validation for prs.Height+1,
  472. // which contains commit signatures for prs.Height.
  473. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  474. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  475. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  476. continue OUTER_LOOP
  477. } else {
  478. ps.SetHasAllCatchupCommits(prs.Height)
  479. }
  480. }
  481. }
  482. // We sent nothing. Sleep...
  483. time.Sleep(peerGossipSleepDuration)
  484. continue OUTER_LOOP
  485. }
  486. }
  487. //-----------------------------------------------------------------------------
// PeerRoundState is our record of where a peer is in consensus and of which
// proposal parts and votes it is known to have.
// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
	Height                uint                // Height peer is at
	Round                 uint                // Round peer is at
	Step                  RoundStepType       // Step peer is at
	StartTime             time.Time           // Estimated start of round 0 at this height
	Proposal              bool                // True if peer has proposal for this round
	ProposalBlockParts    types.PartSetHeader //
	ProposalBlockBitArray BitArray            // True bit -> has part
	ProposalPOLParts      types.PartSetHeader //
	ProposalPOLBitArray   BitArray            // True bit -> has part
	Prevotes              BitArray            // All votes peer has for this round
	Precommits            BitArray            // All precommits peer has for this round
	Commits               BitArray            // All commits peer has for this height
	LastCommits           BitArray            // All commits peer has for last height
	HasAllCatchupCommits  bool                // Used for catch-up
}
  505. //-----------------------------------------------------------------------------
// Sentinel errors about inconsistent peer state.
// NOTE(review): neither value is referenced in the visible part of this file;
// confirm whether other files in the package use them.
var (
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)
// PeerState wraps a PeerRoundState with a mutex. Its methods take mtx before
// reading or updating the embedded state.
type PeerState struct {
	mtx sync.Mutex
	PeerRoundState
}
  514. func NewPeerState(peer *p2p.Peer) *PeerState {
  515. return &PeerState{}
  516. }
  517. // Returns an atomic snapshot of the PeerRoundState.
  518. // There's no point in mutating it since it won't change PeerState.
  519. func (ps *PeerState) GetRoundState() *PeerRoundState {
  520. ps.mtx.Lock()
  521. defer ps.mtx.Unlock()
  522. prs := ps.PeerRoundState // copy
  523. return &prs
  524. }
  525. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  526. ps.mtx.Lock()
  527. defer ps.mtx.Unlock()
  528. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  529. return
  530. }
  531. if ps.Proposal {
  532. return
  533. }
  534. ps.Proposal = true
  535. ps.ProposalBlockParts = proposal.BlockParts
  536. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  537. ps.ProposalPOLParts = proposal.POLParts
  538. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  539. }
  540. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  541. ps.mtx.Lock()
  542. defer ps.mtx.Unlock()
  543. if ps.Height != height || ps.Round != round {
  544. return
  545. }
  546. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  547. }
  548. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  549. ps.mtx.Lock()
  550. defer ps.mtx.Unlock()
  551. if ps.Height != height || ps.Round != round {
  552. return
  553. }
  554. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  555. }
  556. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  557. ps.mtx.Lock()
  558. defer ps.mtx.Unlock()
  559. if ps.Height != height {
  560. return
  561. }
  562. if ps.Prevotes.IsZero() {
  563. ps.Prevotes = NewBitArray(numValidators)
  564. }
  565. if ps.Precommits.IsZero() {
  566. ps.Precommits = NewBitArray(numValidators)
  567. }
  568. if ps.Commits.IsZero() {
  569. ps.Commits = NewBitArray(numValidators)
  570. }
  571. }
// SetHasVote records that the peer has the given vote, cast by the validator
// at position index. It takes the lock and delegates to setHasVote.
func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
}
  577. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  578. if ps.Height == height+1 && type_ == types.VoteTypeCommit {
  579. // Special case for LastCommits.
  580. ps.LastCommits.SetIndex(index, true)
  581. return
  582. } else if ps.Height != height {
  583. // Does not apply.
  584. return
  585. }
  586. switch type_ {
  587. case types.VoteTypePrevote:
  588. ps.Prevotes.SetIndex(index, true)
  589. case types.VoteTypePrecommit:
  590. ps.Precommits.SetIndex(index, true)
  591. case types.VoteTypeCommit:
  592. if round < ps.Round {
  593. ps.Prevotes.SetIndex(index, true)
  594. ps.Precommits.SetIndex(index, true)
  595. }
  596. ps.Commits.SetIndex(index, true)
  597. default:
  598. panic("Invalid vote type")
  599. }
  600. }
  601. // When catching up, this helps keep track of whether
  602. // we should send more commit votes from the block (validation) store
  603. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  604. ps.mtx.Lock()
  605. defer ps.mtx.Unlock()
  606. if ps.Height == height {
  607. ps.HasAllCatchupCommits = true
  608. }
  609. }
  610. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  611. ps.mtx.Lock()
  612. defer ps.mtx.Unlock()
  613. // Just remember these values.
  614. psHeight := ps.Height
  615. psRound := ps.Round
  616. //psStep := ps.Step
  617. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  618. ps.Height = msg.Height
  619. ps.Round = msg.Round
  620. ps.Step = msg.Step
  621. ps.StartTime = startTime
  622. if psHeight != msg.Height || psRound != msg.Round {
  623. ps.Proposal = false
  624. ps.ProposalBlockParts = types.PartSetHeader{}
  625. ps.ProposalBlockBitArray = BitArray{}
  626. ps.ProposalPOLParts = types.PartSetHeader{}
  627. ps.ProposalPOLBitArray = BitArray{}
  628. // We'll update the BitArray capacity later.
  629. ps.Prevotes = BitArray{}
  630. ps.Precommits = BitArray{}
  631. }
  632. if psHeight != msg.Height {
  633. // Shift Commits to LastCommits
  634. if psHeight+1 == msg.Height {
  635. ps.LastCommits = ps.Commits
  636. } else {
  637. ps.LastCommits = BitArray{}
  638. }
  639. // We'll update the BitArray capacity later.
  640. ps.Commits = BitArray{}
  641. ps.HasAllCatchupCommits = false
  642. }
  643. }
  644. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  645. ps.mtx.Lock()
  646. defer ps.mtx.Unlock()
  647. if ps.Height != msg.Height {
  648. return
  649. }
  650. ps.ProposalBlockParts = msg.BlockParts
  651. ps.ProposalBlockBitArray = msg.BlockBitArray
  652. }
  653. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  654. ps.mtx.Lock()
  655. defer ps.mtx.Unlock()
  656. // Special case for LastCommits
  657. if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
  658. ps.LastCommits.SetIndex(msg.Index, true)
  659. return
  660. } else if ps.Height != msg.Height {
  661. return
  662. }
  663. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  664. }
  665. //-----------------------------------------------------------------------------
  666. // Messages
  667. const (
  668. msgTypeNewRoundStep = byte(0x01)
  669. msgTypeCommitStep = byte(0x02)
  670. msgTypeProposal = byte(0x11)
  671. msgTypePart = byte(0x12) // both block & POL
  672. msgTypeVote = byte(0x13)
  673. msgTypeHasVote = byte(0x14)
  674. )
// ConsensusMessage is the interface type under which all consensus reactor
// messages are encoded and decoded.
type ConsensusMessage interface{}

// Registers each concrete message type with its type byte so that
// binary.ReadBinary can decode a struct{ ConsensusMessage } wrapper.
var _ = binary.RegisterInterface(
	struct{ ConsensusMessage }{},
	binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	binary.ConcreteType{&PartMessage{}, msgTypePart},
	binary.ConcreteType{&VoteMessage{}, msgTypeVote},
	binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
)
  685. // TODO: check for unnecessary extra bytes at the end.
  686. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  687. msgType = bz[0]
  688. n := new(int64)
  689. r := bytes.NewReader(bz)
  690. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  691. return
  692. }
  693. //-------------------------------------
  694. type NewRoundStepMessage struct {
  695. Height uint
  696. Round uint
  697. Step RoundStepType
  698. SecondsSinceStartTime uint
  699. }
  700. func (m *NewRoundStepMessage) String() string {
  701. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  702. }
  703. //-------------------------------------
  704. type CommitStepMessage struct {
  705. Height uint
  706. BlockParts types.PartSetHeader
  707. BlockBitArray BitArray
  708. }
  709. func (m *CommitStepMessage) String() string {
  710. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  711. }
  712. //-------------------------------------
  713. type ProposalMessage struct {
  714. Proposal *Proposal
  715. }
  716. func (m *ProposalMessage) String() string {
  717. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  718. }
  719. //-------------------------------------
  720. const (
  721. partTypeProposalBlock = byte(0x01)
  722. partTypeProposalPOL = byte(0x02)
  723. )
  724. type PartMessage struct {
  725. Height uint
  726. Round uint
  727. Type byte
  728. Part *types.Part
  729. }
  730. func (m *PartMessage) String() string {
  731. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  732. }
  733. //-------------------------------------
  734. type VoteMessage struct {
  735. ValidatorIndex uint
  736. Vote *types.Vote
  737. }
  738. func (m *VoteMessage) String() string {
  739. return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
  740. }
  741. //-------------------------------------
  742. type HasVoteMessage struct {
  743. Height uint
  744. Round uint
  745. Type byte
  746. Index uint
  747. }
  748. func (m *HasVoteMessage) String() string {
  749. return fmt.Sprintf("[HasVote %v/%v T:%X]", m.Height, m.Round, m.Type)
  750. }