You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1027 lines
30 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. bc "github.com/tendermint/tendermint/blockchain"
  10. . "github.com/tendermint/tendermint/common"
  11. "github.com/tendermint/tendermint/events"
  12. "github.com/tendermint/tendermint/p2p"
  13. sm "github.com/tendermint/tendermint/state"
  14. "github.com/tendermint/tendermint/types"
  15. "github.com/tendermint/tendermint/wire"
  16. )
  // Channel identifiers used by the consensus reactor, plus per-peer
  // bookkeeping settings.
  const (
  	StateChannel byte = 0x20
  	DataChannel  byte = 0x21
  	VoteChannel  byte = 0x22

  	// PeerStateKey is the key under which AddPeer stores a peer's *PeerState
  	// in peer.Data.
  	PeerStateKey = "ConsensusReactor.peerState"

  	// Time to sleep if there's nothing to send.
  	peerGossipSleepDuration = 100 * time.Millisecond
  )
  24. //-----------------------------------------------------------------------------
  // ConsensusReactor is the p2p reactor for consensus: it gossips round state,
  // proposal data and votes with peers on StateChannel, DataChannel and
  // VoteChannel, and feeds received data into the ConsensusState.
  type ConsensusReactor struct {
  	p2p.BaseReactor

  	blockStore *bc.BlockStore     // Source of committed block parts/validations for peer catch-up.
  	conS       *ConsensusState    // The consensus state machine driven by this reactor.
  	fastSync   bool               // While true, conS is not started and Data/Vote messages are ignored.
  	evsw       events.Fireable    // Event sink set through SetFireable.
  }
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. conS: consensusState,
  36. fastSync: fastSync,
  37. }
  38. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  39. return conR
  40. }
  41. func (conR *ConsensusReactor) OnStart() error {
  42. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  43. conR.BaseReactor.OnStart()
  44. if !conR.fastSync {
  45. _, err := conR.conS.Start()
  46. if err != nil {
  47. return err
  48. }
  49. }
  50. go conR.broadcastNewRoundStepRoutine()
  51. return nil
  52. }
  // OnStop stops the embedded BaseReactor first and then the consensus state.
  func (conR *ConsensusReactor) OnStop() {
  	conR.BaseReactor.OnStop()
  	conR.conS.Stop()
  }
  // SwitchToConsensus switches from fast_sync to consensus:
  // it resets the consensus state to the given state, turns off fast_sync,
  // and starts the consensus state machine.
  func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  	log.Notice("SwitchToConsensus")
  	// NOTE: The line below causes broadcastNewRoundStepRoutine() to
  	// broadcast a NewRoundStepMessage.
  	conR.conS.updateToState(state, false)
  	// From here on Receive() stops ignoring Data/Vote channel messages.
  	conR.fastSync = false
  	// NOTE(review): the values returned by Start() are discarded here.
  	conR.conS.Start()
  }
  67. // Implements Reactor
  68. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  69. // TODO optimize
  70. return []*p2p.ChannelDescriptor{
  71. &p2p.ChannelDescriptor{
  72. ID: StateChannel,
  73. Priority: 5,
  74. SendQueueCapacity: 100,
  75. },
  76. &p2p.ChannelDescriptor{
  77. ID: DataChannel,
  78. Priority: 5,
  79. SendQueueCapacity: 2,
  80. },
  81. &p2p.ChannelDescriptor{
  82. ID: VoteChannel,
  83. Priority: 5,
  84. SendQueueCapacity: 40,
  85. },
  86. }
  87. }
  88. // Implements Reactor
  89. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  90. if !conR.IsRunning() {
  91. return
  92. }
  93. // Create peerState for peer
  94. peerState := NewPeerState(peer)
  95. peer.Data.Set(PeerStateKey, peerState)
  96. // Begin gossip routines for this peer.
  97. go conR.gossipDataRoutine(peer, peerState)
  98. go conR.gossipVotesRoutine(peer, peerState)
  99. // Send our state to peer.
  100. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  101. if !conR.fastSync {
  102. conR.sendNewRoundStepMessage(peer)
  103. }
  104. }
  // RemovePeer implements Reactor. It currently performs no cleanup: beyond
  // the running check, the body is only a TODO.
  func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  	if !conR.IsRunning() {
  		return
  	}
  	// TODO
  	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  }
  113. // Implements Reactor
  114. // NOTE: We process these messages even when we're fast_syncing.
  115. func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
  116. if !conR.IsRunning() {
  117. log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes)
  118. return
  119. }
  120. // Get round state
  121. rs := conR.conS.GetRoundState()
  122. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  123. _, msg, err := DecodeMessage(msgBytes)
  124. if err != nil {
  125. log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
  126. return
  127. }
  128. log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg, "rsHeight", rs.Height)
  129. switch chID {
  130. case StateChannel:
  131. switch msg := msg.(type) {
  132. case *NewRoundStepMessage:
  133. ps.ApplyNewRoundStepMessage(msg, rs)
  134. case *CommitStepMessage:
  135. ps.ApplyCommitStepMessage(msg)
  136. case *HasVoteMessage:
  137. ps.ApplyHasVoteMessage(msg)
  138. default:
  139. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  140. }
  141. case DataChannel:
  142. if conR.fastSync {
  143. log.Warn("Ignoring message received during fastSync", "msg", msg)
  144. return
  145. }
  146. switch msg := msg.(type) {
  147. case *ProposalMessage:
  148. ps.SetHasProposal(msg.Proposal)
  149. err = conR.conS.SetProposal(msg.Proposal)
  150. case *ProposalPOLMessage:
  151. ps.ApplyProposalPOLMessage(msg)
  152. case *BlockPartMessage:
  153. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  154. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  155. default:
  156. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  157. }
  158. case VoteChannel:
  159. if conR.fastSync {
  160. log.Warn("Ignoring message received during fastSync", "msg", msg)
  161. return
  162. }
  163. switch msg := msg.(type) {
  164. case *VoteMessage:
  165. vote := msg.Vote
  166. var validators *types.ValidatorSet
  167. if rs.Height == vote.Height {
  168. validators = rs.Validators
  169. } else if rs.Height == vote.Height+1 {
  170. if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
  171. return // Wrong height, not a LastCommit straggler commit.
  172. }
  173. validators = rs.LastValidators
  174. } else {
  175. return // Wrong height. Not necessarily a bad peer.
  176. }
  177. // We have vote/validators. Height may not be rs.Height
  178. address, _ := validators.GetByIndex(msg.ValidatorIndex)
  179. added, index, err := conR.conS.AddVote(address, vote, peer.Key)
  180. if err != nil {
  181. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  182. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  183. log.Warn("Found conflicting vote. Publish evidence")
  184. evidenceTx := &types.DupeoutTx{
  185. Address: address,
  186. VoteA: *errDupe.VoteA,
  187. VoteB: *errDupe.VoteB,
  188. }
  189. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  190. } else {
  191. // Probably an invalid signature. Bad peer.
  192. log.Warn("Error attempting to add vote", "error", err)
  193. // TODO: punish peer
  194. }
  195. }
  196. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  197. ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
  198. ps.SetHasVote(vote, index)
  199. if added {
  200. // If rs.Height == vote.Height && rs.Round < vote.Round,
  201. // the peer is sending us CatchupCommit precommits.
  202. // We could make note of this and help filter in broadcastHasVoteMessage().
  203. conR.broadcastHasVoteMessage(vote, index)
  204. }
  205. default:
  206. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  207. }
  208. default:
  209. log.Warn(Fmt("Unknown channel %X", chID))
  210. }
  211. if err != nil {
  212. log.Warn("Error in Receive()", "error", err)
  213. }
  214. }
  215. // Broadcasts HasVoteMessage to peers that care.
  216. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  217. msg := &HasVoteMessage{
  218. Height: vote.Height,
  219. Round: vote.Round,
  220. Type: vote.Type,
  221. Index: index,
  222. }
  223. conR.Switch.Broadcast(StateChannel, msg)
  224. /*
  225. // TODO: Make this broadcast more selective.
  226. for _, peer := range conR.Switch.Peers().List() {
  227. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  228. prs := ps.GetRoundState()
  229. if prs.Height == vote.Height {
  230. // TODO: Also filter on round?
  231. peer.TrySend(StateChannel, msg)
  232. } else {
  233. // Height doesn't match
  234. // TODO: check a field, maybe CatchupCommitRound?
  235. // TODO: But that requires changing the struct field comment.
  236. }
  237. }
  238. */
  239. }
  // SetPrivValidator sets our private validator account for signing votes by
  // forwarding it to the consensus state.
  func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
  	conR.conS.SetPrivValidator(priv)
  }
  // SetFireable implements events.Eventable. The event sink is stored on the
  // reactor and also passed on to the consensus state.
  func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  	conR.evsw = evsw
  	conR.conS.SetFireable(evsw)
  }
  249. //--------------------------------------
  250. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  251. nrsMsg = &NewRoundStepMessage{
  252. Height: rs.Height,
  253. Round: rs.Round,
  254. Step: rs.Step,
  255. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  256. LastCommitRound: rs.LastCommit.Round(),
  257. }
  258. if rs.Step == RoundStepCommit {
  259. csMsg = &CommitStepMessage{
  260. Height: rs.Height,
  261. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  262. BlockParts: rs.ProposalBlockParts.BitArray(),
  263. }
  264. }
  265. return
  266. }
  267. // Listens for changes to the ConsensusState.Step by pulling
  268. // on conR.conS.NewStepCh().
  269. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  270. for {
  271. // Get RoundState with new Step or quit.
  272. var rs *RoundState
  273. select {
  274. case rs = <-conR.conS.NewStepCh():
  275. case <-conR.Quit:
  276. return
  277. }
  278. nrsMsg, csMsg := makeRoundStepMessages(rs)
  279. if nrsMsg != nil {
  280. conR.Switch.Broadcast(StateChannel, nrsMsg)
  281. }
  282. if csMsg != nil {
  283. conR.Switch.Broadcast(StateChannel, csMsg)
  284. }
  285. }
  286. }
  287. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  288. rs := conR.conS.GetRoundState()
  289. nrsMsg, csMsg := makeRoundStepMessages(rs)
  290. if nrsMsg != nil {
  291. peer.Send(StateChannel, nrsMsg)
  292. }
  293. if csMsg != nil {
  294. peer.Send(StateChannel, csMsg)
  295. }
  296. }
  297. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  298. log := log.New("peer", peer.Key)
  299. OUTER_LOOP:
  300. for {
  301. // Manage disconnects from self or peer.
  302. if !peer.IsRunning() || !conR.IsRunning() {
  303. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  304. return
  305. }
  306. rs := conR.conS.GetRoundState()
  307. prs := ps.GetRoundState()
  308. // Send proposal Block parts?
  309. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  310. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  311. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  312. part := rs.ProposalBlockParts.GetPart(index)
  313. msg := &BlockPartMessage{
  314. Height: rs.Height, // This tells peer that this part applies to us.
  315. Round: rs.Round, // This tells peer that this part applies to us.
  316. Part: part,
  317. }
  318. peer.Send(DataChannel, msg)
  319. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  320. continue OUTER_LOOP
  321. }
  322. }
  323. // If the peer is on a previous height, help catch up.
  324. if (0 < prs.Height) && (prs.Height < rs.Height) {
  325. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  326. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  327. // Ensure that the peer's PartSetHeader is correct
  328. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  329. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  330. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  331. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  332. time.Sleep(peerGossipSleepDuration)
  333. continue OUTER_LOOP
  334. }
  335. // Load the part
  336. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  337. if part == nil {
  338. log.Warn("Could not load part", "index", index,
  339. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  340. time.Sleep(peerGossipSleepDuration)
  341. continue OUTER_LOOP
  342. }
  343. // Send the part
  344. msg := &BlockPartMessage{
  345. Height: prs.Height, // Not our height, so it doesn't matter.
  346. Round: prs.Round, // Not our height, so it doesn't matter.
  347. Part: part,
  348. }
  349. peer.Send(DataChannel, msg)
  350. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  351. continue OUTER_LOOP
  352. } else {
  353. //log.Info("No parts to send in catch-up, sleeping")
  354. time.Sleep(peerGossipSleepDuration)
  355. continue OUTER_LOOP
  356. }
  357. }
  358. // If height and round don't match, sleep.
  359. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  360. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  361. time.Sleep(peerGossipSleepDuration)
  362. continue OUTER_LOOP
  363. }
  364. // By here, height and round match.
  365. // Proposal block parts were already matched and sent if any were wanted.
  366. // (These can match on hash so the round doesn't matter)
  367. // Now consider sending other things, like the Proposal itself.
  368. // Send Proposal && ProposalPOL BitArray?
  369. if rs.Proposal != nil && !prs.Proposal {
  370. // Proposal
  371. {
  372. msg := &ProposalMessage{Proposal: rs.Proposal}
  373. peer.Send(DataChannel, msg)
  374. ps.SetHasProposal(rs.Proposal)
  375. }
  376. // ProposalPOL.
  377. // Peer must receive ProposalMessage first.
  378. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  379. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  380. if 0 <= rs.Proposal.POLRound {
  381. msg := &ProposalPOLMessage{
  382. Height: rs.Height,
  383. ProposalPOLRound: rs.Proposal.POLRound,
  384. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  385. }
  386. peer.Send(DataChannel, msg)
  387. }
  388. continue OUTER_LOOP
  389. }
  390. // Nothing to do. Sleep.
  391. time.Sleep(peerGossipSleepDuration)
  392. continue OUTER_LOOP
  393. }
  394. }
  // gossipVotesRoutine runs for the lifetime of a peer connection and sends
  // the peer one vote per iteration that it appears to be missing. Candidate
  // vote sets are tried in a fixed priority order (see the inline comments);
  // the first set that yields a vote wins and the loop restarts. When nothing
  // can be sent the routine sleeps for peerGossipSleepDuration. It returns
  // once the peer or the reactor stops running.
  func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  	log := log.New("peer", peer.Key)

  	// Simple hack to throttle logs upon sleep.
  	// 0: not sleeping, 1: slept on the previous iteration, 2: transitional
  	// value used to decide whether the "sleeping" log line is repeated.
  	var sleeping = 0

  OUTER_LOOP:
  	for {
  		// Manage disconnects from self or peer.
  		if !peer.IsRunning() || !conR.IsRunning() {
  			log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  			return
  		}
  		rs := conR.conS.GetRoundState()
  		prs := ps.GetRoundState()

  		switch sleeping {
  		case 1: // First sleep
  			sleeping = 2
  		case 2: // No more sleep
  			sleeping = 0
  		}

  		log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  			"prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

  		// If height matches, then send LastCommit, Prevotes, Precommits.
  		if rs.Height == prs.Height {
  			// If there are lastCommits to send...
  			if prs.Step == RoundStepNewHeight {
  				if ps.PickSendVote(rs.LastCommit) {
  					log.Info("Picked rs.LastCommit to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are prevotes to send...
  			if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  				if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
  					log.Info("Picked rs.Prevotes(rs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are precommits to send...
  			if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  				if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
  					log.Info("Picked rs.Precommits(rs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are prevotes to send for the last round...
  			if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
  				if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  					log.Info("Picked rs.Prevotes(prs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are precommits to send for the last round...
  			if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
  				if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  					log.Info("Picked rs.Precommits(prs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are POLPrevotes to send...
  			if 0 <= prs.ProposalPOLRound {
  				if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  					if ps.PickSendVote(polPrevotes) {
  						log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  						continue OUTER_LOOP
  					}
  				}
  			}
  		}

  		// Special catchup logic.
  		// If peer is lagging by height 1, send LastCommit.
  		if prs.Height != 0 && rs.Height == prs.Height+1 {
  			if ps.PickSendVote(rs.LastCommit) {
  				log.Info("Picked rs.LastCommit to send")
  				continue OUTER_LOOP
  			}
  		}

  		// Catchup logic
  		// If peer is lagging by more than 1, send Validation.
  		if prs.Height != 0 && rs.Height >= prs.Height+2 {
  			// Load the block validation for prs.Height,
  			// which contains precommit signatures for prs.Height.
  			validation := conR.blockStore.LoadBlockValidation(prs.Height)
  			log.Info("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  			if ps.PickSendVote(validation) {
  				log.Info("Picked Catchup validation to send")
  				continue OUTER_LOOP
  			}
  		}

  		if sleeping == 0 {
  			// We sent nothing. Sleep...
  			sleeping = 1
  			log.Info("No votes to send, sleeping", "peer", peer,
  				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  		} else if sleeping == 2 {
  			// Continued sleep...
  			sleeping = 1
  		}

  		time.Sleep(peerGossipSleepDuration)
  		continue OUTER_LOOP
  	}
  }
  497. //-----------------------------------------------------------------------------
  // PeerRoundState is our view of where a peer is in consensus and which
  // proposal parts and votes it is known to have.
  // Read only when returned by PeerState.GetRoundState().
  type PeerRoundState struct {
  	Height                   int                 // Height peer is at
  	Round                    int                 // Round peer is at, -1 if unknown.
  	Step                     RoundStepType       // Step peer is at
  	StartTime                time.Time           // Estimated start of round 0 at this height
  	Proposal                 bool                // True if peer has proposal for this round
  	ProposalBlockPartsHeader types.PartSetHeader // Parts header of the proposal block the peer is working on
  	ProposalBlockParts       *BitArray           // Which proposal block parts the peer has; nil after a round change
  	ProposalPOLRound         int                 // Proposal's POL round. -1 if none.
  	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
  	Prevotes                 *BitArray           // All votes peer has for this round
  	Precommits               *BitArray           // All precommits peer has for this round
  	LastCommitRound          int                 // Round of commit for last height. -1 if none.
  	LastCommit               *BitArray           // All commit precommits of commit for last height.
  	CatchupCommitRound       int                 // Round that we believe commit round is. -1 if none.
  	CatchupCommit            *BitArray           // All commit precommits peer has for this height
  }
  516. //-----------------------------------------------------------------------------
  // Sentinel errors for invalid peer state updates.
  // NOTE(review): neither is referenced in the visible part of this file.
  var (
  	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  )
  // PeerState holds the PeerRoundState we track for one peer. The exported
  // methods take mtx before reading or modifying the embedded PeerRoundState;
  // the unexported helpers expect the caller to already hold it.
  type PeerState struct {
  	Peer *p2p.Peer // The peer this state describes; also used to send votes.

  	mtx sync.Mutex // Guards PeerRoundState.
  	PeerRoundState
  }
  526. func NewPeerState(peer *p2p.Peer) *PeerState {
  527. return &PeerState{
  528. Peer: peer,
  529. PeerRoundState: PeerRoundState{
  530. Round: -1,
  531. ProposalPOLRound: -1,
  532. LastCommitRound: -1,
  533. CatchupCommitRound: -1,
  534. },
  535. }
  536. }
  537. // Returns an atomic snapshot of the PeerRoundState.
  538. // There's no point in mutating it since it won't change PeerState.
  539. func (ps *PeerState) GetRoundState() *PeerRoundState {
  540. ps.mtx.Lock()
  541. defer ps.mtx.Unlock()
  542. prs := ps.PeerRoundState // copy
  543. return &prs
  544. }
  545. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  546. ps.mtx.Lock()
  547. defer ps.mtx.Unlock()
  548. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  549. return
  550. }
  551. if ps.Proposal {
  552. return
  553. }
  554. ps.Proposal = true
  555. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  556. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  557. ps.ProposalPOLRound = proposal.POLRound
  558. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  559. }
  560. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  561. ps.mtx.Lock()
  562. defer ps.mtx.Unlock()
  563. if ps.Height != height || ps.Round != round {
  564. return
  565. }
  566. ps.ProposalBlockParts.SetIndex(index, true)
  567. }
  568. // Convenience function to send vote to peer.
  569. // Returns true if vote was sent.
  570. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  571. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  572. msg := &VoteMessage{index, vote}
  573. ps.Peer.Send(VoteChannel, msg)
  574. return true
  575. }
  576. return false
  577. }
  578. // votes: Must be the correct Size() for the Height().
  579. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  580. ps.mtx.Lock()
  581. defer ps.mtx.Unlock()
  582. if votes.Size() == 0 {
  583. return 0, nil, false
  584. }
  585. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  586. // Lazily set data using 'votes'.
  587. if votes.IsCommit() {
  588. ps.ensureCatchupCommitRound(height, round, size)
  589. }
  590. ps.ensureVoteBitArrays(height, size)
  591. psVotes := ps.getVoteBitArray(height, round, type_)
  592. if psVotes == nil {
  593. return 0, nil, false // Not something worth sending
  594. }
  595. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  596. ps.setHasVote(height, round, type_, index)
  597. return index, votes.GetByIndex(index), true
  598. }
  599. return 0, nil, false
  600. }
  601. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  602. if ps.Height == height {
  603. if ps.Round == round {
  604. switch type_ {
  605. case types.VoteTypePrevote:
  606. return ps.Prevotes
  607. case types.VoteTypePrecommit:
  608. return ps.Precommits
  609. default:
  610. PanicSanity(Fmt("Unexpected vote type %X", type_))
  611. }
  612. }
  613. if ps.CatchupCommitRound == round {
  614. switch type_ {
  615. case types.VoteTypePrevote:
  616. return nil
  617. case types.VoteTypePrecommit:
  618. return ps.CatchupCommit
  619. default:
  620. PanicSanity(Fmt("Unexpected vote type %X", type_))
  621. }
  622. }
  623. return nil
  624. }
  625. if ps.Height == height+1 {
  626. if ps.LastCommitRound == round {
  627. switch type_ {
  628. case types.VoteTypePrevote:
  629. return nil
  630. case types.VoteTypePrecommit:
  631. return ps.LastCommit
  632. default:
  633. PanicSanity(Fmt("Unexpected vote type %X", type_))
  634. }
  635. }
  636. return nil
  637. }
  638. return nil
  639. }
  640. // NOTE: 'round' is what we know to be the commit round for height.
  641. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  642. if ps.Height != height {
  643. return
  644. }
  645. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  646. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  647. }
  648. if ps.CatchupCommitRound == round {
  649. return // Nothing to do!
  650. }
  651. ps.CatchupCommitRound = round
  652. if round == ps.Round {
  653. ps.CatchupCommit = ps.Precommits
  654. } else {
  655. ps.CatchupCommit = NewBitArray(numValidators)
  656. }
  657. }
  // EnsureVoteBitArrays takes the lock and allocates any vote bit arrays that
  // are still nil for the given height (see ensureVoteBitArrays).
  //
  // NOTE: It's important to make sure that numValidators actually matches
  // what the node sees as the number of validators for height.
  func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  	ps.mtx.Lock()
  	defer ps.mtx.Unlock()
  	ps.ensureVoteBitArrays(height, numValidators)
  }
  665. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  666. if ps.Height == height {
  667. if ps.Prevotes == nil {
  668. ps.Prevotes = NewBitArray(numValidators)
  669. }
  670. if ps.Precommits == nil {
  671. ps.Precommits = NewBitArray(numValidators)
  672. }
  673. if ps.CatchupCommit == nil {
  674. ps.CatchupCommit = NewBitArray(numValidators)
  675. }
  676. if ps.ProposalPOL == nil {
  677. ps.ProposalPOL = NewBitArray(numValidators)
  678. }
  679. } else if ps.Height == height+1 {
  680. if ps.LastCommit == nil {
  681. ps.LastCommit = NewBitArray(numValidators)
  682. }
  683. }
  684. }
  // SetHasVote takes the lock and records that the peer has `vote`, which sits
  // at validator index `index` (see setHasVote).
  func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  	ps.mtx.Lock()
  	defer ps.mtx.Unlock()
  	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  }
  690. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  691. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  692. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  693. PanicSanity("Invalid vote type")
  694. }
  695. if ps.Height == height {
  696. if ps.Round == round {
  697. switch type_ {
  698. case types.VoteTypePrevote:
  699. ps.Prevotes.SetIndex(index, true)
  700. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  701. case types.VoteTypePrecommit:
  702. ps.Precommits.SetIndex(index, true)
  703. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  704. }
  705. } else if ps.CatchupCommitRound == round {
  706. switch type_ {
  707. case types.VoteTypePrevote:
  708. case types.VoteTypePrecommit:
  709. ps.CatchupCommit.SetIndex(index, true)
  710. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  711. }
  712. } else if ps.ProposalPOLRound == round {
  713. switch type_ {
  714. case types.VoteTypePrevote:
  715. ps.ProposalPOL.SetIndex(index, true)
  716. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  717. case types.VoteTypePrecommit:
  718. }
  719. }
  720. } else if ps.Height == height+1 {
  721. if ps.LastCommitRound == round {
  722. switch type_ {
  723. case types.VoteTypePrevote:
  724. case types.VoteTypePrecommit:
  725. ps.LastCommit.SetIndex(index, true)
  726. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  727. }
  728. }
  729. } else {
  730. // Does not apply.
  731. }
  732. }
  733. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  734. ps.mtx.Lock()
  735. defer ps.mtx.Unlock()
  736. // Ignore duplicate messages.
  737. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  738. return
  739. }
  740. // Just remember these values.
  741. psHeight := ps.Height
  742. psRound := ps.Round
  743. //psStep := ps.Step
  744. psCatchupCommitRound := ps.CatchupCommitRound
  745. psCatchupCommit := ps.CatchupCommit
  746. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  747. ps.Height = msg.Height
  748. ps.Round = msg.Round
  749. ps.Step = msg.Step
  750. ps.StartTime = startTime
  751. if psHeight != msg.Height || psRound != msg.Round {
  752. ps.Proposal = false
  753. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  754. ps.ProposalBlockParts = nil
  755. ps.ProposalPOLRound = -1
  756. ps.ProposalPOL = nil
  757. // We'll update the BitArray capacity later.
  758. ps.Prevotes = nil
  759. ps.Precommits = nil
  760. }
  761. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  762. // Peer caught up to CatchupCommitRound.
  763. // Preserve psCatchupCommit!
  764. // NOTE: We prefer to use prs.Precommits if
  765. // pr.Round matches pr.CatchupCommitRound.
  766. ps.Precommits = psCatchupCommit
  767. }
  768. if psHeight != msg.Height {
  769. // Shift Precommits to LastCommit.
  770. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  771. ps.LastCommitRound = msg.LastCommitRound
  772. ps.LastCommit = ps.Precommits
  773. } else {
  774. ps.LastCommitRound = msg.LastCommitRound
  775. ps.LastCommit = nil
  776. }
  777. // We'll update the BitArray capacity later.
  778. ps.CatchupCommitRound = -1
  779. ps.CatchupCommit = nil
  780. }
  781. }
  782. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  783. ps.mtx.Lock()
  784. defer ps.mtx.Unlock()
  785. if ps.Height != msg.Height {
  786. return
  787. }
  788. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  789. ps.ProposalBlockParts = msg.BlockParts
  790. }
  791. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  792. ps.mtx.Lock()
  793. defer ps.mtx.Unlock()
  794. if ps.Height != msg.Height {
  795. return
  796. }
  797. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  798. }
  799. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  800. ps.mtx.Lock()
  801. defer ps.mtx.Unlock()
  802. if ps.Height != msg.Height {
  803. return
  804. }
  805. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  806. return
  807. }
  808. // TODO: Merge onto existing ps.ProposalPOL?
  809. // We might have sent some prevotes in the meantime.
  810. ps.ProposalPOL = msg.ProposalPOL
  811. }
  812. //-----------------------------------------------------------------------------
  // Messages

  // Type bytes under which the consensus message types are registered with
  // wire.
  const (
  	msgTypeNewRoundStep byte = 0x01
  	msgTypeCommitStep   byte = 0x02
  	msgTypeProposal     byte = 0x11
  	msgTypeProposalPOL  byte = 0x12
  	msgTypeBlockPart    byte = 0x13 // both block & POL
  	msgTypeVote         byte = 0x14
  	msgTypeHasVote      byte = 0x15
  )
  // ConsensusMessage is the common (empty) interface of all messages this
  // reactor exchanges with peers.
  type ConsensusMessage interface{}

  // Register each concrete message type with wire under its type byte so that
  // DecodeMessage can read them through the ConsensusMessage interface.
  var _ = wire.RegisterInterface(
  	struct{ ConsensusMessage }{},
  	wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  	wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  	wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  	wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  	wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  	wire.ConcreteType{&VoteMessage{}, msgTypeVote},
  	wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  )
  834. // TODO: check for unnecessary extra bytes at the end.
  835. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  836. msgType = bz[0]
  837. n := new(int64)
  838. r := bytes.NewReader(bz)
  839. msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  840. return
  841. }
  842. //-------------------------------------
  843. // For every height/round/step transition
  844. type NewRoundStepMessage struct {
  845. Height int
  846. Round int
  847. Step RoundStepType
  848. SecondsSinceStartTime int
  849. LastCommitRound int
  850. }
  851. func (m *NewRoundStepMessage) String() string {
  852. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  853. m.Height, m.Round, m.Step, m.LastCommitRound)
  854. }
  855. //-------------------------------------
  856. type CommitStepMessage struct {
  857. Height int
  858. BlockPartsHeader types.PartSetHeader
  859. BlockParts *BitArray
  860. }
  861. func (m *CommitStepMessage) String() string {
  862. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  863. }
  864. //-------------------------------------
  865. type ProposalMessage struct {
  866. Proposal *types.Proposal
  867. }
  868. func (m *ProposalMessage) String() string {
  869. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  870. }
  871. //-------------------------------------
  872. type ProposalPOLMessage struct {
  873. Height int
  874. ProposalPOLRound int
  875. ProposalPOL *BitArray
  876. }
  877. func (m *ProposalPOLMessage) String() string {
  878. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  879. }
  880. //-------------------------------------
  881. type BlockPartMessage struct {
  882. Height int
  883. Round int
  884. Part *types.Part
  885. }
  886. func (m *BlockPartMessage) String() string {
  887. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  888. }
  889. //-------------------------------------
  890. type VoteMessage struct {
  891. ValidatorIndex int
  892. Vote *types.Vote
  893. }
  894. func (m *VoteMessage) String() string {
  895. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  896. }
  897. //-------------------------------------
  // HasVoteMessage tells a peer that the sender has the vote of the given
  // height, round and type at validator index Index.
  type HasVoteMessage struct {
  	Height int
  	Round  int
  	Type   byte
  	Index  int
  }

  // String renders the vote coordinates as {height/round/type}, with the
  // validator index printed before and after them.
  func (m *HasVoteMessage) String() string {
  	coords := fmt.Sprintf("{%v/%02d/%v}", m.Height, m.Round, m.Type)
  	return fmt.Sprintf("[HasVote VI:%v V:%s VI:%v]", m.Index, coords, m.Index)
  }