You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

863 lines
24 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. const (
  20. StateChannel = byte(0x20)
  21. DataChannel = byte(0x21)
  22. VoteChannel = byte(0x22)
  23. peerStateKey = "ConsensusReactor.peerState"
  24. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  25. )
  26. //-----------------------------------------------------------------------------
  27. // The reactor's underlying ConsensusState may change state at any time.
  28. // We atomically copy the RoundState struct before using it.
  29. type ConsensusReactor struct {
  30. sw *p2p.Switch
  31. running uint32
  32. quit chan struct{}
  33. blockStore *bc.BlockStore
  34. conS *ConsensusState
  35. evsw events.Fireable
  36. }
  37. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
  38. conR := &ConsensusReactor{
  39. blockStore: blockStore,
  40. quit: make(chan struct{}),
  41. conS: consensusState,
  42. }
  43. return conR
  44. }
  45. // Implements Reactor
  46. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  47. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  48. log.Info("Starting ConsensusReactor")
  49. conR.sw = sw
  50. conR.conS.Start()
  51. go conR.broadcastNewRoundStepRoutine()
  52. }
  53. }
  54. // Implements Reactor
  55. func (conR *ConsensusReactor) Stop() {
  56. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  57. log.Info("Stopping ConsensusReactor")
  58. conR.conS.Stop()
  59. close(conR.quit)
  60. }
  61. }
  62. func (conR *ConsensusReactor) IsRunning() bool {
  63. return atomic.LoadUint32(&conR.running) == 1
  64. }
  65. // Implements Reactor
  66. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  67. // TODO optimize
  68. return []*p2p.ChannelDescriptor{
  69. &p2p.ChannelDescriptor{
  70. Id: StateChannel,
  71. Priority: 5,
  72. },
  73. &p2p.ChannelDescriptor{
  74. Id: DataChannel,
  75. Priority: 5,
  76. },
  77. &p2p.ChannelDescriptor{
  78. Id: VoteChannel,
  79. Priority: 5,
  80. },
  81. }
  82. }
  83. // Implements Reactor
  84. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  85. if !conR.IsRunning() {
  86. return
  87. }
  88. // Create peerState for peer
  89. peerState := NewPeerState(peer)
  90. peer.Data.Set(peerStateKey, peerState)
  91. // Begin gossip routines for this peer.
  92. go conR.gossipDataRoutine(peer, peerState)
  93. go conR.gossipVotesRoutine(peer, peerState)
  94. // Send our state to peer.
  95. conR.sendNewRoundStepRoutine(peer)
  96. }
  97. // Implements Reactor
  98. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  99. if !conR.IsRunning() {
  100. return
  101. }
  102. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  103. }
  104. // Implements Reactor
  105. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  106. if !conR.IsRunning() {
  107. return
  108. }
  109. // Get round state
  110. rs := conR.conS.GetRoundState()
  111. ps := peer.Data.Get(peerStateKey).(*PeerState)
  112. _, msg_, err := DecodeMessage(msgBytes)
  113. if err != nil {
  114. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  115. return
  116. }
  117. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
  118. switch chId {
  119. case StateChannel:
  120. switch msg := msg_.(type) {
  121. case *NewRoundStepMessage:
  122. ps.ApplyNewRoundStepMessage(msg, rs)
  123. case *CommitStepMessage:
  124. ps.ApplyCommitStepMessage(msg)
  125. case *HasVoteMessage:
  126. ps.ApplyHasVoteMessage(msg)
  127. default:
  128. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  129. }
  130. case DataChannel:
  131. switch msg := msg_.(type) {
  132. case *ProposalMessage:
  133. ps.SetHasProposal(msg.Proposal)
  134. err = conR.conS.SetProposal(msg.Proposal)
  135. case *PartMessage:
  136. if msg.Type == partTypeProposalBlock {
  137. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  138. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  139. } else if msg.Type == partTypeProposalPOL {
  140. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  141. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  142. } else {
  143. log.Warn(Fmt("Unknown part type %v", msg.Type))
  144. }
  145. default:
  146. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  147. }
  148. case VoteChannel:
  149. switch msg := msg_.(type) {
  150. case *VoteMessage:
  151. vote := msg.Vote
  152. if rs.Height != vote.Height {
  153. return // Wrong height. Not necessarily a bad peer.
  154. }
  155. validatorIndex := msg.ValidatorIndex
  156. address, _ := rs.Validators.GetByIndex(validatorIndex)
  157. added, index, err := conR.conS.AddVote(address, vote)
  158. if err != nil {
  159. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  160. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  161. log.Warn("Found conflicting vote. Publish evidence")
  162. evidenceTx := &types.DupeoutTx{
  163. Address: address,
  164. VoteA: *errDupe.VoteA,
  165. VoteB: *errDupe.VoteB,
  166. }
  167. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  168. } else {
  169. // Probably an invalid signature. Bad peer.
  170. log.Warn("Error attempting to add vote", "error", err)
  171. // TODO: punish peer
  172. }
  173. }
  174. // Initialize Prevotes/Precommits/Commits if needed
  175. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  176. ps.SetHasVote(vote, index)
  177. if added {
  178. msg := &HasVoteMessage{
  179. Height: vote.Height,
  180. Round: vote.Round,
  181. Type: vote.Type,
  182. Index: index,
  183. }
  184. conR.sw.Broadcast(StateChannel, msg)
  185. }
  186. default:
  187. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  188. }
  189. default:
  190. log.Warn(Fmt("Unknown channel %X", chId))
  191. }
  192. if err != nil {
  193. log.Warn("Error in Receive()", "error", err)
  194. }
  195. }
  196. // Sets our private validator account for signing votes.
  197. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  198. conR.conS.SetPrivValidator(priv)
  199. }
  200. // Reset to some state.
  201. func (conR *ConsensusReactor) ResetToState(state *sm.State) {
  202. conR.conS.updateToState(state, false)
  203. }
  204. // implements events.Eventable
  205. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  206. conR.evsw = evsw
  207. conR.conS.SetFireable(evsw)
  208. }
  209. //--------------------------------------
  210. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  211. // Get seconds since beginning of height.
  212. timeElapsed := time.Now().Sub(rs.StartTime)
  213. // Broadcast NewRoundStepMessage
  214. nrsMsg = &NewRoundStepMessage{
  215. Height: rs.Height,
  216. Round: rs.Round,
  217. Step: rs.Step,
  218. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  219. }
  220. // If the step is commit, then also broadcast a CommitStepMessage.
  221. if rs.Step == RoundStepCommit {
  222. csMsg = &CommitStepMessage{
  223. Height: rs.Height,
  224. BlockParts: rs.ProposalBlockParts.Header(),
  225. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  226. }
  227. }
  228. return
  229. }
  230. // Listens for changes to the ConsensusState.Step by pulling
  231. // on conR.conS.NewStepCh().
  232. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  233. for {
  234. // Get RoundState with new Step or quit.
  235. var rs *RoundState
  236. select {
  237. case rs = <-conR.conS.NewStepCh():
  238. case <-conR.quit:
  239. return
  240. }
  241. nrsMsg, csMsg := makeRoundStepMessages(rs)
  242. if nrsMsg != nil {
  243. conR.sw.Broadcast(StateChannel, nrsMsg)
  244. }
  245. if csMsg != nil {
  246. conR.sw.Broadcast(StateChannel, csMsg)
  247. }
  248. }
  249. }
  250. func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) {
  251. rs := conR.conS.GetRoundState()
  252. nrsMsg, csMsg := makeRoundStepMessages(rs)
  253. if nrsMsg != nil {
  254. peer.Send(StateChannel, nrsMsg)
  255. }
  256. if csMsg != nil {
  257. peer.Send(StateChannel, nrsMsg)
  258. }
  259. }
  260. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  261. OUTER_LOOP:
  262. for {
  263. // Manage disconnects from self or peer.
  264. if !peer.IsRunning() || !conR.IsRunning() {
  265. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  266. return
  267. }
  268. rs := conR.conS.GetRoundState()
  269. prs := ps.GetRoundState()
  270. // Send proposal Block parts?
  271. // NOTE: if we or peer is at RoundStepCommit*, the round
  272. // won't necessarily match, but that's OK.
  273. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  274. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  275. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  276. part := rs.ProposalBlockParts.GetPart(index)
  277. msg := &PartMessage{
  278. Height: rs.Height,
  279. Round: rs.Round,
  280. Type: partTypeProposalBlock,
  281. Part: part,
  282. }
  283. peer.Send(DataChannel, msg)
  284. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  285. continue OUTER_LOOP
  286. }
  287. }
  288. // If the peer is on a previous height, help catch up.
  289. if 0 < prs.Height && prs.Height < rs.Height {
  290. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  291. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  292. // Ensure that the peer's PartSetHeader is correct
  293. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  294. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  295. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  296. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  297. time.Sleep(peerGossipSleepDuration)
  298. continue OUTER_LOOP
  299. }
  300. // Load the part
  301. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  302. if part == nil {
  303. log.Warn("Could not load part", "index", index,
  304. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  305. time.Sleep(peerGossipSleepDuration)
  306. continue OUTER_LOOP
  307. }
  308. // Send the part
  309. msg := &PartMessage{
  310. Height: prs.Height,
  311. Round: prs.Round,
  312. Type: partTypeProposalBlock,
  313. Part: part,
  314. }
  315. peer.Send(DataChannel, msg)
  316. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  317. continue OUTER_LOOP
  318. } else {
  319. //log.Debug("No parts to send in catch-up, sleeping")
  320. time.Sleep(peerGossipSleepDuration)
  321. continue OUTER_LOOP
  322. }
  323. }
  324. // If height and round don't match, sleep.
  325. if rs.Height != prs.Height || rs.Round != prs.Round {
  326. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  327. time.Sleep(peerGossipSleepDuration)
  328. continue OUTER_LOOP
  329. }
  330. // Send proposal?
  331. if rs.Proposal != nil && !prs.Proposal {
  332. msg := &ProposalMessage{Proposal: rs.Proposal}
  333. peer.Send(DataChannel, msg)
  334. ps.SetHasProposal(rs.Proposal)
  335. continue OUTER_LOOP
  336. }
  337. // Send proposal POL parts?
  338. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  339. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  340. msg := &PartMessage{
  341. Height: rs.Height,
  342. Round: rs.Round,
  343. Type: partTypeProposalPOL,
  344. Part: rs.ProposalPOLParts.GetPart(index),
  345. }
  346. peer.Send(DataChannel, msg)
  347. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  348. continue OUTER_LOOP
  349. }
  350. }
  351. // Nothing to do. Sleep.
  352. time.Sleep(peerGossipSleepDuration)
  353. continue OUTER_LOOP
  354. }
  355. }
  356. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  357. OUTER_LOOP:
  358. for {
  359. // Manage disconnects from self or peer.
  360. if !peer.IsRunning() || !conR.IsRunning() {
  361. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  362. return
  363. }
  364. rs := conR.conS.GetRoundState()
  365. prs := ps.GetRoundState()
  366. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  367. if prs.Height == voteSet.Height() {
  368. // Initialize Prevotes/Precommits/Commits if needed
  369. ps.EnsureVoteBitArrays(prs.Height, voteSet.Size())
  370. }
  371. // TODO: give priority to our vote.
  372. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  373. vote := voteSet.GetByIndex(index)
  374. // NOTE: vote may be a commit.
  375. msg := &VoteMessage{index, vote}
  376. peer.Send(VoteChannel, msg)
  377. ps.SetHasVote(vote, index)
  378. return true
  379. }
  380. return false
  381. }
  382. trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet BitArray) (sent bool) {
  383. // Initialize Commits if needed
  384. ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits)))
  385. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  386. commit := validation.Commits[index]
  387. log.Debug("Picked commit to send", "index", index, "commit", commit)
  388. // Reconstruct vote.
  389. vote := &types.Vote{
  390. Height: prs.Height,
  391. Round: commit.Round,
  392. Type: types.VoteTypeCommit,
  393. BlockHash: blockMeta.Hash,
  394. BlockParts: blockMeta.Parts,
  395. Signature: commit.Signature,
  396. }
  397. msg := &VoteMessage{index, vote}
  398. peer.Send(VoteChannel, msg)
  399. ps.SetHasVote(vote, index)
  400. return true
  401. }
  402. return false
  403. }
  404. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  405. if rs.Height == prs.Height {
  406. // If there are lastcommits to send...
  407. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  408. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  409. if trySendVote(rs.LastCommits, prs.LastCommits) {
  410. continue OUTER_LOOP
  411. }
  412. }
  413. }
  414. // If there are prevotes to send...
  415. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  416. if trySendVote(rs.Prevotes, prs.Prevotes) {
  417. continue OUTER_LOOP
  418. }
  419. }
  420. // If there are precommits to send...
  421. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  422. if trySendVote(rs.Precommits, prs.Precommits) {
  423. continue OUTER_LOOP
  424. }
  425. }
  426. // If there are any commits to send...
  427. if trySendVote(rs.Commits, prs.Commits) {
  428. continue OUTER_LOOP
  429. }
  430. }
  431. // Catchup logic
  432. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  433. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  434. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  435. // If there are lastcommits to send...
  436. if trySendVote(rs.LastCommits, prs.Commits) {
  437. continue OUTER_LOOP
  438. } else {
  439. ps.SetHasAllCatchupCommits(prs.Height)
  440. }
  441. }
  442. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  443. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  444. // Load the blockMeta for block at prs.Height
  445. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  446. // Load the seen validation for prs.Height
  447. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  448. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  449. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  450. continue OUTER_LOOP
  451. } else {
  452. ps.SetHasAllCatchupCommits(prs.Height)
  453. }
  454. }
  455. // If peer is lagging by more than 1, send Validation.
  456. if rs.Height >= prs.Height+2 {
  457. // Load the blockMeta for block at prs.Height
  458. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  459. // Load the block validation for prs.Height+1,
  460. // which contains commit signatures for prs.Height.
  461. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  462. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  463. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  464. continue OUTER_LOOP
  465. } else {
  466. ps.SetHasAllCatchupCommits(prs.Height)
  467. }
  468. }
  469. }
  470. // We sent nothing. Sleep...
  471. time.Sleep(peerGossipSleepDuration)
  472. continue OUTER_LOOP
  473. }
  474. }
  475. //-----------------------------------------------------------------------------
  476. // Read only when returned by PeerState.GetRoundState().
  477. type PeerRoundState struct {
  478. Height uint // Height peer is at
  479. Round uint // Round peer is at
  480. Step RoundStep // Step peer is at
  481. StartTime time.Time // Estimated start of round 0 at this height
  482. Proposal bool // True if peer has proposal for this round
  483. ProposalBlockParts types.PartSetHeader //
  484. ProposalBlockBitArray BitArray // True bit -> has part
  485. ProposalPOLParts types.PartSetHeader //
  486. ProposalPOLBitArray BitArray // True bit -> has part
  487. Prevotes BitArray // All votes peer has for this round
  488. Precommits BitArray // All precommits peer has for this round
  489. Commits BitArray // All commits peer has for this height
  490. LastCommits BitArray // All commits peer has for last height
  491. HasAllCatchupCommits bool // Used for catch-up
  492. }
  493. //-----------------------------------------------------------------------------
  494. var (
  495. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  496. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  497. )
  498. type PeerState struct {
  499. mtx sync.Mutex
  500. PeerRoundState
  501. }
  502. func NewPeerState(peer *p2p.Peer) *PeerState {
  503. return &PeerState{}
  504. }
  505. // Returns an atomic snapshot of the PeerRoundState.
  506. // There's no point in mutating it since it won't change PeerState.
  507. func (ps *PeerState) GetRoundState() *PeerRoundState {
  508. ps.mtx.Lock()
  509. defer ps.mtx.Unlock()
  510. prs := ps.PeerRoundState // copy
  511. return &prs
  512. }
  513. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  514. ps.mtx.Lock()
  515. defer ps.mtx.Unlock()
  516. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  517. return
  518. }
  519. if ps.Proposal {
  520. return
  521. }
  522. ps.Proposal = true
  523. ps.ProposalBlockParts = proposal.BlockParts
  524. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  525. ps.ProposalPOLParts = proposal.POLParts
  526. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  527. }
  528. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  529. ps.mtx.Lock()
  530. defer ps.mtx.Unlock()
  531. if ps.Height != height || ps.Round != round {
  532. return
  533. }
  534. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  535. }
  536. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  537. ps.mtx.Lock()
  538. defer ps.mtx.Unlock()
  539. if ps.Height != height || ps.Round != round {
  540. return
  541. }
  542. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  543. }
  544. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  545. ps.mtx.Lock()
  546. defer ps.mtx.Unlock()
  547. if ps.Height != height {
  548. return
  549. }
  550. if ps.Prevotes.IsZero() {
  551. ps.Prevotes = NewBitArray(numValidators)
  552. }
  553. if ps.Precommits.IsZero() {
  554. ps.Precommits = NewBitArray(numValidators)
  555. }
  556. if ps.Commits.IsZero() {
  557. ps.Commits = NewBitArray(numValidators)
  558. }
  559. }
  560. func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
  561. ps.mtx.Lock()
  562. defer ps.mtx.Unlock()
  563. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  564. }
  565. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  566. if ps.Height == height+1 && type_ == types.VoteTypeCommit {
  567. // Special case for LastCommits.
  568. ps.LastCommits.SetIndex(index, true)
  569. return
  570. } else if ps.Height != height {
  571. // Does not apply.
  572. return
  573. }
  574. switch type_ {
  575. case types.VoteTypePrevote:
  576. ps.Prevotes.SetIndex(index, true)
  577. case types.VoteTypePrecommit:
  578. ps.Precommits.SetIndex(index, true)
  579. case types.VoteTypeCommit:
  580. if round < ps.Round {
  581. ps.Prevotes.SetIndex(index, true)
  582. ps.Precommits.SetIndex(index, true)
  583. }
  584. ps.Commits.SetIndex(index, true)
  585. default:
  586. panic("Invalid vote type")
  587. }
  588. }
  589. // When catching up, this helps keep track of whether
  590. // we should send more commit votes from the block (validation) store
  591. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  592. ps.mtx.Lock()
  593. defer ps.mtx.Unlock()
  594. if ps.Height == height {
  595. ps.HasAllCatchupCommits = true
  596. }
  597. }
  598. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  599. ps.mtx.Lock()
  600. defer ps.mtx.Unlock()
  601. // Just remember these values.
  602. psHeight := ps.Height
  603. psRound := ps.Round
  604. //psStep := ps.Step
  605. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  606. ps.Height = msg.Height
  607. ps.Round = msg.Round
  608. ps.Step = msg.Step
  609. ps.StartTime = startTime
  610. if psHeight != msg.Height || psRound != msg.Round {
  611. ps.Proposal = false
  612. ps.ProposalBlockParts = types.PartSetHeader{}
  613. ps.ProposalBlockBitArray = BitArray{}
  614. ps.ProposalPOLParts = types.PartSetHeader{}
  615. ps.ProposalPOLBitArray = BitArray{}
  616. // We'll update the BitArray capacity later.
  617. ps.Prevotes = BitArray{}
  618. ps.Precommits = BitArray{}
  619. }
  620. if psHeight != msg.Height {
  621. // Shift Commits to LastCommits
  622. if psHeight+1 == msg.Height {
  623. ps.LastCommits = ps.Commits
  624. } else {
  625. ps.LastCommits = BitArray{}
  626. }
  627. // We'll update the BitArray capacity later.
  628. ps.Commits = BitArray{}
  629. ps.HasAllCatchupCommits = false
  630. }
  631. }
  632. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  633. ps.mtx.Lock()
  634. defer ps.mtx.Unlock()
  635. if ps.Height != msg.Height {
  636. return
  637. }
  638. ps.ProposalBlockParts = msg.BlockParts
  639. ps.ProposalBlockBitArray = msg.BlockBitArray
  640. }
  641. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  642. ps.mtx.Lock()
  643. defer ps.mtx.Unlock()
  644. // Special case for LastCommits
  645. if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
  646. ps.LastCommits.SetIndex(msg.Index, true)
  647. return
  648. } else if ps.Height != msg.Height {
  649. return
  650. }
  651. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  652. }
  653. //-----------------------------------------------------------------------------
  654. // Messages
  655. const (
  656. msgTypeNewRoundStep = byte(0x01)
  657. msgTypeCommitStep = byte(0x02)
  658. msgTypeProposal = byte(0x11)
  659. msgTypePart = byte(0x12) // both block & POL
  660. msgTypeVote = byte(0x13)
  661. msgTypeHasVote = byte(0x14)
  662. )
  663. type ConsensusMessage interface{}
  664. var _ = binary.RegisterInterface(
  665. struct{ ConsensusMessage }{},
  666. binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  667. binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  668. binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  669. binary.ConcreteType{&PartMessage{}, msgTypePart},
  670. binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  671. binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  672. )
  673. // TODO: check for unnecessary extra bytes at the end.
  674. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  675. msgType = bz[0]
  676. n := new(int64)
  677. r := bytes.NewReader(bz)
  678. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  679. return
  680. }
  681. //-------------------------------------
  682. type NewRoundStepMessage struct {
  683. Height uint
  684. Round uint
  685. Step RoundStep
  686. SecondsSinceStartTime uint
  687. }
  688. func (m *NewRoundStepMessage) String() string {
  689. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  690. }
  691. //-------------------------------------
  692. type CommitStepMessage struct {
  693. Height uint
  694. BlockParts types.PartSetHeader
  695. BlockBitArray BitArray
  696. }
  697. func (m *CommitStepMessage) String() string {
  698. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  699. }
  700. //-------------------------------------
  701. type ProposalMessage struct {
  702. Proposal *Proposal
  703. }
  704. func (m *ProposalMessage) String() string {
  705. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  706. }
  707. //-------------------------------------
  708. const (
  709. partTypeProposalBlock = byte(0x01)
  710. partTypeProposalPOL = byte(0x02)
  711. )
  712. type PartMessage struct {
  713. Height uint
  714. Round uint
  715. Type byte
  716. Part *types.Part
  717. }
  718. func (m *PartMessage) String() string {
  719. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  720. }
  721. //-------------------------------------
  722. type VoteMessage struct {
  723. ValidatorIndex uint
  724. Vote *types.Vote
  725. }
  726. func (m *VoteMessage) String() string {
  727. return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
  728. }
  729. //-------------------------------------
  730. type HasVoteMessage struct {
  731. Height uint
  732. Round uint
  733. Type byte
  734. Index uint
  735. }
  736. func (m *HasVoteMessage) String() string {
  737. return fmt.Sprintf("[HasVote %v/%v T:%X]", m.Height, m.Round, m.Type)
  738. }