You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

881 lines
24 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. bc "github.com/tendermint/tendermint/blockchain"
  11. . "github.com/tendermint/tendermint/common"
  12. . "github.com/tendermint/tendermint/consensus/types"
  13. "github.com/tendermint/tendermint/events"
  14. "github.com/tendermint/tendermint/p2p"
  15. sm "github.com/tendermint/tendermint/state"
  16. "github.com/tendermint/tendermint/types"
  17. )
  18. const (
  19. StateChannel = byte(0x20)
  20. DataChannel = byte(0x21)
  21. VoteChannel = byte(0x22)
  22. peerStateKey = "ConsensusReactor.peerState"
  23. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  24. )
  25. //-----------------------------------------------------------------------------
  26. // The reactor's underlying ConsensusState may change state at any time.
  27. // We atomically copy the RoundState struct before using it.
  28. type ConsensusReactor struct {
  29. sw *p2p.Switch
  30. running uint32
  31. quit chan struct{}
  32. blockStore *bc.BlockStore
  33. conS *ConsensusState
  34. evsw *events.EventSwitch
  35. }
  36. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
  37. conR := &ConsensusReactor{
  38. blockStore: blockStore,
  39. quit: make(chan struct{}),
  40. conS: consensusState,
  41. }
  42. return conR
  43. }
  44. // Implements Reactor
  45. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  46. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  47. log.Info("Starting ConsensusReactor")
  48. conR.sw = sw
  49. conR.conS.Start()
  50. go conR.broadcastNewRoundStepRoutine()
  51. }
  52. }
  53. // Implements Reactor
  54. func (conR *ConsensusReactor) Stop() {
  55. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  56. log.Info("Stopping ConsensusReactor")
  57. conR.conS.Stop()
  58. close(conR.quit)
  59. }
  60. }
  61. func (conR *ConsensusReactor) IsRunning() bool {
  62. return atomic.LoadUint32(&conR.running) == 1
  63. }
  64. // Implements Reactor
  65. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  66. // TODO optimize
  67. return []*p2p.ChannelDescriptor{
  68. &p2p.ChannelDescriptor{
  69. Id: StateChannel,
  70. Priority: 5,
  71. },
  72. &p2p.ChannelDescriptor{
  73. Id: DataChannel,
  74. Priority: 5,
  75. },
  76. &p2p.ChannelDescriptor{
  77. Id: VoteChannel,
  78. Priority: 5,
  79. },
  80. }
  81. }
  82. // Implements Reactor
  83. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  84. if !conR.IsRunning() {
  85. return
  86. }
  87. // Create peerState for peer
  88. peerState := NewPeerState(peer)
  89. peer.Data.Set(peerStateKey, peerState)
  90. // Begin gossip routines for this peer.
  91. go conR.gossipDataRoutine(peer, peerState)
  92. go conR.gossipVotesRoutine(peer, peerState)
  93. // Send our state to peer.
  94. conR.sendNewRoundStepRoutine(peer)
  95. }
  96. // Implements Reactor
  97. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  98. if !conR.IsRunning() {
  99. return
  100. }
  101. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  102. }
  103. // Implements Reactor
  104. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  105. if !conR.IsRunning() {
  106. return
  107. }
  108. // Get round state
  109. rs := conR.conS.GetRoundState()
  110. ps := peer.Data.Get(peerStateKey).(*PeerState)
  111. _, msg_, err := DecodeMessage(msgBytes)
  112. if err != nil {
  113. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  114. return
  115. }
  116. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
  117. switch chId {
  118. case StateChannel:
  119. switch msg := msg_.(type) {
  120. case *NewRoundStepMessage:
  121. ps.ApplyNewRoundStepMessage(msg, rs)
  122. case *CommitStepMessage:
  123. ps.ApplyCommitStepMessage(msg)
  124. case *HasVoteMessage:
  125. ps.ApplyHasVoteMessage(msg)
  126. default:
  127. // Ignore unknown message
  128. }
  129. case DataChannel:
  130. switch msg := msg_.(type) {
  131. case *ProposalMessage:
  132. ps.SetHasProposal(msg.Proposal)
  133. err = conR.conS.SetProposal(msg.Proposal)
  134. case *PartMessage:
  135. if msg.Type == partTypeProposalBlock {
  136. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  137. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  138. } else if msg.Type == partTypeProposalPOL {
  139. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  140. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  141. } else {
  142. // Ignore unknown part type
  143. }
  144. default:
  145. // Ignore unknown message
  146. }
  147. case VoteChannel:
  148. switch msg := msg_.(type) {
  149. case *VoteMessage:
  150. vote := msg.Vote
  151. if rs.Height != vote.Height {
  152. return // Wrong height. Not necessarily a bad peer.
  153. }
  154. validatorIndex := msg.ValidatorIndex
  155. address, _ := rs.Validators.GetByIndex(validatorIndex)
  156. added, index, err := conR.conS.AddVote(address, vote)
  157. if err != nil {
  158. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  159. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  160. log.Warn("Found conflicting vote. Publish evidence")
  161. evidenceTx := &types.DupeoutTx{
  162. Address: address,
  163. VoteA: *errDupe.VoteA,
  164. VoteB: *errDupe.VoteB,
  165. }
  166. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  167. } else {
  168. // Probably an invalid signature. Bad peer.
  169. log.Warn("Error attempting to add vote", "error", err)
  170. // TODO: punish peer
  171. }
  172. }
  173. // Initialize Prevotes/Precommits/Commits if needed
  174. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  175. ps.SetHasVote(vote, index)
  176. if added {
  177. msg := &HasVoteMessage{
  178. Height: vote.Height,
  179. Round: vote.Round,
  180. Type: vote.Type,
  181. Index: index,
  182. }
  183. conR.sw.Broadcast(StateChannel, msg)
  184. }
  185. default:
  186. // Ignore unknown message
  187. }
  188. default:
  189. // Ignore unknown channel
  190. }
  191. if err != nil {
  192. log.Warn("Error in Receive()", "error", err)
  193. }
  194. }
  195. // Sets our private validator account for signing votes.
  196. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  197. conR.conS.SetPrivValidator(priv)
  198. }
  199. // Reset to some state.
  200. func (conR *ConsensusReactor) ResetToState(state *sm.State) {
  201. conR.conS.updateToState(state, false)
  202. }
  203. // implements events.Eventable
  204. func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
  205. conR.evsw = evsw
  206. }
  207. //--------------------------------------
  208. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  209. // Get seconds since beginning of height.
  210. timeElapsed := time.Now().Sub(rs.StartTime)
  211. // Broadcast NewRoundStepMessage
  212. nrsMsg = &NewRoundStepMessage{
  213. Height: rs.Height,
  214. Round: rs.Round,
  215. Step: rs.Step,
  216. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  217. }
  218. // If the step is commit, then also broadcast a CommitStepMessage.
  219. if rs.Step == RoundStepCommit {
  220. csMsg = &CommitStepMessage{
  221. Height: rs.Height,
  222. BlockParts: rs.ProposalBlockParts.Header(),
  223. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  224. }
  225. }
  226. return
  227. }
  228. // Listens for changes to the ConsensusState.Step by pulling
  229. // on conR.conS.NewStepCh().
  230. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  231. for {
  232. // Get RoundState with new Step or quit.
  233. var rs *RoundState
  234. select {
  235. case rs = <-conR.conS.NewStepCh():
  236. case <-conR.quit:
  237. return
  238. }
  239. nrsMsg, csMsg := makeRoundStepMessages(rs)
  240. if nrsMsg != nil {
  241. conR.sw.Broadcast(StateChannel, nrsMsg)
  242. }
  243. if csMsg != nil {
  244. conR.sw.Broadcast(StateChannel, csMsg)
  245. }
  246. }
  247. }
  248. func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) {
  249. rs := conR.conS.GetRoundState()
  250. nrsMsg, csMsg := makeRoundStepMessages(rs)
  251. if nrsMsg != nil {
  252. peer.Send(StateChannel, nrsMsg)
  253. }
  254. if csMsg != nil {
  255. peer.Send(StateChannel, nrsMsg)
  256. }
  257. }
  258. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  259. OUTER_LOOP:
  260. for {
  261. // Manage disconnects from self or peer.
  262. if !peer.IsRunning() || !conR.IsRunning() {
  263. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  264. return
  265. }
  266. rs := conR.conS.GetRoundState()
  267. prs := ps.GetRoundState()
  268. // Send proposal Block parts?
  269. // NOTE: if we or peer is at RoundStepCommit*, the round
  270. // won't necessarily match, but that's OK.
  271. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  272. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  273. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  274. part := rs.ProposalBlockParts.GetPart(index)
  275. msg := &PartMessage{
  276. Height: rs.Height,
  277. Round: rs.Round,
  278. Type: partTypeProposalBlock,
  279. Part: part,
  280. }
  281. peer.Send(DataChannel, msg)
  282. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  283. continue OUTER_LOOP
  284. }
  285. }
  286. // If the peer is on a previous height, help catch up.
  287. if 0 < prs.Height && prs.Height < rs.Height {
  288. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  289. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  290. // Ensure that the peer's PartSetHeader is correct
  291. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  292. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  293. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  294. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  295. time.Sleep(peerGossipSleepDuration)
  296. continue OUTER_LOOP
  297. }
  298. // Load the part
  299. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  300. if part == nil {
  301. log.Warn("Could not load part", "index", index,
  302. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  303. time.Sleep(peerGossipSleepDuration)
  304. continue OUTER_LOOP
  305. }
  306. // Send the part
  307. msg := &PartMessage{
  308. Height: prs.Height,
  309. Round: prs.Round,
  310. Type: partTypeProposalBlock,
  311. Part: part,
  312. }
  313. peer.Send(DataChannel, msg)
  314. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  315. continue OUTER_LOOP
  316. } else {
  317. //log.Debug("No parts to send in catch-up, sleeping")
  318. time.Sleep(peerGossipSleepDuration)
  319. continue OUTER_LOOP
  320. }
  321. }
  322. // If height and round don't match, sleep.
  323. if rs.Height != prs.Height || rs.Round != prs.Round {
  324. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  325. time.Sleep(peerGossipSleepDuration)
  326. continue OUTER_LOOP
  327. }
  328. // Send proposal?
  329. if rs.Proposal != nil && !prs.Proposal {
  330. msg := &ProposalMessage{Proposal: rs.Proposal}
  331. peer.Send(DataChannel, msg)
  332. ps.SetHasProposal(rs.Proposal)
  333. continue OUTER_LOOP
  334. }
  335. // Send proposal POL parts?
  336. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  337. if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
  338. msg := &PartMessage{
  339. Height: rs.Height,
  340. Round: rs.Round,
  341. Type: partTypeProposalPOL,
  342. Part: rs.ProposalPOLParts.GetPart(index),
  343. }
  344. peer.Send(DataChannel, msg)
  345. ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
  346. continue OUTER_LOOP
  347. }
  348. }
  349. // Nothing to do. Sleep.
  350. time.Sleep(peerGossipSleepDuration)
  351. continue OUTER_LOOP
  352. }
  353. }
  354. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  355. OUTER_LOOP:
  356. for {
  357. // Manage disconnects from self or peer.
  358. if !peer.IsRunning() || !conR.IsRunning() {
  359. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  360. return
  361. }
  362. rs := conR.conS.GetRoundState()
  363. prs := ps.GetRoundState()
  364. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  365. if prs.Height == voteSet.Height() {
  366. // Initialize Prevotes/Precommits/Commits if needed
  367. ps.EnsureVoteBitArrays(prs.Height, voteSet.Size())
  368. }
  369. // TODO: give priority to our vote.
  370. if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
  371. vote := voteSet.GetByIndex(index)
  372. // NOTE: vote may be a commit.
  373. msg := &VoteMessage{index, vote}
  374. peer.Send(VoteChannel, msg)
  375. ps.SetHasVote(vote, index)
  376. return true
  377. }
  378. return false
  379. }
  380. trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet BitArray) (sent bool) {
  381. // Initialize Commits if needed
  382. ps.EnsureVoteBitArrays(prs.Height, uint(len(validation.Commits)))
  383. if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
  384. commit := validation.Commits[index]
  385. log.Debug("Picked commit to send", "index", index, "commit", commit)
  386. // Reconstruct vote.
  387. vote := &types.Vote{
  388. Height: prs.Height,
  389. Round: commit.Round,
  390. Type: types.VoteTypeCommit,
  391. BlockHash: blockMeta.Hash,
  392. BlockParts: blockMeta.Parts,
  393. Signature: commit.Signature,
  394. }
  395. msg := &VoteMessage{index, vote}
  396. peer.Send(VoteChannel, msg)
  397. ps.SetHasVote(vote, index)
  398. return true
  399. }
  400. return false
  401. }
  402. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  403. if rs.Height == prs.Height {
  404. // If there are lastcommits to send...
  405. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  406. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  407. if trySendVote(rs.LastCommits, prs.LastCommits) {
  408. continue OUTER_LOOP
  409. }
  410. }
  411. }
  412. // If there are prevotes to send...
  413. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  414. if trySendVote(rs.Prevotes, prs.Prevotes) {
  415. continue OUTER_LOOP
  416. }
  417. }
  418. // If there are precommits to send...
  419. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  420. if trySendVote(rs.Precommits, prs.Precommits) {
  421. continue OUTER_LOOP
  422. }
  423. }
  424. // If there are any commits to send...
  425. if trySendVote(rs.Commits, prs.Commits) {
  426. continue OUTER_LOOP
  427. }
  428. }
  429. // Catchup logic
  430. if prs.Height != 0 && !prs.HasAllCatchupCommits {
  431. // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits.
  432. if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 {
  433. // If there are lastcommits to send...
  434. if trySendVote(rs.LastCommits, prs.Commits) {
  435. continue OUTER_LOOP
  436. } else {
  437. ps.SetHasAllCatchupCommits(prs.Height)
  438. }
  439. }
  440. // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation.
  441. if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 {
  442. // Load the blockMeta for block at prs.Height
  443. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  444. // Load the seen validation for prs.Height
  445. validation := conR.blockStore.LoadSeenValidation(prs.Height)
  446. log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation)
  447. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  448. continue OUTER_LOOP
  449. } else {
  450. ps.SetHasAllCatchupCommits(prs.Height)
  451. }
  452. }
  453. // If peer is lagging by more than 1, send Validation.
  454. if rs.Height >= prs.Height+2 {
  455. // Load the blockMeta for block at prs.Height
  456. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  457. // Load the block validation for prs.Height+1,
  458. // which contains commit signatures for prs.Height.
  459. validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
  460. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
  461. if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
  462. continue OUTER_LOOP
  463. } else {
  464. ps.SetHasAllCatchupCommits(prs.Height)
  465. }
  466. }
  467. }
  468. // We sent nothing. Sleep...
  469. time.Sleep(peerGossipSleepDuration)
  470. continue OUTER_LOOP
  471. }
  472. }
  473. //-----------------------------------------------------------------------------
  474. // Read only when returned by PeerState.GetRoundState().
  475. type PeerRoundState struct {
  476. Height uint // Height peer is at
  477. Round uint // Round peer is at
  478. Step RoundStep // Step peer is at
  479. StartTime time.Time // Estimated start of round 0 at this height
  480. Proposal bool // True if peer has proposal for this round
  481. ProposalBlockParts types.PartSetHeader //
  482. ProposalBlockBitArray BitArray // True bit -> has part
  483. ProposalPOLParts types.PartSetHeader //
  484. ProposalPOLBitArray BitArray // True bit -> has part
  485. Prevotes BitArray // All votes peer has for this round
  486. Precommits BitArray // All precommits peer has for this round
  487. Commits BitArray // All commits peer has for this height
  488. LastCommits BitArray // All commits peer has for last height
  489. HasAllCatchupCommits bool // Used for catch-up
  490. }
  491. //-----------------------------------------------------------------------------
  492. var (
  493. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  494. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  495. )
  496. type PeerState struct {
  497. mtx sync.Mutex
  498. PeerRoundState
  499. }
  500. func NewPeerState(peer *p2p.Peer) *PeerState {
  501. return &PeerState{}
  502. }
  503. // Returns an atomic snapshot of the PeerRoundState.
  504. // There's no point in mutating it since it won't change PeerState.
  505. func (ps *PeerState) GetRoundState() *PeerRoundState {
  506. ps.mtx.Lock()
  507. defer ps.mtx.Unlock()
  508. prs := ps.PeerRoundState // copy
  509. return &prs
  510. }
  511. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  512. ps.mtx.Lock()
  513. defer ps.mtx.Unlock()
  514. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  515. return
  516. }
  517. if ps.Proposal {
  518. return
  519. }
  520. ps.Proposal = true
  521. ps.ProposalBlockParts = proposal.BlockParts
  522. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  523. ps.ProposalPOLParts = proposal.POLParts
  524. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  525. }
  526. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  527. ps.mtx.Lock()
  528. defer ps.mtx.Unlock()
  529. if ps.Height != height || ps.Round != round {
  530. return
  531. }
  532. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  533. }
  534. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  535. ps.mtx.Lock()
  536. defer ps.mtx.Unlock()
  537. if ps.Height != height || ps.Round != round {
  538. return
  539. }
  540. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  541. }
  542. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
  543. ps.mtx.Lock()
  544. defer ps.mtx.Unlock()
  545. if ps.Height != height {
  546. return
  547. }
  548. if ps.Prevotes.IsZero() {
  549. ps.Prevotes = NewBitArray(numValidators)
  550. }
  551. if ps.Precommits.IsZero() {
  552. ps.Precommits = NewBitArray(numValidators)
  553. }
  554. if ps.Commits.IsZero() {
  555. ps.Commits = NewBitArray(numValidators)
  556. }
  557. }
  558. func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
  559. ps.mtx.Lock()
  560. defer ps.mtx.Unlock()
  561. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  562. }
  563. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  564. if ps.Height == height+1 && type_ == types.VoteTypeCommit {
  565. // Special case for LastCommits.
  566. ps.LastCommits.SetIndex(index, true)
  567. return
  568. } else if ps.Height != height {
  569. // Does not apply.
  570. return
  571. }
  572. switch type_ {
  573. case types.VoteTypePrevote:
  574. ps.Prevotes.SetIndex(index, true)
  575. case types.VoteTypePrecommit:
  576. ps.Precommits.SetIndex(index, true)
  577. case types.VoteTypeCommit:
  578. if round < ps.Round {
  579. ps.Prevotes.SetIndex(index, true)
  580. ps.Precommits.SetIndex(index, true)
  581. }
  582. ps.Commits.SetIndex(index, true)
  583. default:
  584. panic("Invalid vote type")
  585. }
  586. }
  587. // When catching up, this helps keep track of whether
  588. // we should send more commit votes from the block (validation) store
  589. func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
  590. ps.mtx.Lock()
  591. defer ps.mtx.Unlock()
  592. if ps.Height == height {
  593. ps.HasAllCatchupCommits = true
  594. }
  595. }
  596. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  597. ps.mtx.Lock()
  598. defer ps.mtx.Unlock()
  599. // Just remember these values.
  600. psHeight := ps.Height
  601. psRound := ps.Round
  602. //psStep := ps.Step
  603. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  604. ps.Height = msg.Height
  605. ps.Round = msg.Round
  606. ps.Step = msg.Step
  607. ps.StartTime = startTime
  608. if psHeight != msg.Height || psRound != msg.Round {
  609. ps.Proposal = false
  610. ps.ProposalBlockParts = types.PartSetHeader{}
  611. ps.ProposalBlockBitArray = BitArray{}
  612. ps.ProposalPOLParts = types.PartSetHeader{}
  613. ps.ProposalPOLBitArray = BitArray{}
  614. // We'll update the BitArray capacity later.
  615. ps.Prevotes = BitArray{}
  616. ps.Precommits = BitArray{}
  617. }
  618. if psHeight != msg.Height {
  619. // Shift Commits to LastCommits
  620. if psHeight+1 == msg.Height {
  621. ps.LastCommits = ps.Commits
  622. } else {
  623. ps.LastCommits = BitArray{}
  624. }
  625. // We'll update the BitArray capacity later.
  626. ps.Commits = BitArray{}
  627. ps.HasAllCatchupCommits = false
  628. }
  629. }
  630. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  631. ps.mtx.Lock()
  632. defer ps.mtx.Unlock()
  633. if ps.Height != msg.Height {
  634. return
  635. }
  636. ps.ProposalBlockParts = msg.BlockParts
  637. ps.ProposalBlockBitArray = msg.BlockBitArray
  638. }
  639. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  640. ps.mtx.Lock()
  641. defer ps.mtx.Unlock()
  642. // Special case for LastCommits
  643. if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
  644. ps.LastCommits.SetIndex(msg.Index, true)
  645. return
  646. } else if ps.Height != msg.Height {
  647. return
  648. }
  649. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  650. }
  651. //-----------------------------------------------------------------------------
  652. // Messages
  653. const (
  654. msgTypeUnknown = byte(0x00)
  655. msgTypeNewRoundStep = byte(0x01)
  656. msgTypeCommitStep = byte(0x02)
  657. msgTypeProposal = byte(0x11)
  658. msgTypePart = byte(0x12) // both block & POL
  659. msgTypeVote = byte(0x13)
  660. msgTypeHasVote = byte(0x14)
  661. )
  662. // TODO: check for unnecessary extra bytes at the end.
  663. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  664. n := new(int64)
  665. // log.Debug(Fmt("decoding msg bytes: %X", bz))
  666. msgType = bz[0]
  667. r := bytes.NewReader(bz)
  668. switch msgType {
  669. // Messages for communicating state changes
  670. case msgTypeNewRoundStep:
  671. msg = binary.ReadBinary(&NewRoundStepMessage{}, r, n, &err)
  672. case msgTypeCommitStep:
  673. msg = binary.ReadBinary(&CommitStepMessage{}, r, n, &err)
  674. // Messages of data
  675. case msgTypeProposal:
  676. msg = binary.ReadBinary(&ProposalMessage{}, r, n, &err)
  677. case msgTypePart:
  678. msg = binary.ReadBinary(&PartMessage{}, r, n, &err)
  679. case msgTypeVote:
  680. msg = binary.ReadBinary(&VoteMessage{}, r, n, &err)
  681. case msgTypeHasVote:
  682. msg = binary.ReadBinary(&HasVoteMessage{}, r, n, &err)
  683. default:
  684. log.Warn(Fmt("Ignoring unknown message %X", bz))
  685. msg = nil
  686. }
  687. return
  688. }
  689. //-------------------------------------
  690. type NewRoundStepMessage struct {
  691. Height uint
  692. Round uint
  693. Step RoundStep
  694. SecondsSinceStartTime uint
  695. }
  696. func (m *NewRoundStepMessage) TypeByte() byte { return msgTypeNewRoundStep }
  697. func (m *NewRoundStepMessage) String() string {
  698. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
  699. }
  700. //-------------------------------------
  701. type CommitStepMessage struct {
  702. Height uint
  703. BlockParts types.PartSetHeader
  704. BlockBitArray BitArray
  705. }
  706. func (m *CommitStepMessage) TypeByte() byte { return msgTypeCommitStep }
  707. func (m *CommitStepMessage) String() string {
  708. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  709. }
  710. //-------------------------------------
  711. type ProposalMessage struct {
  712. Proposal *Proposal
  713. }
  714. func (m *ProposalMessage) TypeByte() byte { return msgTypeProposal }
  715. func (m *ProposalMessage) String() string {
  716. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  717. }
  718. //-------------------------------------
  719. const (
  720. partTypeProposalBlock = byte(0x01)
  721. partTypeProposalPOL = byte(0x02)
  722. )
  723. type PartMessage struct {
  724. Height uint
  725. Round uint
  726. Type byte
  727. Part *types.Part
  728. }
  729. func (m *PartMessage) TypeByte() byte { return msgTypePart }
  730. func (m *PartMessage) String() string {
  731. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  732. }
  733. //-------------------------------------
  734. type VoteMessage struct {
  735. ValidatorIndex uint
  736. Vote *types.Vote
  737. }
  738. func (m *VoteMessage) TypeByte() byte { return msgTypeVote }
  739. func (m *VoteMessage) String() string {
  740. return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
  741. }
  742. //-------------------------------------
  743. type HasVoteMessage struct {
  744. Height uint
  745. Round uint
  746. Type byte
  747. Index uint
  748. }
  749. func (m *HasVoteMessage) TypeByte() byte { return msgTypeHasVote }
  750. func (m *HasVoteMessage) String() string {
  751. return fmt.Sprintf("[HasVote %v/%v T:%X]", m.Height, m.Round, m.Type)
  752. }