You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

861 lines
24 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. bc "github.com/tendermint/tendermint/blockchain"
  11. . "github.com/tendermint/tendermint/common"
  12. . "github.com/tendermint/tendermint/consensus/types"
  13. "github.com/tendermint/tendermint/p2p"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  // Channel ids handled by the consensus reactor, plus per-peer gossip settings.
  const (
  	StateChannel byte = 0x20 // NewRoundStep, CommitStep and HasVote messages
  	DataChannel  byte = 0x21 // Proposal and Part messages
  	VoteChannel  byte = 0x22 // Vote messages

  	// Key under which a peer's *PeerState is stored in peer.Data.
  	peerStateKey = "ConsensusReactor.peerState"

  	// Time to sleep if there's nothing to send.
  	peerGossipSleepDuration = 100 * time.Millisecond
  )
  24. //-----------------------------------------------------------------------------
  25. // The reactor's underlying ConsensusState may change state at any time.
  26. // We atomically copy the RoundState struct before using it.
  27. type ConsensusReactor struct {
  28. sw *p2p.Switch
  29. running uint32
  30. quit chan struct{}
  31. blockStore *bc.BlockStore
  32. conS *ConsensusState
  33. }
  34. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
  35. conR := &ConsensusReactor{
  36. blockStore: blockStore,
  37. quit: make(chan struct{}),
  38. conS: consensusState,
  39. }
  40. return conR
  41. }
  42. // Implements Reactor
  43. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  44. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  45. log.Info("Starting ConsensusReactor")
  46. conR.sw = sw
  47. conR.conS.Start()
  48. go conR.broadcastNewRoundStepRoutine()
  49. }
  50. }
  51. // Implements Reactor
  52. func (conR *ConsensusReactor) Stop() {
  53. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  54. log.Info("Stopping ConsensusReactor")
  55. conR.conS.Stop()
  56. close(conR.quit)
  57. }
  58. }
  59. func (conR *ConsensusReactor) IsRunning() bool {
  60. return atomic.LoadUint32(&conR.running) == 1
  61. }
  62. // Implements Reactor
  63. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  64. // TODO optimize
  65. return []*p2p.ChannelDescriptor{
  66. &p2p.ChannelDescriptor{
  67. Id: StateChannel,
  68. Priority: 5,
  69. },
  70. &p2p.ChannelDescriptor{
  71. Id: DataChannel,
  72. Priority: 5,
  73. },
  74. &p2p.ChannelDescriptor{
  75. Id: VoteChannel,
  76. Priority: 5,
  77. },
  78. }
  79. }
  80. // Implements Reactor
  81. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  82. if !conR.IsRunning() {
  83. return
  84. }
  85. // Create peerState for peer
  86. peerState := NewPeerState(peer)
  87. peer.Data.Set(peerStateKey, peerState)
  88. // Begin gossip routines for this peer.
  89. go conR.gossipDataRoutine(peer, peerState)
  90. go conR.gossipVotesRoutine(peer, peerState)
  91. // Send our state to peer.
  92. conR.sendNewRoundStepRoutine(peer)
  93. }
  94. // Implements Reactor
  95. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  96. if !conR.IsRunning() {
  97. return
  98. }
  99. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  100. }
  101. // Implements Reactor
  102. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  103. if !conR.IsRunning() {
  104. return
  105. }
  106. // Get round state
  107. rs := conR.conS.GetRoundState()
  108. ps := peer.Data.Get(peerStateKey).(*PeerState)
  109. _, msg_, err := DecodeMessage(msgBytes)
  110. if err != nil {
  111. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  112. return
  113. }
  114. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
  115. switch chId {
  116. case StateChannel:
  117. switch msg := msg_.(type) {
  118. case *NewRoundStepMessage:
  119. ps.ApplyNewRoundStepMessage(msg, rs)
  120. case *CommitStepMessage:
  121. ps.ApplyCommitStepMessage(msg)
  122. case *HasVoteMessage:
  123. ps.ApplyHasVoteMessage(msg)
  124. default:
  125. // Ignore unknown message
  126. }
  127. case DataChannel:
  128. switch msg := msg_.(type) {
  129. case *Proposal:
  130. ps.SetHasProposal(msg)
  131. err = conR.conS.SetProposal(msg)
  132. case *PartMessage:
  133. if msg.Type == partTypeProposalBlock {
  134. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  135. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  136. } else if msg.Type == partTypeProposalPOL {
  137. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  138. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  139. } else {
  140. // Ignore unknown part type
  141. }
  142. default:
  143. // Ignore unknown message
  144. }
  145. case VoteChannel:
  146. switch msg := msg_.(type) {
  147. case *VoteMessage:
  148. vote := msg.Vote
  149. if rs.Height != vote.Height {
  150. return // Wrong height. Not necessarily a bad peer.
  151. }
  152. validatorIndex := msg.ValidatorIndex
  153. address, _ := rs.Validators.GetByIndex(validatorIndex)
  154. added, index, err := conR.conS.AddVote(address, vote)
  155. if err != nil {
  156. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  157. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  158. log.Warn("Found conflicting vote. Publish evidence")
  159. evidenceTx := &types.DupeoutTx{
  160. Address: address,
  161. VoteA: *errDupe.VoteA,
  162. VoteB: *errDupe.VoteB,
  163. }
  164. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  165. } else {
  166. // Probably an invalid signature. Bad peer.
  167. log.Warn("Error attempting to add vote", "error", err)
  168. // TODO: punish peer
  169. }
  170. }
  171. // Initialize Prevotes/Precommits/Commits if needed
  172. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  173. ps.SetHasVote(vote, index)
  174. if added {
  175. msg := &HasVoteMessage{
  176. Height: vote.Height,
  177. Round: vote.Round,
  178. Type: vote.Type,
  179. Index: index,
  180. }
  181. conR.sw.Broadcast(StateChannel, msg)
  182. }
  183. default:
  184. // Ignore unknown message
  185. }
  186. default:
  187. // Ignore unknown channel
  188. }
  189. if err != nil {
  190. log.Warn("Error in Receive()", "error", err)
  191. }
  192. }
  193. // Sets our private validator account for signing votes.
  194. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  195. conR.conS.SetPrivValidator(priv)
  196. }
  197. // Reset to some state.
  198. func (conR *ConsensusReactor) ResetToState(state *sm.State) {
  199. conR.conS.updateToState(state, false)
  200. }
  201. //--------------------------------------
  202. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  203. // Get seconds since beginning of height.
  204. timeElapsed := time.Now().Sub(rs.StartTime)
  205. // Broadcast NewRoundStepMessage
  206. nrsMsg = &NewRoundStepMessage{
  207. Height: rs.Height,
  208. Round: rs.Round,
  209. Step: rs.Step,
  210. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  211. }
  212. // If the step is commit, then also broadcast a CommitStepMessage.
  213. if rs.Step == RoundStepCommit {
  214. csMsg = &CommitStepMessage{
  215. Height: rs.Height,
  216. BlockParts: rs.ProposalBlockParts.Header(),
  217. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  218. }
  219. }
  220. return
  221. }
  222. // Listens for changes to the ConsensusState.Step by pulling
  223. // on conR.conS.NewStepCh().
  224. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  225. for {
  226. // Get RoundState with new Step or quit.
  227. var rs *RoundState
  228. select {
  229. case rs = <-conR.conS.NewStepCh():
  230. case <-conR.quit:
  231. return
  232. }
  233. nrsMsg, csMsg := makeRoundStepMessages(rs)
  234. if nrsMsg != nil {
  235. conR.sw.Broadcast(StateChannel, nrsMsg)
  236. }
  237. if csMsg != nil {
  238. conR.sw.Broadcast(StateChannel, csMsg)
  239. }
  240. }
  241. }
  242. func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) {
  243. rs := conR.conS.GetRoundState()
  244. nrsMsg, csMsg := makeRoundStepMessages(rs)
  245. if nrsMsg != nil {
  246. peer.Send(StateChannel, nrsMsg)
  247. }
  248. if csMsg != nil {
  249. peer.Send(StateChannel, nrsMsg)
  250. }
  251. }
  252. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  253. OUTER_LOOP:
  254. for {
  255. // Manage disconnects from self or peer.
  256. if !peer.IsRunning() || !conR.IsRunning() {
  257. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  258. return
  259. }
  260. rs := conR.conS.GetRoundState()
  261. prs := ps.GetRoundState()
  262. // Send proposal Block parts?
  263. // NOTE: if we or peer is at RoundStepCommit*, the round
  264. // won't necessarily match, but that's OK.
  265. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  266. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  267. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  268. part := rs.ProposalBlockParts.GetPart(index)
  269. msg := &PartMessage{
  270. Height: rs.Height,
  271. Round: rs.Round,
  272. Type: partTypeProposalBlock,
  273. Part: part,
  274. }
  275. peer.Send(DataChannel, msg)
  276. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  277. continue OUTER_LOOP
  278. }
  279. }
  280. // If the peer is on a previous height, help catch up.
  281. if 0 < prs.Height && prs.Height < rs.Height {
  282. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  283. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  284. // Ensure that the peer's PartSetHeader is correct
  285. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  286. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  287. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  288. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  289. time.Sleep(peerGossipSleepDuration)
  290. continue OUTER_LOOP
  291. }
  292. // Load the part
  293. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  294. if part == nil {
  295. log.Warn("Could not load part", "index", index,
  296. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  297. time.Sleep(peerGossipSleepDuration)
  298. continue OUTER_LOOP
  299. }
  300. // Send the part
  301. msg := &PartMessage{
  302. Height: prs.Height,
  303. Round: prs.Round,
  304. Type: partTypeProposalBlock,
  305. Part: part,
  306. }
  307. peer.Send(DataChannel, msg)
  308. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  309. continue OUTER_LOOP
  310. } else {
  311. //log.Debug("No parts to send in catch-up, sleeping")
  312. time.Sleep(peerGossipSleepDuration)
  313. continue OUTER_LOOP
  314. }
  315. }
  316. // If height and round don't match, sleep.
  317. if rs.Height != prs.Height || rs.Round != prs.Round {
  318. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  319. time.Sleep(peerGossipSleepDuration)
  320. continue OUTER_LOOP
  321. }
  322. // Send proposal?
  323. if rs.Proposal != nil && !prs.Proposal {
  324. msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
  325. peer.Send(DataChannel, msg)
  326. ps.SetHasProposal(rs.Proposal)
  327. continue OUTER_LOOP
  328. }
  329. // Send proposal POL parts?
  330. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  331. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  332. msg := &PartMessage{
  333. Height: rs.Height,
  334. Round: rs.Round,
  335. Type: partTypeProposalPOL,
  336. Part: rs.ProposalPOLParts.GetPart(index),
  337. }
  338. peer.Send(DataChannel, msg)
  339. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  340. continue OUTER_LOOP
  341. }
  342. }
  343. // Nothing to do. Sleep.
  344. time.Sleep(peerGossipSleepDuration)
  345. continue OUTER_LOOP
  346. }
  347. }
  348. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  349. OUTER_LOOP:
  350. for {
  351. // Manage disconnects from self or peer.
  352. if !peer.IsRunning() || !conR.IsRunning() {
  353. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  354. return
  355. }
  356. rs := conR.conS.GetRoundState()
  357. prs := ps.GetRoundState()
  358. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  359. if prs.Height == voteSet.Height() {
  360. // Initialize Prevotes/Precommits/Commits if needed
  361. ps.EnsureVoteBitArrays(prs.Height, voteSet.Size())
  362. }
  363. // TODO: give priority to our vote.
  364. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  365. vote := voteSet.GetByIndex(index)
  366. // NOTE: vote may be a commit.
  367. msg := &VoteMessage{index, vote}
  368. peer.Send(VoteChannel, msg)
  369. ps.SetHasVote(vote, index)
  370. return true
  371. }
  372. return false
  373. }
  374. trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet BitArray) (sent bool) {
  375. // Initialize Commits if needed
  376. ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits)))
  377. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  378. commit := validation.Commits[index]
  379. log.Debug("Picked commit to send", "index", index, "commit", commit)
  380. // Reconstruct vote.
  381. vote := &types.Vote{
  382. Height: prs.Height,
  383. Round: commit.Round,
  384. Type: types.VoteTypeCommit,
  385. BlockHash: blockMeta.Hash,
  386. BlockParts: blockMeta.Parts,
  387. Signature: commit.Signature,
  388. }
  389. msg := &VoteMessage{index, vote}
  390. peer.Send(VoteChannel, msg)
  391. ps.SetHasVote(vote, index)
  392. return true
  393. }
  394. return false
  395. }
  396. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  397. if rs.Height == prs.Height {
  398. // If there are lastcommits to send...
  399. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  400. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  401. if trySendVote(rs.LastCommits, prs.LastCommits) {
  402. continue OUTER_LOOP
  403. }
  404. }
  405. }
  406. // If there are prevotes to send...
  407. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  408. if trySendVote(rs.Prevotes, prs.Prevotes) {
  409. continue OUTER_LOOP
  410. }
  411. }
  412. // If there are precommits to send...
  413. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  414. if trySendVote(rs.Precommits, prs.Precommits) {
  415. continue OUTER_LOOP
  416. }
  417. }
  418. // If there are any commits to send...
  419. if trySendVote(rs.Commits, prs.Commits) {
  420. continue OUTER_LOOP
  421. }
  422. }
  423. // Catchup logic
  424. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  425. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  426. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  427. // If there are lastcommits to send...
  428. if trySendVote(rs.LastCommits, prs.Commits) {
  429. continue OUTER_LOOP
  430. } else {
  431. ps.SetHasAllCatchupCommits(prs.Height)
  432. }
  433. }
  434. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  435. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  436. // Load the blockMeta for block at prs.Height
  437. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  438. // Load the seen validation for prs.Height
  439. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  440. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  441. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  442. continue OUTER_LOOP
  443. } else {
  444. ps.SetHasAllCatchupCommits(prs.Height)
  445. }
  446. }
  447. // If peer is lagging by more than 1, send Validation.
  448. if rs.Height >= prs.Height+2 {
  449. // Load the blockMeta for block at prs.Height
  450. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  451. // Load the block validation for prs.Height+1,
  452. // which contains commit signatures for prs.Height.
  453. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  454. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  455. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  456. continue OUTER_LOOP
  457. } else {
  458. ps.SetHasAllCatchupCommits(prs.Height)
  459. }
  460. }
  461. }
  462. // We sent nothing. Sleep...
  463. time.Sleep(peerGossipSleepDuration)
  464. continue OUTER_LOOP
  465. }
  466. }
  467. //-----------------------------------------------------------------------------
  468. // Read only when returned by PeerState.GetRoundState().
  469. type PeerRoundState struct {
  470. Height uint // Height peer is at
  471. Round uint // Round peer is at
  472. Step RoundStep // Step peer is at
  473. StartTime time.Time // Estimated start of round 0 at this height
  474. Proposal bool // True if peer has proposal for this round
  475. ProposalBlockParts types.PartSetHeader //
  476. ProposalBlockBitArray BitArray // True bit -> has part
  477. ProposalPOLParts types.PartSetHeader //
  478. ProposalPOLBitArray BitArray // True bit -> has part
  479. Prevotes BitArray // All votes peer has for this round
  480. Precommits BitArray // All precommits peer has for this round
  481. Commits BitArray // All commits peer has for this height
  482. LastCommits BitArray // All commits peer has for last height
  483. HasAllCatchupCommits bool // Used for catch-up
  484. }
  485. //-----------------------------------------------------------------------------
  // Sentinel errors describing invalid peer state updates.
  var (
  	// ErrPeerStateHeightRegression signals a peer state height regression.
  	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")

  	// ErrPeerStateInvalidStartTime signals an invalid peer state start time.
  	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  )
  490. type PeerState struct {
  491. mtx sync.Mutex
  492. PeerRoundState
  493. }
  494. func NewPeerState(peer *p2p.Peer) *PeerState {
  495. return &PeerState{}
  496. }
  497. // Returns an atomic snapshot of the PeerRoundState.
  498. // There's no point in mutating it since it won't change PeerState.
  499. func (ps *PeerState) GetRoundState() *PeerRoundState {
  500. ps.mtx.Lock()
  501. defer ps.mtx.Unlock()
  502. prs := ps.PeerRoundState // copy
  503. return &prs
  504. }
  505. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  506. ps.mtx.Lock()
  507. defer ps.mtx.Unlock()
  508. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  509. return
  510. }
  511. if ps.Proposal {
  512. return
  513. }
  514. ps.Proposal = true
  515. ps.ProposalBlockParts = proposal.BlockParts
  516. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  517. ps.ProposalPOLParts = proposal.POLParts
  518. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  519. }
  520. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  521. ps.mtx.Lock()
  522. defer ps.mtx.Unlock()
  523. if ps.Height != height || ps.Round != round {
  524. return
  525. }
  526. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  527. }
  528. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  529. ps.mtx.Lock()
  530. defer ps.mtx.Unlock()
  531. if ps.Height != height || ps.Round != round {
  532. return
  533. }
  534. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  535. }
  536. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  537. ps.mtx.Lock()
  538. defer ps.mtx.Unlock()
  539. if ps.Height != height {
  540. return
  541. }
  542. if ps.Prevotes.IsZero() {
  543. ps.Prevotes = NewBitArray(numValidators)
  544. }
  545. if ps.Precommits.IsZero() {
  546. ps.Precommits = NewBitArray(numValidators)
  547. }
  548. if ps.Commits.IsZero() {
  549. ps.Commits = NewBitArray(numValidators)
  550. }
  551. }
  552. func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
  553. ps.mtx.Lock()
  554. defer ps.mtx.Unlock()
  555. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  556. }
  557. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  558. if ps.Height == height+1 && type_ == types.VoteTypeCommit {
  559. // Special case for LastCommits.
  560. ps.LastCommits.SetIndex(index, true)
  561. return
  562. } else if ps.Height != height {
  563. // Does not apply.
  564. return
  565. }
  566. switch type_ {
  567. case types.VoteTypePrevote:
  568. ps.Prevotes.SetIndex(index, true)
  569. case types.VoteTypePrecommit:
  570. ps.Precommits.SetIndex(index, true)
  571. case types.VoteTypeCommit:
  572. if round < ps.Round {
  573. ps.Prevotes.SetIndex(index, true)
  574. ps.Precommits.SetIndex(index, true)
  575. }
  576. ps.Commits.SetIndex(index, true)
  577. default:
  578. panic("Invalid vote type")
  579. }
  580. }
  581. // When catching up, this helps keep track of whether
  582. // we should send more commit votes from the block (validation) store
  583. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  584. ps.mtx.Lock()
  585. defer ps.mtx.Unlock()
  586. if ps.Height == height {
  587. ps.HasAllCatchupCommits = true
  588. }
  589. }
  590. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  591. ps.mtx.Lock()
  592. defer ps.mtx.Unlock()
  593. // Just remember these values.
  594. psHeight := ps.Height
  595. psRound := ps.Round
  596. //psStep := ps.Step
  597. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  598. ps.Height = msg.Height
  599. ps.Round = msg.Round
  600. ps.Step = msg.Step
  601. ps.StartTime = startTime
  602. if psHeight != msg.Height || psRound != msg.Round {
  603. ps.Proposal = false
  604. ps.ProposalBlockParts = types.PartSetHeader{}
  605. ps.ProposalBlockBitArray = BitArray{}
  606. ps.ProposalPOLParts = types.PartSetHeader{}
  607. ps.ProposalPOLBitArray = BitArray{}
  608. // We'll update the BitArray capacity later.
  609. ps.Prevotes = BitArray{}
  610. ps.Precommits = BitArray{}
  611. }
  612. if psHeight != msg.Height {
  613. // Shift Commits to LastCommits
  614. if psHeight+1 == msg.Height {
  615. ps.LastCommits = ps.Commits
  616. } else {
  617. ps.LastCommits = BitArray{}
  618. }
  619. // We'll update the BitArray capacity later.
  620. ps.Commits = BitArray{}
  621. ps.HasAllCatchupCommits = false
  622. }
  623. }
  624. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  625. ps.mtx.Lock()
  626. defer ps.mtx.Unlock()
  627. if ps.Height != msg.Height {
  628. return
  629. }
  630. ps.ProposalBlockParts = msg.BlockParts
  631. ps.ProposalBlockBitArray = msg.BlockBitArray
  632. }
  633. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  634. ps.mtx.Lock()
  635. defer ps.mtx.Unlock()
  636. // Special case for LastCommits
  637. if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
  638. ps.LastCommits.SetIndex(msg.Index, true)
  639. return
  640. } else if ps.Height != msg.Height {
  641. return
  642. }
  643. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  644. }
  645. //-----------------------------------------------------------------------------
  // Messages

  // Type bytes identifying each consensus message; DecodeMessage dispatches
  // on the first byte of an encoded message.
  const (
  	msgTypeUnknown byte = 0x00

  	// State messages.
  	msgTypeNewRoundStep byte = 0x01
  	msgTypeCommitStep   byte = 0x02

  	// Data and vote messages.
  	msgTypeProposal byte = 0x11
  	msgTypePart     byte = 0x12 // both block & POL
  	msgTypeVote     byte = 0x13
  	msgTypeHasVote  byte = 0x14
  )
  656. // TODO: check for unnecessary extra bytes at the end.
  657. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  658. n := new(int64)
  659. // log.Debug(Fmt("decoding msg bytes: %X", bz))
  660. msgType = bz[0]
  661. r := bytes.NewReader(bz)
  662. switch msgType {
  663. // Messages for communicating state changes
  664. case msgTypeNewRoundStep:
  665. msg = binary.ReadBinary(&NewRoundStepMessage{}, r, n, &err)
  666. case msgTypeCommitStep:
  667. msg = binary.ReadBinary(&CommitStepMessage{}, r, n, &err)
  668. // Messages of data
  669. case msgTypeProposal:
  670. r.ReadByte() // Consume the byte
  671. msg = binary.ReadBinary(&Proposal{}, r, n, &err)
  672. case msgTypePart:
  673. msg = binary.ReadBinary(&PartMessage{}, r, n, &err)
  674. case msgTypeVote:
  675. msg = binary.ReadBinary(&VoteMessage{}, r, n, &err)
  676. case msgTypeHasVote:
  677. msg = binary.ReadBinary(&HasVoteMessage{}, r, n, &err)
  678. default:
  679. msg = nil
  680. }
  681. return
  682. }
  683. //-------------------------------------
  684. type NewRoundStepMessage struct {
  685. Height uint
  686. Round uint
  687. Step RoundStep
  688. SecondsSinceStartTime uint
  689. }
  690. func (m *NewRoundStepMessage) TypeByte() byte { return msgTypeNewRoundStep }
  691. func (m *NewRoundStepMessage) String() string {
  692. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  693. }
  694. //-------------------------------------
  695. type CommitStepMessage struct {
  696. Height uint
  697. BlockParts types.PartSetHeader
  698. BlockBitArray BitArray
  699. }
  700. func (m *CommitStepMessage) TypeByte() byte { return msgTypeCommitStep }
  701. func (m *CommitStepMessage) String() string {
  702. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  703. }
  704. //-------------------------------------
  // Values of PartMessage.Type, selecting which part set a part belongs to.
  const (
  	partTypeProposalBlock byte = 0x01 // part of the proposal block
  	partTypeProposalPOL   byte = 0x02 // part of the proposal POL
  )
  709. type PartMessage struct {
  710. Height uint
  711. Round uint
  712. Type byte
  713. Part *types.Part
  714. }
  715. func (m *PartMessage) TypeByte() byte { return msgTypePart }
  716. func (m *PartMessage) String() string {
  717. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  718. }
  719. //-------------------------------------
  720. type VoteMessage struct {
  721. ValidatorIndex uint
  722. Vote *types.Vote
  723. }
  724. func (m *VoteMessage) TypeByte() byte { return msgTypeVote }
  725. func (m *VoteMessage) String() string {
  726. return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
  727. }
  728. //-------------------------------------
  729. type HasVoteMessage struct {
  730. Height uint
  731. Round uint
  732. Type byte
  733. Index uint
  734. }
  735. func (m *HasVoteMessage) TypeByte() byte { return msgTypeHasVote }
  736. func (m *HasVoteMessage) String() string {
  737. return fmt.Sprintf("[HasVote %v/%v T:%X]", m.Height, m.Round, m.Type)
  738. }