You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

900 lines
25 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. const (
  20. StateChannel = byte(0x20)
  21. DataChannel = byte(0x21)
  22. VoteChannel = byte(0x22)
  23. PeerStateKey = "ConsensusReactor.peerState"
  24. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  25. )
  26. //-----------------------------------------------------------------------------
  27. // The reactor's underlying ConsensusState may change state at any time.
  28. // We atomically copy the RoundState struct before using it.
  29. type ConsensusReactor struct {
  30. sw *p2p.Switch
  31. running uint32
  32. quit chan struct{}
  33. blockStore *bc.BlockStore
  34. conS *ConsensusState
  35. // if fast sync is running we don't really do anything
  36. sync bool
  37. evsw events.Fireable
  38. }
  39. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
  40. conR := &ConsensusReactor{
  41. quit: make(chan struct{}),
  42. blockStore: blockStore,
  43. conS: consensusState,
  44. sync: sync,
  45. }
  46. return conR
  47. }
  48. // Implements Reactor
  49. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  50. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  51. log.Info("Starting ConsensusReactor")
  52. conR.sw = sw
  53. if !conR.sync {
  54. conR.conS.Start()
  55. }
  56. go conR.broadcastNewRoundStepRoutine()
  57. }
  58. }
  59. // Implements Reactor
  60. func (conR *ConsensusReactor) Stop() {
  61. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  62. log.Info("Stopping ConsensusReactor")
  63. conR.conS.Stop()
  64. close(conR.quit)
  65. }
  66. }
  67. func (conR *ConsensusReactor) IsRunning() bool {
  68. return atomic.LoadUint32(&conR.running) == 1
  69. }
  70. // Implements Reactor
  71. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  72. // TODO optimize
  73. return []*p2p.ChannelDescriptor{
  74. &p2p.ChannelDescriptor{
  75. Id: StateChannel,
  76. Priority: 5,
  77. SendQueueCapacity: 100,
  78. },
  79. &p2p.ChannelDescriptor{
  80. Id: DataChannel,
  81. Priority: 5,
  82. SendQueueCapacity: 2,
  83. },
  84. &p2p.ChannelDescriptor{
  85. Id: VoteChannel,
  86. Priority: 5,
  87. SendQueueCapacity: 40,
  88. },
  89. }
  90. }
  91. // Implements Reactor
  92. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  93. if !conR.IsRunning() {
  94. return
  95. }
  96. // Create peerState for peer
  97. peerState := NewPeerState(peer)
  98. peer.Data.Set(PeerStateKey, peerState)
  99. // Begin gossip routines for this peer.
  100. go conR.gossipDataRoutine(peer, peerState)
  101. go conR.gossipVotesRoutine(peer, peerState)
  102. // Send our state to peer.
  103. conR.sendNewRoundStepMessage(peer)
  104. }
  105. // Implements Reactor
  106. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  107. if !conR.IsRunning() {
  108. return
  109. }
  110. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  111. }
  112. // Implements Reactor
  113. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  114. if conR.sync || !conR.IsRunning() {
  115. return
  116. }
  117. // Get round state
  118. rs := conR.conS.GetRoundState()
  119. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  120. _, msg_, err := DecodeMessage(msgBytes)
  121. if err != nil {
  122. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  123. return
  124. }
  125. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)
  126. switch chId {
  127. case StateChannel:
  128. switch msg := msg_.(type) {
  129. case *NewRoundStepMessage:
  130. ps.ApplyNewRoundStepMessage(msg, rs)
  131. case *CommitStepMessage:
  132. ps.ApplyCommitStepMessage(msg)
  133. case *HasVoteMessage:
  134. ps.ApplyHasVoteMessage(msg)
  135. default:
  136. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  137. }
  138. case DataChannel:
  139. switch msg := msg_.(type) {
  140. case *ProposalMessage:
  141. ps.SetHasProposal(msg.Proposal)
  142. err = conR.conS.SetProposal(msg.Proposal)
  143. case *PartMessage:
  144. if msg.Type == partTypeProposalBlock {
  145. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  146. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  147. } else {
  148. log.Warn(Fmt("Unknown part type %v", msg.Type))
  149. }
  150. default:
  151. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  152. }
  153. case VoteChannel:
  154. switch msg := msg_.(type) {
  155. case *VoteMessage:
  156. vote := msg.Vote
  157. if rs.Height != vote.Height {
  158. if rs.Height == vote.Height+1 {
  159. if rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit {
  160. goto VOTE_PASS // *ducks*
  161. }
  162. }
  163. return // Wrong height. Not necessarily a bad peer.
  164. }
  165. VOTE_PASS:
  166. address, _ := rs.Validators.GetByIndex(msg.ValidatorIndex)
  167. added, index, err := conR.conS.AddVote(address, vote)
  168. if err != nil {
  169. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  170. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  171. log.Warn("Found conflicting vote. Publish evidence")
  172. evidenceTx := &types.DupeoutTx{
  173. Address: address,
  174. VoteA: *errDupe.VoteA,
  175. VoteB: *errDupe.VoteB,
  176. }
  177. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  178. } else {
  179. // Probably an invalid signature. Bad peer.
  180. log.Warn("Error attempting to add vote", "error", err)
  181. // TODO: punish peer
  182. }
  183. }
  184. // Initialize Prevotes/Precommits if needed
  185. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size(), _)
  186. ps.SetHasVote(vote, index)
  187. if added {
  188. msg := &HasVoteMessage{
  189. Height: vote.Height,
  190. Round: vote.Round,
  191. Type: vote.Type,
  192. Index: index,
  193. }
  194. conR.sw.Broadcast(StateChannel, msg)
  195. }
  196. default:
  197. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  198. }
  199. default:
  200. log.Warn(Fmt("Unknown channel %X", chId))
  201. }
  202. if err != nil {
  203. log.Warn("Error in Receive()", "error", err)
  204. }
  205. }
  206. // Sets our private validator account for signing votes.
  207. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  208. conR.conS.SetPrivValidator(priv)
  209. }
  210. // Switch from the fast sync to the consensus:
  211. // reset the state, turn off fast sync, start the consensus-state-machine
  212. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  213. conR.conS.updateToState(state, false)
  214. conR.sync = false
  215. conR.conS.Start()
  216. }
  217. // implements events.Eventable
  218. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  219. conR.evsw = evsw
  220. conR.conS.SetFireable(evsw)
  221. }
  222. //--------------------------------------
  223. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  224. // Get seconds since beginning of height.
  225. timeElapsed := time.Now().Sub(rs.StartTime)
  226. // Broadcast NewRoundStepMessage
  227. nrsMsg = &NewRoundStepMessage{
  228. Height: rs.Height,
  229. Round: rs.Round,
  230. Step: rs.Step,
  231. SecondsSinceStartTime: uint(timeElapsed.Seconds()),
  232. LastCommitRound: rs.LastCommit.Round(),
  233. }
  234. // If the step is commit, then also broadcast a CommitStepMessage.
  235. if rs.Step == RoundStepCommit {
  236. csMsg = &CommitStepMessage{
  237. Height: rs.Height,
  238. BlockParts: rs.ProposalBlockParts.Header(),
  239. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  240. }
  241. }
  242. return
  243. }
  244. // Listens for changes to the ConsensusState.Step by pulling
  245. // on conR.conS.NewStepCh().
  246. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  247. for {
  248. // Get RoundState with new Step or quit.
  249. var rs *RoundState
  250. select {
  251. case rs = <-conR.conS.NewStepCh():
  252. case <-conR.quit:
  253. return
  254. }
  255. nrsMsg, csMsg := makeRoundStepMessages(rs)
  256. if nrsMsg != nil {
  257. conR.sw.Broadcast(StateChannel, nrsMsg)
  258. }
  259. if csMsg != nil {
  260. conR.sw.Broadcast(StateChannel, csMsg)
  261. }
  262. }
  263. }
  264. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  265. rs := conR.conS.GetRoundState()
  266. nrsMsg, csMsg := makeRoundStepMessages(rs)
  267. if nrsMsg != nil {
  268. peer.Send(StateChannel, nrsMsg)
  269. }
  270. if csMsg != nil {
  271. peer.Send(StateChannel, csMsg)
  272. }
  273. }
  274. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  275. OUTER_LOOP:
  276. for {
  277. // Manage disconnects from self or peer.
  278. if !peer.IsRunning() || !conR.IsRunning() {
  279. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  280. return
  281. }
  282. rs := conR.conS.GetRoundState()
  283. prs := ps.GetRoundState()
  284. // Send proposal Block parts?
  285. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  286. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  287. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
  288. part := rs.ProposalBlockParts.GetPart(index)
  289. msg := &PartMessage{
  290. Height: rs.Height,
  291. Round: rs.Round,
  292. Type: partTypeProposalBlock,
  293. Part: part,
  294. }
  295. peer.Send(DataChannel, msg)
  296. ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
  297. continue OUTER_LOOP
  298. }
  299. }
  300. // If the peer is on a previous height, help catch up.
  301. if 0 < prs.Height && prs.Height < rs.Height {
  302. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
  303. if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
  304. // Ensure that the peer's PartSetHeader is correct
  305. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  306. if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
  307. log.Debug("Peer ProposalBlockParts mismatch, sleeping",
  308. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  309. time.Sleep(peerGossipSleepDuration)
  310. continue OUTER_LOOP
  311. }
  312. // Load the part
  313. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  314. if part == nil {
  315. log.Warn("Could not load part", "index", index,
  316. "peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts)
  317. time.Sleep(peerGossipSleepDuration)
  318. continue OUTER_LOOP
  319. }
  320. // Send the part
  321. msg := &PartMessage{
  322. Height: prs.Height,
  323. Round: prs.Round,
  324. Type: partTypeProposalBlock,
  325. Part: part,
  326. }
  327. peer.Send(DataChannel, msg)
  328. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  329. continue OUTER_LOOP
  330. } else {
  331. //log.Debug("No parts to send in catch-up, sleeping")
  332. time.Sleep(peerGossipSleepDuration)
  333. continue OUTER_LOOP
  334. }
  335. }
  336. // If height and round don't match, sleep.
  337. if rs.Height != prs.Height || rs.Round != prs.Round {
  338. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  339. time.Sleep(peerGossipSleepDuration)
  340. continue OUTER_LOOP
  341. }
  342. // Send proposal?
  343. if rs.Proposal != nil && !prs.Proposal {
  344. msg := &ProposalMessage{Proposal: rs.Proposal}
  345. peer.Send(DataChannel, msg)
  346. ps.SetHasProposal(rs.Proposal)
  347. continue OUTER_LOOP
  348. }
  349. // Nothing to do. Sleep.
  350. time.Sleep(peerGossipSleepDuration)
  351. continue OUTER_LOOP
  352. }
  353. }
  354. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  355. // Simple hack to throttle logs upon sleep.
  356. var sleeping = 0
  357. OUTER_LOOP:
  358. for {
  359. // Manage disconnects from self or peer.
  360. if !peer.IsRunning() || !conR.IsRunning() {
  361. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  362. return
  363. }
  364. rs := conR.conS.GetRoundState()
  365. prs := ps.GetRoundState()
  366. switch sleeping {
  367. case 1: // First sleep
  368. sleeping = 2
  369. case 2: // No more sleep
  370. sleeping = 0
  371. }
  372. // prsVoteSet: a pointer to a VoteSet field of prs.
  373. // Returns true when useful work was done.
  374. trySendVote := func(voteSet *VoteSet, prsVoteSet **BitArray) (sent bool) {
  375. if voteSet == nil {
  376. return false
  377. }
  378. if *prsVoteSet == nil {
  379. ps.EnsureVoteBitArrays(voteSet.Height(), voteSet.Size(), prs)
  380. // We could return true here (useful work was done)
  381. // or, we can continue since prsVoteSet is no longer nil.
  382. if *prsVoteSet == nil {
  383. panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
  384. }
  385. }
  386. // TODO: give priority to our vote.
  387. if index, ok := voteSet.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
  388. vote := voteSet.GetByIndex(index)
  389. msg := &VoteMessage{index, vote}
  390. peer.Send(VoteChannel, msg)
  391. ps.SetHasVote(vote, index)
  392. return true
  393. }
  394. return false
  395. }
  396. // prsVoteSet: a pointer to a VoteSet field of prs.
  397. // Returns true when useful work was done.
  398. trySendPrecommitFromValidation := func(validation *types.Validation, prsVoteSet **BitArray) (sent bool) {
  399. if validation == nil {
  400. return false
  401. } else if *prsVoteSet == nil {
  402. ps.EnsureVoteBitArrays(validation.Height(), uint(len(validation.Precommits)), prs)
  403. // We could return true here (useful work was done)
  404. // or, we can continue since prsVoteSet is no longer nil.
  405. if *prsVoteSet == nil {
  406. panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
  407. }
  408. }
  409. if index, ok := validation.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
  410. precommit := validation.Precommits[index]
  411. log.Debug("Picked precommit to send", "index", index, "precommit", precommit)
  412. msg := &VoteMessage{index, precommit}
  413. peer.Send(VoteChannel, msg)
  414. ps.SetHasVote(precommit, index)
  415. return true
  416. }
  417. return false
  418. }
  419. // If height matches, then send LastCommit, Prevotes, Precommits.
  420. if rs.Height == prs.Height {
  421. // If there are lastCommits to send...
  422. if prs.Step == RoundStepNewHeight {
  423. if trySendVote(rs.LastCommit, prs.LastCommit) {
  424. continue OUTER_LOOP
  425. }
  426. }
  427. // If there are prevotes to send...
  428. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  429. if trySendVote(rs.Prevotes, prs.Prevotes) {
  430. continue OUTER_LOOP
  431. }
  432. }
  433. // If there are precommits to send...
  434. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  435. if trySendVote(rs.Precommits, prs.Precommits) {
  436. continue OUTER_LOOP
  437. }
  438. }
  439. }
  440. // Special catchup logic.
  441. // If peer is lagging by height 1, send LastCommit.
  442. if prs.Height != 0 && prs.Height == rs.Height-1 {
  443. if prs.Round == rs.LastCommit.Round() {
  444. if trySendVote(rs.LastCommit, prs.Precommits) {
  445. continue OUTER_LOOP
  446. // XXX CONTONUE
  447. }
  448. } else {
  449. ps.SetCatchupCommitRound(prs.Height, rs.LastCommit.Round())
  450. ps.EnsureVoteBitArrays(prs.Height, rs.LastCommit.Size(), prs)
  451. if trySendVote(rs.LastCommit, prs.CatchupCommit) {
  452. continue OUTER_LOOP
  453. }
  454. }
  455. }
  456. // Catchup logic
  457. // If peer is lagging by more than 1, send Validation.
  458. if prs.Height != 0 && prs.Height <= rs.Height-2 {
  459. // Load the block validation for prs.Height,
  460. // which contains precommit signatures for prs.Height.
  461. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  462. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  463. // Peer's CommitRound should be -1 or equal to the validation's precommit rounds.
  464. // If not, warn.
  465. if prs.CommitRound == -1 {
  466. ps.SetCommitRound(prs.Height, validation.Round())
  467. continue OUTER_LOOP // Get prs := ps.GetRoundState() again.
  468. } else if prs.CommitRound != validation.Round() {
  469. log.Warn("Peer's CommitRound during catchup not equal to commit round",
  470. "height", prs.Height, "validation", validation, "prs.CommitRound", prs.CommitRound)
  471. } else if trySendPrecommitFromValidation(validation, prs.Commit) {
  472. continue OUTER_LOOP
  473. }
  474. }
  475. if sleeping == 0 {
  476. // We sent nothing. Sleep...
  477. sleeping = 1
  478. log.Debug("No votes to send, sleeping", "peer", peer,
  479. "localPV", rs.Prevotes.BitArray(), "peerPV", prs.Prevotes,
  480. "localPC", rs.Precommits.BitArray(), "peerPC", prs.Precommits)
  481. } else if sleeping == 2 {
  482. // Continued sleep...
  483. sleeping = 1
  484. }
  485. time.Sleep(peerGossipSleepDuration)
  486. continue OUTER_LOOP
  487. }
  488. }
  489. //-----------------------------------------------------------------------------
  490. // Read only when returned by PeerState.GetRoundState().
  491. type PeerRoundState struct {
  492. Height uint // Height peer is at
  493. Round uint // Round peer is at
  494. Step RoundStepType // Step peer is at
  495. StartTime time.Time // Estimated start of round 0 at this height
  496. Proposal bool // True if peer has proposal for this round
  497. ProposalBlockParts types.PartSetHeader //
  498. ProposalBlockBitArray *BitArray // True bit -> has part
  499. ProposalPOLParts types.PartSetHeader //
  500. ProposalPOLBitArray *BitArray // True bit -> has part
  501. Prevotes *BitArray // All votes peer has for this round
  502. Precommits *BitArray // All precommits peer has for this round
  503. LastCommitRound uint // Round of commit for last height.
  504. LastCommit *BitArray // All commit precommits of commit for last height.
  505. // If peer is leading in height, the round that peer believes commit round is.
  506. // If peer is lagging in height, the round that we believe commit round is.
  507. CatchupCommitRound int
  508. CatchupCommit *BitArray // All commit precommits peer has for this height
  509. }
  510. //-----------------------------------------------------------------------------
  511. var (
  512. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  513. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  514. )
  515. type PeerState struct {
  516. Key string
  517. mtx sync.Mutex
  518. PeerRoundState
  519. }
  520. func NewPeerState(peer *p2p.Peer) *PeerState {
  521. return &PeerState{Key: peer.Key}
  522. }
  523. // Returns an atomic snapshot of the PeerRoundState.
  524. // There's no point in mutating it since it won't change PeerState.
  525. func (ps *PeerState) GetRoundState() *PeerRoundState {
  526. ps.mtx.Lock()
  527. defer ps.mtx.Unlock()
  528. prs := ps.PeerRoundState // copy
  529. return &prs
  530. }
  531. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  532. ps.mtx.Lock()
  533. defer ps.mtx.Unlock()
  534. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  535. return
  536. }
  537. if ps.Proposal {
  538. return
  539. }
  540. ps.Proposal = true
  541. ps.ProposalBlockParts = proposal.BlockParts
  542. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  543. ps.ProposalPOLParts = proposal.POLParts
  544. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  545. }
  546. func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint) {
  547. ps.mtx.Lock()
  548. defer ps.mtx.Unlock()
  549. if ps.Height != height || ps.Round != round {
  550. return
  551. }
  552. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  553. }
  554. func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint) {
  555. ps.mtx.Lock()
  556. defer ps.mtx.Unlock()
  557. if ps.Height != height || ps.Round != round {
  558. return
  559. }
  560. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  561. }
  562. // prs: If given, will also update this PeerRoundState copy.
  563. func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint, prs *PeerRoundState) {
  564. ps.mtx.Lock()
  565. defer ps.mtx.Unlock()
  566. if ps.Height == height {
  567. if ps.Prevotes == nil {
  568. ps.Prevotes = NewBitArray(numValidators)
  569. }
  570. if ps.Precommits == nil {
  571. ps.Precommits = NewBitArray(numValidators)
  572. }
  573. if ps.CatchupCommit == nil {
  574. ps.CatchupCommit = NewBitArray(numValidators)
  575. }
  576. } else if ps.Height == height+1 {
  577. if ps.LastCommit == nil {
  578. ps.LastCommit = NewBitArray(numValidators)
  579. }
  580. }
  581. // Also, update prs if given.
  582. if prs != nil {
  583. prs.Prevotes = ps.Prevotes
  584. prs.Precommits = ps.Precommits
  585. prs.LastCommit = ps.LastCommit
  586. prs.CatchupCommit = ps.CatchupCommit
  587. }
  588. }
  589. func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
  590. ps.mtx.Lock()
  591. defer ps.mtx.Unlock()
  592. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  593. }
  594. func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
  595. if ps.Height == height+1 && ps.LastCommitRound == round && type_ == types.VoteTypePrecommit {
  596. // Special case for LastCommit.
  597. ps.LastCommit.SetIndex(index, true)
  598. log.Debug("setHasVote", "LastCommit", ps.LastCommit, "index", index)
  599. return
  600. } else if ps.Height != height {
  601. // Does not apply.
  602. return
  603. }
  604. switch type_ {
  605. case types.VoteTypePrevote:
  606. ps.Prevotes.SetIndex(index, true)
  607. log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index)
  608. case types.VoteTypePrecommit:
  609. if ps.CommitRound == round {
  610. ps.Commit.SetIndex(index, true)
  611. }
  612. ps.Precommits.SetIndex(index, true)
  613. log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index)
  614. default:
  615. panic("Invalid vote type")
  616. }
  617. }
  618. func (ps *PeerState) SetCatchupCommitRound(height, round uint) {
  619. ps.mtx.Lock()
  620. defer ps.mtx.Unlock()
  621. if ps.Height != height {
  622. return
  623. }
  624. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  625. log.Warn("Conflicting CatchupCommitRound",
  626. "height", height,
  627. "orig", ps.CatchupCommitRound,
  628. "new", round,
  629. )
  630. // TODO think harder
  631. }
  632. ps.CatchupCommitRound = round
  633. ps.CatchupCommit = nil
  634. }
  635. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  636. ps.mtx.Lock()
  637. defer ps.mtx.Unlock()
  638. // Just remember these values.
  639. psHeight := ps.Height
  640. psRound := ps.Round
  641. //psStep := ps.Step
  642. psCatchupCommitRound := ps.CatchupCommitRound
  643. psCatchupCommit := ps.CatchupCommitRound
  644. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  645. ps.Height = msg.Height
  646. ps.Round = msg.Round
  647. ps.Step = msg.Step
  648. ps.StartTime = startTime
  649. if psHeight != msg.Height || psRound != msg.Round {
  650. ps.Proposal = false
  651. ps.ProposalBlockParts = types.PartSetHeader{}
  652. ps.ProposalBlockBitArray = nil
  653. ps.ProposalPOLParts = types.PartSetHeader{}
  654. ps.ProposalPOLBitArray = nil
  655. // We'll update the BitArray capacity later.
  656. ps.Prevotes = nil
  657. ps.Precommits = nil
  658. }
  659. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  660. // Peer caught up to CatchupCommitRound.
  661. ps.Precommits = psCatchupCommit
  662. }
  663. if psHeight != msg.Height {
  664. // Shift Precommits to LastCommit.
  665. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  666. ps.LastCommitRound = msg.LastCommitRound
  667. ps.LastCommit = ps.Precommits
  668. } else {
  669. ps.LastCommitRound = msg.LastCommitRound
  670. ps.LastCommit = nil
  671. }
  672. // We'll update the BitArray capacity later.
  673. ps.CatchupCommitRound = -1
  674. ps.CatchupCommit = nil
  675. }
  676. }
  677. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  678. ps.mtx.Lock()
  679. defer ps.mtx.Unlock()
  680. if ps.Height != msg.Height {
  681. return
  682. }
  683. ps.ProposalBlockParts = msg.BlockParts
  684. ps.ProposalBlockBitArray = msg.BlockBitArray
  685. }
  686. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  687. ps.mtx.Lock()
  688. defer ps.mtx.Unlock()
  689. if ps.Height != msg.Height {
  690. return
  691. }
  692. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  693. }
  694. //-----------------------------------------------------------------------------
  695. // Messages
  696. const (
  697. msgTypeNewRoundStep = byte(0x01)
  698. msgTypeCommitStep = byte(0x02)
  699. msgTypeProposal = byte(0x11)
  700. msgTypePart = byte(0x12) // both block & POL
  701. msgTypeVote = byte(0x13)
  702. msgTypeHasVote = byte(0x14)
  703. )
  704. type ConsensusMessage interface{}
  705. var _ = binary.RegisterInterface(
  706. struct{ ConsensusMessage }{},
  707. binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  708. binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  709. binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  710. binary.ConcreteType{&PartMessage{}, msgTypePart},
  711. binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  712. binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  713. )
  714. // TODO: check for unnecessary extra bytes at the end.
  715. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  716. msgType = bz[0]
  717. n := new(int64)
  718. r := bytes.NewReader(bz)
  719. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  720. return
  721. }
  722. //-------------------------------------
  723. // For every height/round/step transition
  724. type NewRoundStepMessage struct {
  725. Height uint
  726. Round uint
  727. Step RoundStepType
  728. SecondsSinceStartTime uint
  729. LastCommitRound uint
  730. }
  731. func (m *NewRoundStepMessage) String() string {
  732. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  733. m.Height, m.Round, m.Step, m.LastCommitRound)
  734. }
  735. //-------------------------------------
  736. type CommitStepMessage struct {
  737. Height uint
  738. BlockParts types.PartSetHeader
  739. BlockBitArray *BitArray
  740. }
  741. func (m *CommitStepMessage) String() string {
  742. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockParts, m.BlockBitArray)
  743. }
  744. //-------------------------------------
  745. type ProposalMessage struct {
  746. Proposal *Proposal
  747. }
  748. func (m *ProposalMessage) String() string {
  749. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  750. }
  751. //-------------------------------------
  752. const (
  753. partTypeProposalBlock = byte(0x01)
  754. partTypeProposalPOL = byte(0x02)
  755. )
  756. type PartMessage struct {
  757. Height uint
  758. Round uint
  759. Type byte
  760. Part *types.Part
  761. }
  762. func (m *PartMessage) String() string {
  763. return fmt.Sprintf("[Part H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Type, m.Part)
  764. }
  765. //-------------------------------------
  766. type VoteMessage struct {
  767. ValidatorIndex uint
  768. Vote *types.Vote
  769. }
  770. func (m *VoteMessage) String() string {
  771. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  772. }
  773. //-------------------------------------
  774. type HasVoteMessage struct {
  775. Height uint
  776. Round uint
  777. Type byte
  778. Index uint
  779. }
  780. func (m *HasVoteMessage) String() string {
  781. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  782. }