You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1006 lines
29 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/go-events"
  11. "github.com/tendermint/go-p2p"
  12. "github.com/tendermint/go-wire"
  13. bc "github.com/tendermint/tendermint/blockchain"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. const (
  18. StateChannel = byte(0x20)
  19. DataChannel = byte(0x21)
  20. VoteChannel = byte(0x22)
  21. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  22. maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.
  23. )
  24. //-----------------------------------------------------------------------------
  25. type ConsensusReactor struct {
  26. p2p.BaseReactor // QuitService + p2p.Switch
  27. blockStore *bc.BlockStore
  28. conS *ConsensusState
  29. fastSync bool
  30. evsw *events.EventSwitch
  31. }
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. conS: consensusState,
  36. fastSync: fastSync,
  37. }
  38. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  39. return conR
  40. }
  41. func (conR *ConsensusReactor) OnStart() error {
  42. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  43. conR.BaseReactor.OnStart()
  44. // callbacks for broadcasting new steps and votes to peers
  45. // upon their respective events (ie. uses evsw)
  46. conR.registerEventCallbacks()
  47. if !conR.fastSync {
  48. _, err := conR.conS.Start()
  49. if err != nil {
  50. return err
  51. }
  52. }
  53. return nil
  54. }
  55. func (conR *ConsensusReactor) OnStop() {
  56. conR.BaseReactor.OnStop()
  57. conR.conS.Stop()
  58. }
  59. // Switch from the fast_sync to the consensus:
  60. // reset the state, turn off fast_sync, start the consensus-state-machine
  61. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  62. log.Notice("SwitchToConsensus")
  63. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  64. // broadcast a NewRoundStepMessage.
  65. conR.conS.updateToState(state)
  66. conR.fastSync = false
  67. conR.conS.Start()
  68. }
  69. // Implements Reactor
  70. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  71. // TODO optimize
  72. return []*p2p.ChannelDescriptor{
  73. &p2p.ChannelDescriptor{
  74. ID: StateChannel,
  75. Priority: 5,
  76. SendQueueCapacity: 100,
  77. },
  78. &p2p.ChannelDescriptor{
  79. ID: DataChannel,
  80. Priority: 2,
  81. SendQueueCapacity: 50,
  82. RecvBufferCapacity: 50 * 4096,
  83. },
  84. &p2p.ChannelDescriptor{
  85. ID: VoteChannel,
  86. Priority: 5,
  87. SendQueueCapacity: 100,
  88. RecvBufferCapacity: 100 * 100,
  89. },
  90. }
  91. }
  92. // Implements Reactor
  93. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  94. if !conR.IsRunning() {
  95. return
  96. }
  97. // Create peerState for peer
  98. peerState := NewPeerState(peer)
  99. peer.Data.Set(types.PeerStateKey, peerState)
  100. // Begin gossip routines for this peer.
  101. go conR.gossipDataRoutine(peer, peerState)
  102. go conR.gossipVotesRoutine(peer, peerState)
  103. // Send our state to peer.
  104. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  105. if !conR.fastSync {
  106. conR.sendNewRoundStepMessage(peer)
  107. }
  108. }
  109. // Implements Reactor
  110. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  111. if !conR.IsRunning() {
  112. return
  113. }
  114. // TODO
  115. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  116. }
  117. // Implements Reactor
  118. // NOTE: We process these messages even when we're fast_syncing.
  119. // Messages affect either a peer state or the consensus state.
  120. // Peer state updates can happen in parallel, but processing of
  121. // proposals, block parts, and votes are ordered by the receiveRoutine
  122. func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  123. if !conR.IsRunning() {
  124. log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
  125. return
  126. }
  127. _, msg, err := DecodeMessage(msgBytes)
  128. if err != nil {
  129. log.Warn("Error decoding message", "src", src, "chId", chID, "msg", msg, "error", err, "bytes", msgBytes)
  130. // TODO punish peer?
  131. return
  132. }
  133. log.Info("Receive", "src", src, "chId", chID, "msg", msg)
  134. // Get peer states
  135. ps := src.Data.Get(types.PeerStateKey).(*PeerState)
  136. switch chID {
  137. case StateChannel:
  138. switch msg := msg.(type) {
  139. case *NewRoundStepMessage:
  140. ps.ApplyNewRoundStepMessage(msg)
  141. case *CommitStepMessage:
  142. ps.ApplyCommitStepMessage(msg)
  143. case *HasVoteMessage:
  144. ps.ApplyHasVoteMessage(msg)
  145. default:
  146. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  147. }
  148. case DataChannel:
  149. if conR.fastSync {
  150. log.Warn("Ignoring message received during fastSync", "msg", msg)
  151. return
  152. }
  153. switch msg := msg.(type) {
  154. case *ProposalMessage:
  155. ps.SetHasProposal(msg.Proposal)
  156. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  157. case *ProposalPOLMessage:
  158. ps.ApplyProposalPOLMessage(msg)
  159. case *BlockPartMessage:
  160. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  161. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  162. default:
  163. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  164. }
  165. case VoteChannel:
  166. if conR.fastSync {
  167. log.Warn("Ignoring message received during fastSync", "msg", msg)
  168. return
  169. }
  170. switch msg := msg.(type) {
  171. case *VoteMessage:
  172. cs := conR.conS
  173. cs.mtx.Lock()
  174. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  175. cs.mtx.Unlock()
  176. ps.EnsureVoteBitArrays(height, valSize)
  177. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  178. ps.SetHasVote(msg.Vote, msg.ValidatorIndex)
  179. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  180. default:
  181. // don't punish (leave room for soft upgrades)
  182. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  183. }
  184. default:
  185. log.Warn(Fmt("Unknown chId %X", chID))
  186. }
  187. if err != nil {
  188. log.Warn("Error in Receive()", "error", err)
  189. }
  190. }
  191. // Sets our private validator account for signing votes.
  192. func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
  193. conR.conS.SetPrivValidator(priv)
  194. }
  195. // implements events.Eventable
  196. func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
  197. conR.evsw = evsw
  198. conR.conS.SetEventSwitch(evsw)
  199. }
  200. //--------------------------------------
  201. // Listens for new steps and votes,
  202. // broadcasting the result to peers
  203. func (conR *ConsensusReactor) registerEventCallbacks() {
  204. conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) {
  205. rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
  206. conR.broadcastNewRoundStep(rs)
  207. })
  208. conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
  209. edv := data.(types.EventDataVote)
  210. conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
  211. })
  212. }
  213. func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
  214. nrsMsg, csMsg := makeRoundStepMessages(rs)
  215. if nrsMsg != nil {
  216. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  217. }
  218. if csMsg != nil {
  219. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
  220. }
  221. }
  222. // Broadcasts HasVoteMessage to peers that care.
  223. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  224. msg := &HasVoteMessage{
  225. Height: vote.Height,
  226. Round: vote.Round,
  227. Type: vote.Type,
  228. Index: index,
  229. }
  230. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
  231. /*
  232. // TODO: Make this broadcast more selective.
  233. for _, peer := range conR.Switch.Peers().List() {
  234. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  235. prs := ps.GetRoundState()
  236. if prs.Height == vote.Height {
  237. // TODO: Also filter on round?
  238. peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
  239. } else {
  240. // Height doesn't match
  241. // TODO: check a field, maybe CatchupCommitRound?
  242. // TODO: But that requires changing the struct field comment.
  243. }
  244. }
  245. */
  246. }
  247. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  248. nrsMsg = &NewRoundStepMessage{
  249. Height: rs.Height,
  250. Round: rs.Round,
  251. Step: rs.Step,
  252. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  253. LastCommitRound: rs.LastCommit.Round(),
  254. }
  255. if rs.Step == RoundStepCommit {
  256. csMsg = &CommitStepMessage{
  257. Height: rs.Height,
  258. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  259. BlockParts: rs.ProposalBlockParts.BitArray(),
  260. }
  261. }
  262. return
  263. }
  264. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  265. rs := conR.conS.GetRoundState()
  266. nrsMsg, csMsg := makeRoundStepMessages(rs)
  267. if nrsMsg != nil {
  268. peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  269. }
  270. if csMsg != nil {
  271. peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
  272. }
  273. }
  274. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  275. log := log.New("peer", peer)
  276. OUTER_LOOP:
  277. for {
  278. // Manage disconnects from self or peer.
  279. if !peer.IsRunning() || !conR.IsRunning() {
  280. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  281. return
  282. }
  283. rs := conR.conS.GetRoundState()
  284. prs := ps.GetRoundState()
  285. // Send proposal Block parts?
  286. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  287. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  288. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  289. part := rs.ProposalBlockParts.GetPart(index)
  290. msg := &BlockPartMessage{
  291. Height: rs.Height, // This tells peer that this part applies to us.
  292. Round: rs.Round, // This tells peer that this part applies to us.
  293. Part: part,
  294. }
  295. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  296. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  297. continue OUTER_LOOP
  298. }
  299. }
  300. // If the peer is on a previous height, help catch up.
  301. if (0 < prs.Height) && (prs.Height < rs.Height) {
  302. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  303. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  304. // Ensure that the peer's PartSetHeader is correct
  305. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  306. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  307. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  308. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  309. time.Sleep(peerGossipSleepDuration)
  310. continue OUTER_LOOP
  311. }
  312. // Load the part
  313. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  314. if part == nil {
  315. log.Warn("Could not load part", "index", index,
  316. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  317. time.Sleep(peerGossipSleepDuration)
  318. continue OUTER_LOOP
  319. }
  320. // Send the part
  321. msg := &BlockPartMessage{
  322. Height: prs.Height, // Not our height, so it doesn't matter.
  323. Round: prs.Round, // Not our height, so it doesn't matter.
  324. Part: part,
  325. }
  326. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  327. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  328. continue OUTER_LOOP
  329. } else {
  330. //log.Info("No parts to send in catch-up, sleeping")
  331. time.Sleep(peerGossipSleepDuration)
  332. continue OUTER_LOOP
  333. }
  334. }
  335. // If height and round don't match, sleep.
  336. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  337. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  338. time.Sleep(peerGossipSleepDuration)
  339. continue OUTER_LOOP
  340. }
  341. // By here, height and round match.
  342. // Proposal block parts were already matched and sent if any were wanted.
  343. // (These can match on hash so the round doesn't matter)
  344. // Now consider sending other things, like the Proposal itself.
  345. // Send Proposal && ProposalPOL BitArray?
  346. if rs.Proposal != nil && !prs.Proposal {
  347. // Proposal
  348. {
  349. msg := &ProposalMessage{Proposal: rs.Proposal}
  350. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  351. ps.SetHasProposal(rs.Proposal)
  352. }
  353. // ProposalPOL.
  354. // Peer must receive ProposalMessage first.
  355. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  356. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  357. if 0 <= rs.Proposal.POLRound {
  358. msg := &ProposalPOLMessage{
  359. Height: rs.Height,
  360. ProposalPOLRound: rs.Proposal.POLRound,
  361. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  362. }
  363. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  364. }
  365. continue OUTER_LOOP
  366. }
  367. // Nothing to do. Sleep.
  368. time.Sleep(peerGossipSleepDuration)
  369. continue OUTER_LOOP
  370. }
  371. }
  372. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  373. log := log.New("peer", peer)
  374. // Simple hack to throttle logs upon sleep.
  375. var sleeping = 0
  376. OUTER_LOOP:
  377. for {
  378. // Manage disconnects from self or peer.
  379. if !peer.IsRunning() || !conR.IsRunning() {
  380. log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  381. return
  382. }
  383. rs := conR.conS.GetRoundState()
  384. prs := ps.GetRoundState()
  385. switch sleeping {
  386. case 1: // First sleep
  387. sleeping = 2
  388. case 2: // No more sleep
  389. sleeping = 0
  390. }
  391. //log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  392. // "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  393. // If height matches, then send LastCommit, Prevotes, Precommits.
  394. if rs.Height == prs.Height {
  395. // If there are lastCommits to send...
  396. if prs.Step == RoundStepNewHeight {
  397. if ps.PickSendVote(rs.LastCommit) {
  398. log.Info("Picked rs.LastCommit to send")
  399. continue OUTER_LOOP
  400. }
  401. }
  402. // If there are prevotes to send...
  403. if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
  404. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  405. log.Info("Picked rs.Prevotes(prs.Round) to send")
  406. continue OUTER_LOOP
  407. }
  408. }
  409. // If there are precommits to send...
  410. if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
  411. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  412. log.Info("Picked rs.Precommits(prs.Round) to send")
  413. continue OUTER_LOOP
  414. }
  415. }
  416. // If there are POLPrevotes to send...
  417. if prs.ProposalPOLRound != -1 {
  418. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  419. if ps.PickSendVote(polPrevotes) {
  420. log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  421. continue OUTER_LOOP
  422. }
  423. }
  424. }
  425. }
  426. // Special catchup logic.
  427. // If peer is lagging by height 1, send LastCommit.
  428. if prs.Height != 0 && rs.Height == prs.Height+1 {
  429. if ps.PickSendVote(rs.LastCommit) {
  430. log.Info("Picked rs.LastCommit to send")
  431. continue OUTER_LOOP
  432. }
  433. }
  434. // Catchup logic
  435. // If peer is lagging by more than 1, send Validation.
  436. if prs.Height != 0 && rs.Height >= prs.Height+2 {
  437. // Load the block validation for prs.Height,
  438. // which contains precommit signatures for prs.Height.
  439. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  440. log.Info("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  441. if ps.PickSendVote(validation) {
  442. log.Info("Picked Catchup validation to send")
  443. continue OUTER_LOOP
  444. }
  445. }
  446. if sleeping == 0 {
  447. // We sent nothing. Sleep...
  448. sleeping = 1
  449. log.Info("No votes to send, sleeping", "peer", peer,
  450. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  451. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  452. } else if sleeping == 2 {
  453. // Continued sleep...
  454. sleeping = 1
  455. }
  456. time.Sleep(peerGossipSleepDuration)
  457. continue OUTER_LOOP
  458. }
  459. }
  460. //-----------------------------------------------------------------------------
  461. // Read only when returned by PeerState.GetRoundState().
  462. type PeerRoundState struct {
  463. Height int // Height peer is at
  464. Round int // Round peer is at, -1 if unknown.
  465. Step RoundStepType // Step peer is at
  466. StartTime time.Time // Estimated start of round 0 at this height
  467. Proposal bool // True if peer has proposal for this round
  468. ProposalBlockPartsHeader types.PartSetHeader //
  469. ProposalBlockParts *BitArray //
  470. ProposalPOLRound int // Proposal's POL round. -1 if none.
  471. ProposalPOL *BitArray // nil until ProposalPOLMessage received.
  472. Prevotes *BitArray // All votes peer has for this round
  473. Precommits *BitArray // All precommits peer has for this round
  474. LastCommitRound int // Round of commit for last height. -1 if none.
  475. LastCommit *BitArray // All commit precommits of commit for last height.
  476. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none.
  477. CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound
  478. }
  479. //-----------------------------------------------------------------------------
  480. var (
  481. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  482. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  483. )
  484. type PeerState struct {
  485. Peer *p2p.Peer
  486. mtx sync.Mutex
  487. PeerRoundState
  488. }
  489. func NewPeerState(peer *p2p.Peer) *PeerState {
  490. return &PeerState{
  491. Peer: peer,
  492. PeerRoundState: PeerRoundState{
  493. Round: -1,
  494. ProposalPOLRound: -1,
  495. LastCommitRound: -1,
  496. CatchupCommitRound: -1,
  497. },
  498. }
  499. }
  500. // Returns an atomic snapshot of the PeerRoundState.
  501. // There's no point in mutating it since it won't change PeerState.
  502. func (ps *PeerState) GetRoundState() *PeerRoundState {
  503. ps.mtx.Lock()
  504. defer ps.mtx.Unlock()
  505. prs := ps.PeerRoundState // copy
  506. return &prs
  507. }
  508. // Returns an atomic snapshot of the PeerRoundState's height
  509. // used by the mempool to ensure peers are caught up before broadcasting new txs
  510. func (ps *PeerState) GetHeight() int {
  511. ps.mtx.Lock()
  512. defer ps.mtx.Unlock()
  513. return ps.PeerRoundState.Height
  514. }
  515. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  516. ps.mtx.Lock()
  517. defer ps.mtx.Unlock()
  518. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  519. return
  520. }
  521. if ps.Proposal {
  522. return
  523. }
  524. ps.Proposal = true
  525. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  526. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  527. ps.ProposalPOLRound = proposal.POLRound
  528. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  529. }
  530. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  531. ps.mtx.Lock()
  532. defer ps.mtx.Unlock()
  533. if ps.Height != height || ps.Round != round {
  534. return
  535. }
  536. ps.ProposalBlockParts.SetIndex(index, true)
  537. }
  538. // Convenience function to send vote to peer.
  539. // Returns true if vote was sent.
  540. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  541. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  542. msg := &VoteMessage{index, vote}
  543. ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
  544. return true
  545. }
  546. return false
  547. }
  548. // votes: Must be the correct Size() for the Height().
  549. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  550. ps.mtx.Lock()
  551. defer ps.mtx.Unlock()
  552. if votes.Size() == 0 {
  553. return 0, nil, false
  554. }
  555. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  556. // Lazily set data using 'votes'.
  557. if votes.IsCommit() {
  558. ps.ensureCatchupCommitRound(height, round, size)
  559. }
  560. ps.ensureVoteBitArrays(height, size)
  561. psVotes := ps.getVoteBitArray(height, round, type_)
  562. if psVotes == nil {
  563. return 0, nil, false // Not something worth sending
  564. }
  565. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  566. ps.setHasVote(height, round, type_, index)
  567. return index, votes.GetByIndex(index), true
  568. }
  569. return 0, nil, false
  570. }
  571. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  572. if ps.Height == height {
  573. if ps.Round == round {
  574. switch type_ {
  575. case types.VoteTypePrevote:
  576. return ps.Prevotes
  577. case types.VoteTypePrecommit:
  578. return ps.Precommits
  579. default:
  580. PanicSanity(Fmt("Unexpected vote type %X", type_))
  581. }
  582. }
  583. if ps.CatchupCommitRound == round {
  584. switch type_ {
  585. case types.VoteTypePrevote:
  586. return nil
  587. case types.VoteTypePrecommit:
  588. return ps.CatchupCommit
  589. default:
  590. PanicSanity(Fmt("Unexpected vote type %X", type_))
  591. }
  592. }
  593. return nil
  594. }
  595. if ps.Height == height+1 {
  596. if ps.LastCommitRound == round {
  597. switch type_ {
  598. case types.VoteTypePrevote:
  599. return nil
  600. case types.VoteTypePrecommit:
  601. return ps.LastCommit
  602. default:
  603. PanicSanity(Fmt("Unexpected vote type %X", type_))
  604. }
  605. }
  606. return nil
  607. }
  608. return nil
  609. }
  610. // 'round': A round for which we have a +2/3 commit.
  611. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  612. if ps.Height != height {
  613. return
  614. }
  615. /*
  616. NOTE: This is wrong, 'round' could change.
  617. e.g. if orig round is not the same as block LastValidation round.
  618. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  619. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  620. }
  621. */
  622. if ps.CatchupCommitRound == round {
  623. return // Nothing to do!
  624. }
  625. ps.CatchupCommitRound = round
  626. if round == ps.Round {
  627. ps.CatchupCommit = ps.Precommits
  628. } else {
  629. ps.CatchupCommit = NewBitArray(numValidators)
  630. }
  631. }
  632. // NOTE: It's important to make sure that numValidators actually matches
  633. // what the node sees as the number of validators for height.
  634. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  635. ps.mtx.Lock()
  636. defer ps.mtx.Unlock()
  637. ps.ensureVoteBitArrays(height, numValidators)
  638. }
  639. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  640. if ps.Height == height {
  641. if ps.Prevotes == nil {
  642. ps.Prevotes = NewBitArray(numValidators)
  643. }
  644. if ps.Precommits == nil {
  645. ps.Precommits = NewBitArray(numValidators)
  646. }
  647. if ps.CatchupCommit == nil {
  648. ps.CatchupCommit = NewBitArray(numValidators)
  649. }
  650. if ps.ProposalPOL == nil {
  651. ps.ProposalPOL = NewBitArray(numValidators)
  652. }
  653. } else if ps.Height == height+1 {
  654. if ps.LastCommit == nil {
  655. ps.LastCommit = NewBitArray(numValidators)
  656. }
  657. }
  658. }
  659. func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  660. ps.mtx.Lock()
  661. defer ps.mtx.Unlock()
  662. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  663. }
  664. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  665. log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
  666. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  667. PanicSanity("Invalid vote type")
  668. }
  669. if ps.Height == height {
  670. if ps.Round == round {
  671. switch type_ {
  672. case types.VoteTypePrevote:
  673. ps.Prevotes.SetIndex(index, true)
  674. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  675. case types.VoteTypePrecommit:
  676. ps.Precommits.SetIndex(index, true)
  677. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  678. }
  679. } else if ps.CatchupCommitRound == round {
  680. switch type_ {
  681. case types.VoteTypePrevote:
  682. case types.VoteTypePrecommit:
  683. ps.CatchupCommit.SetIndex(index, true)
  684. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  685. }
  686. } else if ps.ProposalPOLRound == round {
  687. switch type_ {
  688. case types.VoteTypePrevote:
  689. ps.ProposalPOL.SetIndex(index, true)
  690. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  691. case types.VoteTypePrecommit:
  692. }
  693. }
  694. } else if ps.Height == height+1 {
  695. if ps.LastCommitRound == round {
  696. switch type_ {
  697. case types.VoteTypePrevote:
  698. case types.VoteTypePrecommit:
  699. ps.LastCommit.SetIndex(index, true)
  700. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  701. }
  702. }
  703. } else {
  704. // Does not apply.
  705. }
  706. }
  707. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  708. ps.mtx.Lock()
  709. defer ps.mtx.Unlock()
  710. // Ignore duplicate messages.
  711. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  712. return
  713. }
  714. // Just remember these values.
  715. psHeight := ps.Height
  716. psRound := ps.Round
  717. //psStep := ps.Step
  718. psCatchupCommitRound := ps.CatchupCommitRound
  719. psCatchupCommit := ps.CatchupCommit
  720. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  721. ps.Height = msg.Height
  722. ps.Round = msg.Round
  723. ps.Step = msg.Step
  724. ps.StartTime = startTime
  725. if psHeight != msg.Height || psRound != msg.Round {
  726. ps.Proposal = false
  727. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  728. ps.ProposalBlockParts = nil
  729. ps.ProposalPOLRound = -1
  730. ps.ProposalPOL = nil
  731. // We'll update the BitArray capacity later.
  732. ps.Prevotes = nil
  733. ps.Precommits = nil
  734. }
  735. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  736. // Peer caught up to CatchupCommitRound.
  737. // Preserve psCatchupCommit!
  738. // NOTE: We prefer to use prs.Precommits if
  739. // pr.Round matches pr.CatchupCommitRound.
  740. ps.Precommits = psCatchupCommit
  741. }
  742. if psHeight != msg.Height {
  743. // Shift Precommits to LastCommit.
  744. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  745. ps.LastCommitRound = msg.LastCommitRound
  746. ps.LastCommit = ps.Precommits
  747. } else {
  748. ps.LastCommitRound = msg.LastCommitRound
  749. ps.LastCommit = nil
  750. }
  751. // We'll update the BitArray capacity later.
  752. ps.CatchupCommitRound = -1
  753. ps.CatchupCommit = nil
  754. }
  755. }
  756. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  757. ps.mtx.Lock()
  758. defer ps.mtx.Unlock()
  759. if ps.Height != msg.Height {
  760. return
  761. }
  762. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  763. ps.ProposalBlockParts = msg.BlockParts
  764. }
  765. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  766. ps.mtx.Lock()
  767. defer ps.mtx.Unlock()
  768. if ps.Height != msg.Height {
  769. return
  770. }
  771. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  772. }
  773. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  774. ps.mtx.Lock()
  775. defer ps.mtx.Unlock()
  776. if ps.Height != msg.Height {
  777. return
  778. }
  779. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  780. return
  781. }
  782. // TODO: Merge onto existing ps.ProposalPOL?
  783. // We might have sent some prevotes in the meantime.
  784. ps.ProposalPOL = msg.ProposalPOL
  785. }
  786. //-----------------------------------------------------------------------------
  787. // Messages
  788. const (
  789. msgTypeNewRoundStep = byte(0x01)
  790. msgTypeCommitStep = byte(0x02)
  791. msgTypeProposal = byte(0x11)
  792. msgTypeProposalPOL = byte(0x12)
  793. msgTypeBlockPart = byte(0x13) // both block & POL
  794. msgTypeVote = byte(0x14)
  795. msgTypeHasVote = byte(0x15)
  796. )
  797. type ConsensusMessage interface{}
  798. var _ = wire.RegisterInterface(
  799. struct{ ConsensusMessage }{},
  800. wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  801. wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  802. wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  803. wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  804. wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  805. wire.ConcreteType{&VoteMessage{}, msgTypeVote},
  806. wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  807. )
  808. // TODO: check for unnecessary extra bytes at the end.
  809. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  810. msgType = bz[0]
  811. n := new(int)
  812. r := bytes.NewReader(bz)
  813. msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  814. return
  815. }
  816. //-------------------------------------
  817. // For every height/round/step transition
  818. type NewRoundStepMessage struct {
  819. Height int
  820. Round int
  821. Step RoundStepType
  822. SecondsSinceStartTime int
  823. LastCommitRound int
  824. }
  825. func (m *NewRoundStepMessage) String() string {
  826. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  827. m.Height, m.Round, m.Step, m.LastCommitRound)
  828. }
  829. //-------------------------------------
  830. type CommitStepMessage struct {
  831. Height int
  832. BlockPartsHeader types.PartSetHeader
  833. BlockParts *BitArray
  834. }
  835. func (m *CommitStepMessage) String() string {
  836. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  837. }
  838. //-------------------------------------
  839. type ProposalMessage struct {
  840. Proposal *types.Proposal
  841. }
  842. func (m *ProposalMessage) String() string {
  843. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  844. }
  845. //-------------------------------------
  846. type ProposalPOLMessage struct {
  847. Height int
  848. ProposalPOLRound int
  849. ProposalPOL *BitArray
  850. }
  851. func (m *ProposalPOLMessage) String() string {
  852. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  853. }
  854. //-------------------------------------
  855. type BlockPartMessage struct {
  856. Height int
  857. Round int
  858. Part *types.Part
  859. }
  860. func (m *BlockPartMessage) String() string {
  861. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  862. }
  863. //-------------------------------------
  864. type VoteMessage struct {
  865. ValidatorIndex int
  866. Vote *types.Vote
  867. }
  868. func (m *VoteMessage) String() string {
  869. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  870. }
  871. //-------------------------------------
  872. type HasVoteMessage struct {
  873. Height int
  874. Round int
  875. Type byte
  876. Index int
  877. }
  878. func (m *HasVoteMessage) String() string {
  879. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  880. }