You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1016 lines
29 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. "github.com/tendermint/tendermint/binary"
  10. bc "github.com/tendermint/tendermint/blockchain"
  11. . "github.com/tendermint/tendermint/common"
  12. . "github.com/tendermint/tendermint/consensus/types"
  13. "github.com/tendermint/tendermint/events"
  14. "github.com/tendermint/tendermint/p2p"
  15. sm "github.com/tendermint/tendermint/state"
  16. "github.com/tendermint/tendermint/types"
  17. )
  18. const (
  19. StateChannel = byte(0x20)
  20. DataChannel = byte(0x21)
  21. VoteChannel = byte(0x22)
  22. PeerStateKey = "ConsensusReactor.peerState"
  23. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  24. )
  25. //-----------------------------------------------------------------------------
  26. type ConsensusReactor struct {
  27. p2p.BaseReactor
  28. blockStore *bc.BlockStore
  29. conS *ConsensusState
  30. fastSync bool
  31. evsw events.Fireable
  32. }
  33. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  34. conR := &ConsensusReactor{
  35. blockStore: blockStore,
  36. conS: consensusState,
  37. fastSync: fastSync,
  38. }
  39. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  40. return conR
  41. }
  42. func (conR *ConsensusReactor) OnStart() {
  43. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  44. conR.BaseReactor.OnStart()
  45. if !conR.fastSync {
  46. conR.conS.Start()
  47. }
  48. go conR.broadcastNewRoundStepRoutine()
  49. }
  50. func (conR *ConsensusReactor) OnStop() {
  51. conR.BaseReactor.OnStop()
  52. conR.conS.Stop()
  53. }
  54. // Switch from the fast_sync to the consensus:
  55. // reset the state, turn off fast_sync, start the consensus-state-machine
  56. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  57. log.Notice("SwitchToConsensus")
  58. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  59. // broadcast a NewRoundStepMessage.
  60. conR.conS.updateToState(state, false)
  61. conR.fastSync = false
  62. conR.conS.Start()
  63. }
  64. // Implements Reactor
  65. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  66. // TODO optimize
  67. return []*p2p.ChannelDescriptor{
  68. &p2p.ChannelDescriptor{
  69. Id: StateChannel,
  70. Priority: 5,
  71. SendQueueCapacity: 100,
  72. },
  73. &p2p.ChannelDescriptor{
  74. Id: DataChannel,
  75. Priority: 5,
  76. SendQueueCapacity: 2,
  77. },
  78. &p2p.ChannelDescriptor{
  79. Id: VoteChannel,
  80. Priority: 5,
  81. SendQueueCapacity: 40,
  82. },
  83. }
  84. }
  85. // Implements Reactor
  86. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  87. if !conR.IsRunning() {
  88. return
  89. }
  90. // Create peerState for peer
  91. peerState := NewPeerState(peer)
  92. peer.Data.Set(PeerStateKey, peerState)
  93. // Begin gossip routines for this peer.
  94. go conR.gossipDataRoutine(peer, peerState)
  95. go conR.gossipVotesRoutine(peer, peerState)
  96. // Send our state to peer.
  97. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  98. if !conR.fastSync {
  99. conR.sendNewRoundStepMessage(peer)
  100. }
  101. }
  102. // Implements Reactor
  103. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  104. if !conR.IsRunning() {
  105. return
  106. }
  107. // TODO
  108. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  109. }
  110. // Implements Reactor
  111. // NOTE: We process these messages even when we're fast_syncing.
  112. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  113. if !conR.IsRunning() {
  114. log.Debug("Receive", "channel", chId, "peer", peer, "bytes", msgBytes)
  115. return
  116. }
  117. // Get round state
  118. rs := conR.conS.GetRoundState()
  119. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  120. _, msg, err := DecodeMessage(msgBytes)
  121. if err != nil {
  122. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
  123. return
  124. }
  125. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg, "rsHeight", rs.Height)
  126. switch chId {
  127. case StateChannel:
  128. switch msg := msg.(type) {
  129. case *NewRoundStepMessage:
  130. ps.ApplyNewRoundStepMessage(msg, rs)
  131. case *CommitStepMessage:
  132. ps.ApplyCommitStepMessage(msg)
  133. case *HasVoteMessage:
  134. ps.ApplyHasVoteMessage(msg)
  135. default:
  136. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  137. }
  138. case DataChannel:
  139. if conR.fastSync {
  140. log.Warn("Ignoring message received during fastSync", "msg", msg)
  141. return
  142. }
  143. switch msg := msg.(type) {
  144. case *ProposalMessage:
  145. ps.SetHasProposal(msg.Proposal)
  146. err = conR.conS.SetProposal(msg.Proposal)
  147. case *ProposalPOLMessage:
  148. ps.ApplyProposalPOLMessage(msg)
  149. case *BlockPartMessage:
  150. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  151. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  152. default:
  153. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  154. }
  155. case VoteChannel:
  156. if conR.fastSync {
  157. log.Warn("Ignoring message received during fastSync", "msg", msg)
  158. return
  159. }
  160. switch msg := msg.(type) {
  161. case *VoteMessage:
  162. vote := msg.Vote
  163. var validators *sm.ValidatorSet
  164. if rs.Height == vote.Height {
  165. validators = rs.Validators
  166. } else if rs.Height == vote.Height+1 {
  167. if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
  168. return // Wrong height, not a LastCommit straggler commit.
  169. }
  170. validators = rs.LastValidators
  171. } else {
  172. return // Wrong height. Not necessarily a bad peer.
  173. }
  174. // We have vote/validators. Height may not be rs.Height
  175. address, _ := validators.GetByIndex(msg.ValidatorIndex)
  176. added, index, err := conR.conS.AddVote(address, vote, peer.Key)
  177. if err != nil {
  178. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  179. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  180. log.Warn("Found conflicting vote. Publish evidence")
  181. evidenceTx := &types.DupeoutTx{
  182. Address: address,
  183. VoteA: *errDupe.VoteA,
  184. VoteB: *errDupe.VoteB,
  185. }
  186. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  187. } else {
  188. // Probably an invalid signature. Bad peer.
  189. log.Warn("Error attempting to add vote", "error", err)
  190. // TODO: punish peer
  191. }
  192. }
  193. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  194. ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
  195. ps.SetHasVote(vote, index)
  196. if added {
  197. // If rs.Height == vote.Height && rs.Round < vote.Round,
  198. // the peer is sending us CatchupCommit precommits.
  199. // We could make note of this and help filter in broadcastHasVoteMessage().
  200. conR.broadcastHasVoteMessage(vote, index)
  201. }
  202. default:
  203. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  204. }
  205. default:
  206. log.Warn(Fmt("Unknown channel %X", chId))
  207. }
  208. if err != nil {
  209. log.Warn("Error in Receive()", "error", err)
  210. }
  211. }
  212. // Broadcasts HasVoteMessage to peers that care.
  213. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  214. msg := &HasVoteMessage{
  215. Height: vote.Height,
  216. Round: vote.Round,
  217. Type: vote.Type,
  218. Index: index,
  219. }
  220. conR.Switch.Broadcast(StateChannel, msg)
  221. /*
  222. // TODO: Make this broadcast more selective.
  223. for _, peer := range conR.Switch.Peers().List() {
  224. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  225. prs := ps.GetRoundState()
  226. if prs.Height == vote.Height {
  227. // TODO: Also filter on round?
  228. peer.TrySend(StateChannel, msg)
  229. } else {
  230. // Height doesn't match
  231. // TODO: check a field, maybe CatchupCommitRound?
  232. // TODO: But that requires changing the struct field comment.
  233. }
  234. }
  235. */
  236. }
  237. // Sets our private validator account for signing votes.
  238. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  239. conR.conS.SetPrivValidator(priv)
  240. }
  241. // implements events.Eventable
  242. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  243. conR.evsw = evsw
  244. conR.conS.SetFireable(evsw)
  245. }
  246. //--------------------------------------
  247. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  248. nrsMsg = &NewRoundStepMessage{
  249. Height: rs.Height,
  250. Round: rs.Round,
  251. Step: rs.Step,
  252. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  253. LastCommitRound: rs.LastCommit.Round(),
  254. }
  255. if rs.Step == RoundStepCommit {
  256. csMsg = &CommitStepMessage{
  257. Height: rs.Height,
  258. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  259. BlockParts: rs.ProposalBlockParts.BitArray(),
  260. }
  261. }
  262. return
  263. }
  264. // Listens for changes to the ConsensusState.Step by pulling
  265. // on conR.conS.NewStepCh().
  266. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  267. for {
  268. // Get RoundState with new Step or quit.
  269. var rs *RoundState
  270. select {
  271. case rs = <-conR.conS.NewStepCh():
  272. case <-conR.Quit:
  273. return
  274. }
  275. nrsMsg, csMsg := makeRoundStepMessages(rs)
  276. if nrsMsg != nil {
  277. conR.Switch.Broadcast(StateChannel, nrsMsg)
  278. }
  279. if csMsg != nil {
  280. conR.Switch.Broadcast(StateChannel, csMsg)
  281. }
  282. }
  283. }
  284. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  285. rs := conR.conS.GetRoundState()
  286. nrsMsg, csMsg := makeRoundStepMessages(rs)
  287. if nrsMsg != nil {
  288. peer.Send(StateChannel, nrsMsg)
  289. }
  290. if csMsg != nil {
  291. peer.Send(StateChannel, csMsg)
  292. }
  293. }
  294. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  295. log := log.New("peer", peer.Key)
  296. OUTER_LOOP:
  297. for {
  298. // Manage disconnects from self or peer.
  299. if !peer.IsRunning() || !conR.IsRunning() {
  300. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  301. return
  302. }
  303. rs := conR.conS.GetRoundState()
  304. prs := ps.GetRoundState()
  305. // Send proposal Block parts?
  306. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  307. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  308. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  309. part := rs.ProposalBlockParts.GetPart(index)
  310. msg := &BlockPartMessage{
  311. Height: rs.Height, // This tells peer that this part applies to us.
  312. Round: rs.Round, // This tells peer that this part applies to us.
  313. Part: part,
  314. }
  315. peer.Send(DataChannel, msg)
  316. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  317. continue OUTER_LOOP
  318. }
  319. }
  320. // If the peer is on a previous height, help catch up.
  321. if (0 < prs.Height) && (prs.Height < rs.Height) {
  322. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  323. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  324. // Ensure that the peer's PartSetHeader is correct
  325. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  326. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  327. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  328. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  329. time.Sleep(peerGossipSleepDuration)
  330. continue OUTER_LOOP
  331. }
  332. // Load the part
  333. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  334. if part == nil {
  335. log.Warn("Could not load part", "index", index,
  336. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  337. time.Sleep(peerGossipSleepDuration)
  338. continue OUTER_LOOP
  339. }
  340. // Send the part
  341. msg := &BlockPartMessage{
  342. Height: prs.Height, // Not our height, so it doesn't matter.
  343. Round: prs.Round, // Not our height, so it doesn't matter.
  344. Part: part,
  345. }
  346. peer.Send(DataChannel, msg)
  347. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  348. continue OUTER_LOOP
  349. } else {
  350. //log.Info("No parts to send in catch-up, sleeping")
  351. time.Sleep(peerGossipSleepDuration)
  352. continue OUTER_LOOP
  353. }
  354. }
  355. // If height and round don't match, sleep.
  356. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  357. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  358. time.Sleep(peerGossipSleepDuration)
  359. continue OUTER_LOOP
  360. }
  361. // By here, height and round match.
  362. // Proposal block parts were already matched and sent if any were wanted.
  363. // (These can match on hash so the round doesn't matter)
  364. // Now consider sending other things, like the Proposal itself.
  365. // Send Proposal && ProposalPOL BitArray?
  366. if rs.Proposal != nil && !prs.Proposal {
  367. // Proposal
  368. {
  369. msg := &ProposalMessage{Proposal: rs.Proposal}
  370. peer.Send(DataChannel, msg)
  371. ps.SetHasProposal(rs.Proposal)
  372. }
  373. // ProposalPOL.
  374. // Peer must receive ProposalMessage first.
  375. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  376. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  377. if 0 <= rs.Proposal.POLRound {
  378. msg := &ProposalPOLMessage{
  379. Height: rs.Height,
  380. ProposalPOLRound: rs.Proposal.POLRound,
  381. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  382. }
  383. peer.Send(DataChannel, msg)
  384. }
  385. continue OUTER_LOOP
  386. }
  387. // Nothing to do. Sleep.
  388. time.Sleep(peerGossipSleepDuration)
  389. continue OUTER_LOOP
  390. }
  391. }
  392. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  393. log := log.New("peer", peer.Key)
  394. // Simple hack to throttle logs upon sleep.
  395. var sleeping = 0
  396. OUTER_LOOP:
  397. for {
  398. // Manage disconnects from self or peer.
  399. if !peer.IsRunning() || !conR.IsRunning() {
  400. log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  401. return
  402. }
  403. rs := conR.conS.GetRoundState()
  404. prs := ps.GetRoundState()
  405. switch sleeping {
  406. case 1: // First sleep
  407. sleeping = 2
  408. case 2: // No more sleep
  409. sleeping = 0
  410. }
  411. log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  412. "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  413. // If height matches, then send LastCommit, Prevotes, Precommits.
  414. if rs.Height == prs.Height {
  415. // If there are lastCommits to send...
  416. if prs.Step == RoundStepNewHeight {
  417. if ps.PickSendVote(rs.LastCommit) {
  418. log.Info("Picked rs.LastCommit to send")
  419. continue OUTER_LOOP
  420. }
  421. }
  422. // If there are prevotes to send...
  423. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  424. if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
  425. log.Info("Picked rs.Prevotes(rs.Round) to send")
  426. continue OUTER_LOOP
  427. }
  428. }
  429. // If there are precommits to send...
  430. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  431. if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
  432. log.Info("Picked rs.Precommits(rs.Round) to send")
  433. continue OUTER_LOOP
  434. }
  435. }
  436. // If there are prevotes to send for the last round...
  437. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
  438. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  439. log.Info("Picked rs.Prevotes(prs.Round) to send")
  440. continue OUTER_LOOP
  441. }
  442. }
  443. // If there are precommits to send for the last round...
  444. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
  445. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  446. log.Info("Picked rs.Precommits(prs.Round) to send")
  447. continue OUTER_LOOP
  448. }
  449. }
  450. // If there are POLPrevotes to send...
  451. if 0 <= prs.ProposalPOLRound {
  452. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  453. if ps.PickSendVote(polPrevotes) {
  454. log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  455. continue OUTER_LOOP
  456. }
  457. }
  458. }
  459. }
  460. // Special catchup logic.
  461. // If peer is lagging by height 1, send LastCommit.
  462. if prs.Height != 0 && rs.Height == prs.Height+1 {
  463. if ps.PickSendVote(rs.LastCommit) {
  464. log.Info("Picked rs.LastCommit to send")
  465. continue OUTER_LOOP
  466. }
  467. }
  468. // Catchup logic
  469. // If peer is lagging by more than 1, send Validation.
  470. if prs.Height != 0 && rs.Height >= prs.Height+2 {
  471. // Load the block validation for prs.Height,
  472. // which contains precommit signatures for prs.Height.
  473. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  474. log.Info("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  475. if ps.PickSendVote(validation) {
  476. log.Info("Picked Catchup validation to send")
  477. continue OUTER_LOOP
  478. }
  479. }
  480. if sleeping == 0 {
  481. // We sent nothing. Sleep...
  482. sleeping = 1
  483. log.Info("No votes to send, sleeping", "peer", peer,
  484. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  485. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  486. } else if sleeping == 2 {
  487. // Continued sleep...
  488. sleeping = 1
  489. }
  490. time.Sleep(peerGossipSleepDuration)
  491. continue OUTER_LOOP
  492. }
  493. }
  494. //-----------------------------------------------------------------------------
  495. // Read only when returned by PeerState.GetRoundState().
  496. type PeerRoundState struct {
  497. Height int // Height peer is at
  498. Round int // Round peer is at
  499. Step RoundStepType // Step peer is at
  500. StartTime time.Time // Estimated start of round 0 at this height
  501. Proposal bool // True if peer has proposal for this round
  502. ProposalBlockPartsHeader types.PartSetHeader //
  503. ProposalBlockParts *BitArray //
  504. ProposalPOLRound int // -1 if none
  505. ProposalPOL *BitArray // nil until ProposalPOLMessage received.
  506. Prevotes *BitArray // All votes peer has for this round
  507. Precommits *BitArray // All precommits peer has for this round
  508. LastCommitRound int // Round of commit for last height.
  509. LastCommit *BitArray // All commit precommits of commit for last height.
  510. CatchupCommitRound int // Round that we believe commit round is.
  511. CatchupCommit *BitArray // All commit precommits peer has for this height
  512. }
  513. //-----------------------------------------------------------------------------
  514. var (
  515. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  516. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  517. )
  518. type PeerState struct {
  519. Peer *p2p.Peer
  520. mtx sync.Mutex
  521. PeerRoundState
  522. }
  523. func NewPeerState(peer *p2p.Peer) *PeerState {
  524. return &PeerState{Peer: peer}
  525. }
  526. // Returns an atomic snapshot of the PeerRoundState.
  527. // There's no point in mutating it since it won't change PeerState.
  528. func (ps *PeerState) GetRoundState() *PeerRoundState {
  529. ps.mtx.Lock()
  530. defer ps.mtx.Unlock()
  531. prs := ps.PeerRoundState // copy
  532. return &prs
  533. }
  534. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  535. ps.mtx.Lock()
  536. defer ps.mtx.Unlock()
  537. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  538. return
  539. }
  540. if ps.Proposal {
  541. return
  542. }
  543. ps.Proposal = true
  544. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  545. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  546. ps.ProposalPOLRound = proposal.POLRound
  547. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  548. }
  549. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  550. ps.mtx.Lock()
  551. defer ps.mtx.Unlock()
  552. if ps.Height != height || ps.Round != round {
  553. return
  554. }
  555. ps.ProposalBlockParts.SetIndex(index, true)
  556. }
  557. // Convenience function to send vote to peer.
  558. // Returns true if vote was sent.
  559. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  560. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  561. msg := &VoteMessage{index, vote}
  562. ps.Peer.Send(VoteChannel, msg)
  563. return true
  564. }
  565. return false
  566. }
  567. // votes: Must be the correct Size() for the Height().
  568. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  569. ps.mtx.Lock()
  570. defer ps.mtx.Unlock()
  571. if votes.Size() == 0 {
  572. return 0, nil, false
  573. }
  574. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  575. // Lazily set data using 'votes'.
  576. if votes.IsCommit() {
  577. ps.ensureCatchupCommitRound(height, round, size)
  578. }
  579. ps.ensureVoteBitArrays(height, size)
  580. psVotes := ps.getVoteBitArray(height, round, type_)
  581. if psVotes == nil {
  582. return 0, nil, false // Not something worth sending
  583. }
  584. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  585. ps.setHasVote(height, round, type_, index)
  586. return index, votes.GetByIndex(index), true
  587. }
  588. return 0, nil, false
  589. }
  590. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  591. if ps.Height == height {
  592. if ps.Round == round {
  593. switch type_ {
  594. case types.VoteTypePrevote:
  595. return ps.Prevotes
  596. case types.VoteTypePrecommit:
  597. return ps.Precommits
  598. default:
  599. PanicSanity(Fmt("Unexpected vote type %X", type_))
  600. }
  601. }
  602. if ps.CatchupCommitRound == round {
  603. switch type_ {
  604. case types.VoteTypePrevote:
  605. return nil
  606. case types.VoteTypePrecommit:
  607. return ps.CatchupCommit
  608. default:
  609. PanicSanity(Fmt("Unexpected vote type %X", type_))
  610. }
  611. }
  612. return nil
  613. }
  614. if ps.Height == height+1 {
  615. if ps.LastCommitRound == round {
  616. switch type_ {
  617. case types.VoteTypePrevote:
  618. return nil
  619. case types.VoteTypePrecommit:
  620. return ps.LastCommit
  621. default:
  622. PanicSanity(Fmt("Unexpected vote type %X", type_))
  623. }
  624. }
  625. return nil
  626. }
  627. return nil
  628. }
  629. // NOTE: 'round' is what we know to be the commit round for height.
  630. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  631. if ps.Height != height {
  632. return
  633. }
  634. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  635. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  636. }
  637. if ps.CatchupCommitRound == round {
  638. return // Nothing to do!
  639. }
  640. ps.CatchupCommitRound = round
  641. if round == ps.Round {
  642. ps.CatchupCommit = ps.Precommits
  643. } else {
  644. ps.CatchupCommit = NewBitArray(numValidators)
  645. }
  646. }
  647. // NOTE: It's important to make sure that numValidators actually matches
  648. // what the node sees as the number of validators for height.
  649. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  650. ps.mtx.Lock()
  651. defer ps.mtx.Unlock()
  652. ps.ensureVoteBitArrays(height, numValidators)
  653. }
  654. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  655. if ps.Height == height {
  656. if ps.Prevotes == nil {
  657. ps.Prevotes = NewBitArray(numValidators)
  658. }
  659. if ps.Precommits == nil {
  660. ps.Precommits = NewBitArray(numValidators)
  661. }
  662. if ps.CatchupCommit == nil {
  663. ps.CatchupCommit = NewBitArray(numValidators)
  664. }
  665. if ps.ProposalPOL == nil {
  666. ps.ProposalPOL = NewBitArray(numValidators)
  667. }
  668. } else if ps.Height == height+1 {
  669. if ps.LastCommit == nil {
  670. ps.LastCommit = NewBitArray(numValidators)
  671. }
  672. }
  673. }
  674. func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  675. ps.mtx.Lock()
  676. defer ps.mtx.Unlock()
  677. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  678. }
  679. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  680. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  681. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  682. PanicSanity("Invalid vote type")
  683. }
  684. if ps.Height == height {
  685. if ps.Round == round {
  686. switch type_ {
  687. case types.VoteTypePrevote:
  688. ps.Prevotes.SetIndex(index, true)
  689. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  690. case types.VoteTypePrecommit:
  691. ps.Precommits.SetIndex(index, true)
  692. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  693. }
  694. } else if ps.CatchupCommitRound == round {
  695. switch type_ {
  696. case types.VoteTypePrevote:
  697. case types.VoteTypePrecommit:
  698. ps.CatchupCommit.SetIndex(index, true)
  699. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  700. }
  701. } else if ps.ProposalPOLRound == round {
  702. switch type_ {
  703. case types.VoteTypePrevote:
  704. ps.ProposalPOL.SetIndex(index, true)
  705. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  706. case types.VoteTypePrecommit:
  707. }
  708. }
  709. } else if ps.Height == height+1 {
  710. if ps.LastCommitRound == round {
  711. switch type_ {
  712. case types.VoteTypePrevote:
  713. case types.VoteTypePrecommit:
  714. ps.LastCommit.SetIndex(index, true)
  715. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  716. }
  717. }
  718. } else {
  719. // Does not apply.
  720. }
  721. }
  722. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  723. ps.mtx.Lock()
  724. defer ps.mtx.Unlock()
  725. // Ignore duplicate messages.
  726. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  727. return
  728. }
  729. // Just remember these values.
  730. psHeight := ps.Height
  731. psRound := ps.Round
  732. //psStep := ps.Step
  733. psCatchupCommitRound := ps.CatchupCommitRound
  734. psCatchupCommit := ps.CatchupCommit
  735. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  736. ps.Height = msg.Height
  737. ps.Round = msg.Round
  738. ps.Step = msg.Step
  739. ps.StartTime = startTime
  740. if psHeight != msg.Height || psRound != msg.Round {
  741. ps.Proposal = false
  742. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  743. ps.ProposalBlockParts = nil
  744. ps.ProposalPOLRound = -1
  745. ps.ProposalPOL = nil
  746. // We'll update the BitArray capacity later.
  747. ps.Prevotes = nil
  748. ps.Precommits = nil
  749. }
  750. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  751. // Peer caught up to CatchupCommitRound.
  752. // Preserve psCatchupCommit!
  753. // NOTE: We prefer to use prs.Precommits if
  754. // pr.Round matches pr.CatchupCommitRound.
  755. ps.Precommits = psCatchupCommit
  756. }
  757. if psHeight != msg.Height {
  758. // Shift Precommits to LastCommit.
  759. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  760. ps.LastCommitRound = msg.LastCommitRound
  761. ps.LastCommit = ps.Precommits
  762. } else {
  763. ps.LastCommitRound = msg.LastCommitRound
  764. ps.LastCommit = nil
  765. }
  766. // We'll update the BitArray capacity later.
  767. ps.CatchupCommitRound = -1
  768. ps.CatchupCommit = nil
  769. }
  770. }
  771. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  772. ps.mtx.Lock()
  773. defer ps.mtx.Unlock()
  774. if ps.Height != msg.Height {
  775. return
  776. }
  777. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  778. ps.ProposalBlockParts = msg.BlockParts
  779. }
  780. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  781. ps.mtx.Lock()
  782. defer ps.mtx.Unlock()
  783. if ps.Height != msg.Height {
  784. return
  785. }
  786. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  787. }
  788. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  789. ps.mtx.Lock()
  790. defer ps.mtx.Unlock()
  791. if ps.Height != msg.Height {
  792. return
  793. }
  794. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  795. return
  796. }
  797. // TODO: Merge onto existing ps.ProposalPOL?
  798. // We might have sent some prevotes in the meantime.
  799. ps.ProposalPOL = msg.ProposalPOL
  800. }
  801. //-----------------------------------------------------------------------------
  802. // Messages
  803. const (
  804. msgTypeNewRoundStep = byte(0x01)
  805. msgTypeCommitStep = byte(0x02)
  806. msgTypeProposal = byte(0x11)
  807. msgTypeProposalPOL = byte(0x12)
  808. msgTypeBlockPart = byte(0x13) // both block & POL
  809. msgTypeVote = byte(0x14)
  810. msgTypeHasVote = byte(0x15)
  811. )
  812. type ConsensusMessage interface{}
  813. var _ = binary.RegisterInterface(
  814. struct{ ConsensusMessage }{},
  815. binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  816. binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  817. binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  818. binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  819. binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  820. binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  821. binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  822. )
  823. // TODO: check for unnecessary extra bytes at the end.
  824. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  825. msgType = bz[0]
  826. n := new(int64)
  827. r := bytes.NewReader(bz)
  828. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  829. return
  830. }
  831. //-------------------------------------
  832. // For every height/round/step transition
  833. type NewRoundStepMessage struct {
  834. Height int
  835. Round int
  836. Step RoundStepType
  837. SecondsSinceStartTime int
  838. LastCommitRound int
  839. }
  840. func (m *NewRoundStepMessage) String() string {
  841. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  842. m.Height, m.Round, m.Step, m.LastCommitRound)
  843. }
  844. //-------------------------------------
  845. type CommitStepMessage struct {
  846. Height int
  847. BlockPartsHeader types.PartSetHeader
  848. BlockParts *BitArray
  849. }
  850. func (m *CommitStepMessage) String() string {
  851. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  852. }
  853. //-------------------------------------
  854. type ProposalMessage struct {
  855. Proposal *Proposal
  856. }
  857. func (m *ProposalMessage) String() string {
  858. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  859. }
  860. //-------------------------------------
  861. type ProposalPOLMessage struct {
  862. Height int
  863. ProposalPOLRound int
  864. ProposalPOL *BitArray
  865. }
  866. func (m *ProposalPOLMessage) String() string {
  867. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  868. }
  869. //-------------------------------------
  870. type BlockPartMessage struct {
  871. Height int
  872. Round int
  873. Part *types.Part
  874. }
  875. func (m *BlockPartMessage) String() string {
  876. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  877. }
  878. //-------------------------------------
  879. type VoteMessage struct {
  880. ValidatorIndex int
  881. Vote *types.Vote
  882. }
  883. func (m *VoteMessage) String() string {
  884. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  885. }
  886. //-------------------------------------
  887. type HasVoteMessage struct {
  888. Height int
  889. Round int
  890. Type byte
  891. Index int
  892. }
  893. func (m *HasVoteMessage) String() string {
  894. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  895. }