package consensus

import (
	"bytes"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	"github.com/tendermint/tendermint/binary"
	bc "github.com/tendermint/tendermint/blockchain"
	. "github.com/tendermint/tendermint/common"
	. "github.com/tendermint/tendermint/consensus/types"
	"github.com/tendermint/tendermint/events"
	"github.com/tendermint/tendermint/p2p"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

const (
	StateChannel = byte(0x20)
	DataChannel  = byte(0x21)
	VoteChannel  = byte(0x22)

	PeerStateKey = "ConsensusReactor.peerState"

	peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
)
//-----------------------------------------------------------------------------

// The reactor's underlying ConsensusState may change state at any time.
// We atomically copy the RoundState struct before using it.
type ConsensusReactor struct {
	sw      *p2p.Switch
	running uint32
	quit    chan struct{}

	blockStore *bc.BlockStore
	conS       *ConsensusState

	// if fast sync is running we don't really do anything
	sync bool

	evsw events.Fireable
}

func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
	conR := &ConsensusReactor{
		quit:       make(chan struct{}),
		blockStore: blockStore,
		conS:       consensusState,
		sync:       sync,
	}
	return conR
}
// Implements Reactor
func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
	if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
		log.Info("Starting ConsensusReactor")
		conR.sw = sw
		if !conR.sync {
			conR.conS.Start()
		}
		go conR.broadcastNewRoundStepRoutine()
	}
}

// Implements Reactor
func (conR *ConsensusReactor) Stop() {
	if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
		log.Info("Stopping ConsensusReactor")
		conR.conS.Stop()
		close(conR.quit)
	}
}

func (conR *ConsensusReactor) IsRunning() bool {
	return atomic.LoadUint32(&conR.running) == 1
}
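// StateChannel carries NewRoundStep, CommitStep, and HasVote gossip;
// DataChannel carries Proposal, ProposalPOL, and BlockPart messages;
// VoteChannel carries individual votes (see Receive below).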
// Implements Reactor
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
	// TODO optimize
	return []*p2p.ChannelDescriptor{
		&p2p.ChannelDescriptor{
			Id:                StateChannel,
			Priority:          5,
			SendQueueCapacity: 100,
		},
		&p2p.ChannelDescriptor{
			Id:                DataChannel,
			Priority:          5,
			SendQueueCapacity: 2,
		},
		&p2p.ChannelDescriptor{
			Id:                VoteChannel,
			Priority:          5,
			SendQueueCapacity: 40,
		},
	}
}
// Implements Reactor
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
	if !conR.IsRunning() {
		return
	}

	// Create peerState for peer
	peerState := NewPeerState(peer)
	peer.Data.Set(PeerStateKey, peerState)

	// Begin gossip routines for this peer.
	go conR.gossipDataRoutine(peer, peerState)
	go conR.gossipVotesRoutine(peer, peerState)

	// Send our state to peer.
	conR.sendNewRoundStepMessage(peer)
}

// Implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}
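// Receive dispatches an incoming message by channel id, updating the
// sender's PeerState and, where appropriate, feeding the message into
// the local ConsensusState.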
// Implements Reactor
func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
	if conR.sync || !conR.IsRunning() {
		return
	}

	// Get round state
	rs := conR.conS.GetRoundState()
	ps := peer.Data.Get(PeerStateKey).(*PeerState)
	_, msg_, err := DecodeMessage(msgBytes)
	if err != nil {
		log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
		return
	}
	log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)

	switch chId {
	case StateChannel:
		switch msg := msg_.(type) {
		case *NewRoundStepMessage:
			ps.ApplyNewRoundStepMessage(msg, rs)
		case *CommitStepMessage:
			ps.ApplyCommitStepMessage(msg)
		case *HasVoteMessage:
			ps.ApplyHasVoteMessage(msg)
		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case DataChannel:
		switch msg := msg_.(type) {
		case *ProposalMessage:
			ps.SetHasProposal(msg.Proposal)
			err = conR.conS.SetProposal(msg.Proposal)
		case *ProposalPOLMessage:
			ps.ApplyProposalPOLMessage(msg)
		case *BlockPartMessage:
			ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
			_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case VoteChannel:
		switch msg := msg_.(type) {
		case *VoteMessage:
			vote := msg.Vote
			var validators *sm.ValidatorSet
			if rs.Height == vote.Height {
				validators = rs.Validators
			} else if rs.Height == vote.Height+1 {
				if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
					return // Wrong height, not a LastCommit straggler commit.
				}
				validators = rs.LastValidators
			} else {
				return // Wrong height. Not necessarily a bad peer.
			}

			// We have vote/validators. Height may not be rs.Height
			address, _ := validators.GetByIndex(msg.ValidatorIndex)
			added, index, err := conR.conS.AddVote(address, vote, peer.Key)
			if err != nil {
				// If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
				if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
					log.Warn("Found conflicting vote. Publish evidence")
					evidenceTx := &types.DupeoutTx{
						Address: address,
						VoteA:   *errDupe.VoteA,
						VoteB:   *errDupe.VoteB,
					}
					conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
				} else {
					// Probably an invalid signature. Bad peer.
					log.Warn("Error attempting to add vote", "error", err)
					// TODO: punish peer
				}
			}
			ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size(), nil)
			ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size(), nil)
			ps.SetHasVote(vote, index)
			if added {
				// If rs.Height == vote.Height && rs.Round < vote.Round,
				// the peer is sending us CatchupCommit precommits.
				// We could make note of this and help filter in broadcastHasVoteMessage().
				conR.broadcastHasVoteMessage(vote, index)
			}

		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	default:
		log.Warn(Fmt("Unknown channel %X", chId))
	}

	if err != nil {
		log.Warn("Error in Receive()", "error", err)
	}
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
	msg := &HasVoteMessage{
		Height: vote.Height,
		Round:  vote.Round,
		Type:   vote.Type,
		Index:  index,
	}
	conR.sw.Broadcast(StateChannel, msg)
	/*
		// TODO: Make this broadcast more selective.
		for _, peer := range conR.sw.Peers().List() {
			ps := peer.Data.Get(PeerStateKey).(*PeerState)
			prs := ps.GetRoundState()
			if prs.Height == vote.Height {
				// TODO: Also filter on round?
				peer.TrySend(StateChannel, msg)
			} else {
				// Height doesn't match
				// TODO: check a field, maybe CatchupCommitRound?
				// TODO: But that requires changing the struct field comment.
			}
		}
	*/
}
// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
	conR.conS.SetPrivValidator(priv)
}

// Switch from the fast sync to the consensus:
// reset the state, turn off fast sync, start the consensus-state-machine.
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
	conR.conS.updateToState(state, false)
	conR.sync = false
	conR.conS.Start()
}

// implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
	conR.evsw = evsw
	conR.conS.SetFireable(evsw)
}
//--------------------------------------

func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
	nrsMsg = &NewRoundStepMessage{
		Height: rs.Height,
		Round:  rs.Round,
		Step:   rs.Step,
		SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
		LastCommitRound:       rs.LastCommit.Round(),
	}
	if rs.Step == RoundStepCommit {
		csMsg = &CommitStepMessage{
			Height:           rs.Height,
			BlockPartsHeader: rs.ProposalBlockParts.Header(),
			BlockParts:       rs.ProposalBlockParts.BitArray(),
		}
	}
	return
}
// Listens for changes to the ConsensusState.Step by pulling
// on conR.conS.NewStepCh().
func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
	for {
		// Get RoundState with new Step or quit.
		var rs *RoundState
		select {
		case rs = <-conR.conS.NewStepCh():
		case <-conR.quit:
			return
		}

		nrsMsg, csMsg := makeRoundStepMessages(rs)
		if nrsMsg != nil {
			conR.sw.Broadcast(StateChannel, nrsMsg)
		}
		if csMsg != nil {
			conR.sw.Broadcast(StateChannel, csMsg)
		}
	}
}

func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
	rs := conR.conS.GetRoundState()
	nrsMsg, csMsg := makeRoundStepMessages(rs)
	if nrsMsg != nil {
		peer.Send(StateChannel, nrsMsg)
	}
	if csMsg != nil {
		peer.Send(StateChannel, csMsg)
	}
}
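// gossipDataRoutine loops for the life of the peer, sending proposal block
// parts and the Proposal/ProposalPOL for the current height/round; when the
// peer is on an earlier height it serves block parts from the block store
// instead, sleeping peerGossipSleepDuration whenever there is nothing to send.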
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		// Send proposal Block parts?
		if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
			//log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
			if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
				part := rs.ProposalBlockParts.GetPart(index)
				msg := &BlockPartMessage{
					Height: rs.Height, // This tells peer that this part applies to us.
					Round:  rs.Round,  // This tells peer that this part applies to us.
					Part:   part,
				}
				peer.Send(DataChannel, msg)
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			}
		}

		// If the peer is on a previous height, help catch up.
		if (0 < prs.Height) && (prs.Height < rs.Height) {
			//log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
			if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
				// Ensure that the peer's PartSetHeader is correct
				blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
				if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
					log.Debug("Peer ProposalBlockPartsHeader mismatch, sleeping",
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Load the part
				part := conR.blockStore.LoadBlockPart(prs.Height, index)
				if part == nil {
					log.Warn("Could not load part", "index", index,
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Send the part
				msg := &BlockPartMessage{
					Height: prs.Height, // Not our height, so it doesn't matter.
					Round:  prs.Round,  // Not our height, so it doesn't matter.
					Part:   part,
				}
				peer.Send(DataChannel, msg)
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			} else {
				//log.Debug("No parts to send in catch-up, sleeping")
				time.Sleep(peerGossipSleepDuration)
				continue OUTER_LOOP
			}
		}

		// If height and round don't match, sleep.
		if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
			//log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
			time.Sleep(peerGossipSleepDuration)
			continue OUTER_LOOP
		}

		// By here, height and round match.
		// Proposal block parts were already matched and sent if any were wanted.
		// (These can match on hash so the round doesn't matter)
		// Now consider sending other things, like the Proposal itself.

		// Send Proposal && ProposalPOL BitArray?
		if rs.Proposal != nil && !prs.Proposal {
			// Proposal
			{
				msg := &ProposalMessage{Proposal: rs.Proposal}
				peer.Send(DataChannel, msg)
				ps.SetHasProposal(rs.Proposal)
			}
			// ProposalPOL.
			// Peer must receive ProposalMessage first.
			// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
			// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
			if 0 <= rs.Proposal.POLRound {
				msg := &ProposalPOLMessage{
					Height:           rs.Height,
					ProposalPOLRound: rs.Proposal.POLRound,
					ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
				}
				peer.Send(DataChannel, msg)
			}
			continue OUTER_LOOP
		}

		// Nothing to do. Sleep.
		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}
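// gossipVotesRoutine loops for the life of the peer, picking votes the peer
// appears to be missing (LastCommit, current-round prevotes/precommits, or
// catch-up commits loaded from the block store) and sending them one at a
// time on the VoteChannel.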
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {

	// Simple hack to throttle logs upon sleep.
	var sleeping = 0

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		switch sleeping {
		case 1: // First sleep
			sleeping = 2
		case 2: // No more sleep
			sleeping = 0
		}

		// prsVoteSet: a pointer to a VoteSet field of prs.
		// Returns true when useful work was done.
		trySendVote := func(voteSet *VoteSet, prsVoteSet **BitArray) (sent bool) {
			if voteSet == nil {
				return false
			}
			if *prsVoteSet == nil {
				ps.EnsureVoteBitArrays(voteSet.Height(), voteSet.Size(), prs)
				// We could return true here (useful work was done)
				// or, we can continue since prsVoteSet is no longer nil.
				if *prsVoteSet == nil {
					panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
				}
			}
			// TODO: give priority to our vote.
			if index, ok := voteSet.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
				vote := voteSet.GetByIndex(index)
				msg := &VoteMessage{index, vote}
				peer.Send(VoteChannel, msg)
				ps.SetHasVote(vote, index)
				return true
			}
			return false
		}

		// prsVoteSet: a pointer to a VoteSet field of prs.
		// Returns true when useful work was done.
		trySendPrecommitFromValidation := func(validation *types.Validation, prsVoteSet **BitArray) (sent bool) {
			if validation == nil {
				return false
			} else if *prsVoteSet == nil {
				ps.EnsureVoteBitArrays(validation.Height(), len(validation.Precommits), prs)
				// We could return true here (useful work was done)
				// or, we can continue since prsVoteSet is no longer nil.
				if *prsVoteSet == nil {
					panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
				}
			}
			if index, ok := validation.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
				precommit := validation.Precommits[index]
				log.Debug("Picked precommit to send", "index", index, "precommit", precommit)
				msg := &VoteMessage{index, precommit}
				peer.Send(VoteChannel, msg)
				ps.SetHasVote(precommit, index)
				return true
			}
			return false
		}

		// If height matches, then send LastCommit, Prevotes, Precommits.
		if rs.Height == prs.Height {
			// If there are lastCommits to send...
			if prs.Step == RoundStepNewHeight {
				if trySendVote(rs.LastCommit, &prs.LastCommit) {
					continue OUTER_LOOP
				}
			}
			// If there are prevotes to send...
			if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
				if trySendVote(rs.Votes.Prevotes(rs.Round), &prs.Prevotes) {
					continue OUTER_LOOP
				}
			}
			// If there are precommits to send...
			if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
				if trySendVote(rs.Votes.Precommits(rs.Round), &prs.Precommits) {
					continue OUTER_LOOP
				}
			}
			// If there are prevotes to send for the last round...
			if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
				if trySendVote(rs.Votes.Prevotes(prs.Round), &prs.Prevotes) {
					continue OUTER_LOOP
				}
			}
			// If there are precommits to send for the last round...
			if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
				if trySendVote(rs.Votes.Precommits(prs.Round), &prs.Precommits) {
					continue OUTER_LOOP
				}
			}
			// If there are POLPrevotes to send...
			if 0 <= prs.ProposalPOLRound {
				if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
					if trySendVote(polPrevotes, &prs.ProposalPOL) {
						continue OUTER_LOOP
					}
				}
			}
		}

		// Special catchup logic.
		// If peer is lagging by height 1, send LastCommit.
		if prs.Height != 0 && prs.Height == rs.Height-1 {
			if prs.Round == rs.LastCommit.Round() {
				// NOTE: We prefer to use prs.Precommits if
				// prs.Round matches prs.CatchupCommitRound.
				if trySendVote(rs.LastCommit, &prs.Precommits) {
					continue OUTER_LOOP
				}
			} else {
				ps.EnsureCatchupCommitRound(prs.Height, rs.LastCommit.Round())
				if trySendVote(rs.LastCommit, &prs.CatchupCommit) {
					continue OUTER_LOOP
				}
			}
		}

		// Catchup logic
		// If peer is lagging by more than 1, send Validation.
		if prs.Height != 0 && prs.Height <= rs.Height-2 {
			// Load the block validation for prs.Height,
			// which contains precommit signatures for prs.Height.
			validation := conR.blockStore.LoadBlockValidation(prs.Height)
			log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
			ps.EnsureCatchupCommitRound(prs.Height, validation.Round())
			if trySendPrecommitFromValidation(validation, &prs.CatchupCommit) {
				continue OUTER_LOOP
			}
		}

		if sleeping == 0 {
			// We sent nothing. Sleep...
			sleeping = 1
			log.Debug("No votes to send, sleeping", "peer", peer,
				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
		} else if sleeping == 2 {
			// Continued sleep...
			sleeping = 1
		}

		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}
//-----------------------------------------------------------------------------

// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
	Height                   int                 // Height peer is at
	Round                    int                 // Round peer is at
	Step                     RoundStepType       // Step peer is at
	StartTime                time.Time           // Estimated start of round 0 at this height
	Proposal                 bool                // True if peer has proposal for this round
	ProposalBlockPartsHeader types.PartSetHeader //
	ProposalBlockParts       *BitArray           //
	ProposalPOLRound         int                 // -1 if none
	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
	Prevotes                 *BitArray           // All votes peer has for this round
	Precommits               *BitArray           // All precommits peer has for this round
	LastCommitRound          int                 // Round of commit for last height.
	LastCommit               *BitArray           // All commit precommits of commit for last height.
	CatchupCommitRound       int                 // Round that we believe commit round is.
	CatchupCommit            *BitArray           // All commit precommits peer has for this height
}
//-----------------------------------------------------------------------------

var (
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)
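// PeerState embeds a PeerRoundState guarded by mtx; exported methods take the
// lock, and GetRoundState returns a copy, so callers only ever read snapshots.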
type PeerState struct {
	Key string

	mtx sync.Mutex
	PeerRoundState
}

func NewPeerState(peer *p2p.Peer) *PeerState {
	return &PeerState{Key: peer.Key}
}

// Returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *PeerRoundState {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	prs := ps.PeerRoundState // copy
	return &prs
}
func (ps *PeerState) SetHasProposal(proposal *Proposal) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != proposal.Height || ps.Round != proposal.Round {
		return
	}
	if ps.Proposal {
		return
	}

	ps.Proposal = true
	ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
	ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
	ps.ProposalPOLRound = proposal.POLRound
	ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}

func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != height || ps.Round != round {
		return
	}

	ps.ProposalBlockParts.SetIndex(index, true)
}
// prs: If given, will also update this PeerRoundState copy.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int, prs *PeerRoundState) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height == height {
		if ps.Prevotes == nil {
			ps.Prevotes = NewBitArray(numValidators)
		}
		if ps.Precommits == nil {
			ps.Precommits = NewBitArray(numValidators)
		}
		if ps.CatchupCommit == nil {
			ps.CatchupCommit = NewBitArray(numValidators)
		}
		if ps.ProposalPOL == nil {
			ps.ProposalPOL = NewBitArray(numValidators)
		}
	} else if ps.Height == height+1 {
		if ps.LastCommit == nil {
			ps.LastCommit = NewBitArray(numValidators)
		}
	}

	// Also, update prs if given.
	if prs != nil {
		prs.Prevotes = ps.Prevotes
		prs.Precommits = ps.Precommits
		prs.CatchupCommit = ps.CatchupCommit
		prs.ProposalPOL = ps.ProposalPOL
		prs.LastCommit = ps.LastCommit
	}
}
func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
}

func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
	if ps.Height == height+1 && ps.LastCommitRound == round && type_ == types.VoteTypePrecommit {
		// Special case for LastCommit.
		ps.LastCommit.SetIndex(index, true)
		log.Debug("setHasVote", "LastCommit", ps.LastCommit, "index", index)
		return
	} else if ps.Height != height {
		// Does not apply.
		return
	}

	// By here, ps.Height is height.
	switch type_ {
	case types.VoteTypePrevote:
		if ps.ProposalPOLRound == round {
			ps.ProposalPOL.SetIndex(index, true)
		}
		ps.Prevotes.SetIndex(index, true)
		log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index)
	case types.VoteTypePrecommit:
		if ps.CatchupCommitRound == round {
			ps.CatchupCommit.SetIndex(index, true)
		}
		ps.Precommits.SetIndex(index, true)
		log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index)
	default:
		panic("Invalid vote type")
	}
}
// NOTE: 'round' is what we know to be the commit round for height.
func (ps *PeerState) EnsureCatchupCommitRound(height, round int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != height {
		return
	}
	if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
		panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
	}
	if ps.CatchupCommitRound == round {
		return // Nothing to do!
	}

	ps.CatchupCommitRound = round
	ps.CatchupCommit = nil
}
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	// Just remember these values.
	psHeight := ps.Height
	psRound := ps.Round
	//psStep := ps.Step
	psCatchupCommitRound := ps.CatchupCommitRound
	psCatchupCommit := ps.CatchupCommit

	startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
	ps.Height = msg.Height
	ps.Round = msg.Round
	ps.Step = msg.Step
	ps.StartTime = startTime

	if psHeight != msg.Height || psRound != msg.Round {
		ps.Proposal = false
		ps.ProposalBlockPartsHeader = types.PartSetHeader{}
		ps.ProposalBlockParts = nil
		ps.ProposalPOLRound = -1
		ps.ProposalPOL = nil
		// We'll update the BitArray capacity later.
		ps.Prevotes = nil
		ps.Precommits = nil
	}
	if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
		// Peer caught up to CatchupCommitRound.
		// Preserve psCatchupCommit!
		// NOTE: We prefer to use prs.Precommits if
		// prs.Round matches prs.CatchupCommitRound.
		ps.Precommits = psCatchupCommit
	}
	if psHeight != msg.Height {
		// Shift Precommits to LastCommit.
		if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = ps.Precommits
		} else {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = nil
		}
		// We'll update the BitArray capacity later.
		ps.CatchupCommitRound = -1
		ps.CatchupCommit = nil
	}
}
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
	ps.ProposalBlockParts = msg.BlockParts
}

func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}

func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}
	if ps.ProposalPOLRound != msg.ProposalPOLRound {
		return
	}

	// TODO: Merge onto existing ps.ProposalPOL?
	// We might have sent some prevotes in the meantime.
	ps.ProposalPOL = msg.ProposalPOL
}
//-----------------------------------------------------------------------------
// Messages

const (
	msgTypeNewRoundStep = byte(0x01)
	msgTypeCommitStep   = byte(0x02)
	msgTypeProposal     = byte(0x11)
	msgTypeProposalPOL  = byte(0x12)
	msgTypeBlockPart    = byte(0x13) // both block & POL
	msgTypeVote         = byte(0x14)
	msgTypeHasVote      = byte(0x15)
)

type ConsensusMessage interface{}

var _ = binary.RegisterInterface(
	struct{ ConsensusMessage }{},
	binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
	binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
	binary.ConcreteType{&VoteMessage{}, msgTypeVote},
	binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
)
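// DecodeMessage reads a ConsensusMessage from bz: the first byte is the
// concrete message type registered above, and the whole buffer is decoded
// with binary.ReadBinary into the corresponding struct.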
// TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
	msgType = bz[0]
	n := new(int64)
	r := bytes.NewReader(bz)
	msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
	return
}
//-------------------------------------

// For every height/round/step transition
type NewRoundStepMessage struct {
	Height                int
	Round                 int
	Step                  RoundStepType
	SecondsSinceStartTime int
	LastCommitRound       int
}

func (m *NewRoundStepMessage) String() string {
	return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
		m.Height, m.Round, m.Step, m.LastCommitRound)
}

//-------------------------------------

type CommitStepMessage struct {
	Height           int
	BlockPartsHeader types.PartSetHeader
	BlockParts       *BitArray
}

func (m *CommitStepMessage) String() string {
	return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
}

//-------------------------------------

type ProposalMessage struct {
	Proposal *Proposal
}

func (m *ProposalMessage) String() string {
	return fmt.Sprintf("[Proposal %v]", m.Proposal)
}

//-------------------------------------

type ProposalPOLMessage struct {
	Height           int
	ProposalPOLRound int
	ProposalPOL      *BitArray
}

func (m *ProposalPOLMessage) String() string {
	return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}

//-------------------------------------

type BlockPartMessage struct {
	Height int
	Round  int
	Part   *types.Part
}

func (m *BlockPartMessage) String() string {
	return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}

//-------------------------------------

type VoteMessage struct {
	ValidatorIndex int
	Vote           *types.Vote
}

func (m *VoteMessage) String() string {
	return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
}

//-------------------------------------

type HasVoteMessage struct {
	Height int
	Round  int
	Type   byte
	Index  int
}

func (m *HasVoteMessage) String() string {
	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
}