You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1007 lines
29 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/go-events"
  11. "github.com/tendermint/go-p2p"
  12. "github.com/tendermint/go-wire"
  13. bc "github.com/tendermint/tendermint/blockchain"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. const (
  18. StateChannel = byte(0x20)
  19. DataChannel = byte(0x21)
  20. VoteChannel = byte(0x22)
  21. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  22. maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.
  23. )
  24. //-----------------------------------------------------------------------------
  25. type ConsensusReactor struct {
  26. p2p.BaseReactor // QuitService + p2p.Switch
  27. blockStore *bc.BlockStore
  28. conS *ConsensusState
  29. fastSync bool
  30. evsw *events.EventSwitch
  31. }
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. conS: consensusState,
  36. fastSync: fastSync,
  37. }
  38. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  39. return conR
  40. }
  41. func (conR *ConsensusReactor) OnStart() error {
  42. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  43. conR.BaseReactor.OnStart()
  44. // callbacks for broadcasting new steps and votes to peers
  45. // upon their respective events (ie. uses evsw)
  46. conR.registerEventCallbacks()
  47. if !conR.fastSync {
  48. _, err := conR.conS.Start()
  49. if err != nil {
  50. return err
  51. }
  52. }
  53. return nil
  54. }
  55. func (conR *ConsensusReactor) OnStop() {
  56. conR.BaseReactor.OnStop()
  57. conR.conS.Stop()
  58. }
  59. // Switch from the fast_sync to the consensus:
  60. // reset the state, turn off fast_sync, start the consensus-state-machine
  61. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  62. log.Notice("SwitchToConsensus")
  63. conR.conS.reconstructLastCommit(state)
  64. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  65. // broadcast a NewRoundStepMessage.
  66. conR.conS.updateToState(state)
  67. conR.fastSync = false
  68. conR.conS.Start()
  69. }
  70. // Implements Reactor
  71. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  72. // TODO optimize
  73. return []*p2p.ChannelDescriptor{
  74. &p2p.ChannelDescriptor{
  75. ID: StateChannel,
  76. Priority: 5,
  77. SendQueueCapacity: 100,
  78. },
  79. &p2p.ChannelDescriptor{
  80. ID: DataChannel, // maybe split between gossiping current block and catchup stuff
  81. Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
  82. SendQueueCapacity: 100,
  83. RecvBufferCapacity: 50 * 4096,
  84. },
  85. &p2p.ChannelDescriptor{
  86. ID: VoteChannel,
  87. Priority: 5,
  88. SendQueueCapacity: 100,
  89. RecvBufferCapacity: 100 * 100,
  90. },
  91. }
  92. }
  93. // Implements Reactor
  94. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  95. if !conR.IsRunning() {
  96. return
  97. }
  98. // Create peerState for peer
  99. peerState := NewPeerState(peer)
  100. peer.Data.Set(types.PeerStateKey, peerState)
  101. // Begin gossip routines for this peer.
  102. go conR.gossipDataRoutine(peer, peerState)
  103. go conR.gossipVotesRoutine(peer, peerState)
  104. // Send our state to peer.
  105. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  106. if !conR.fastSync {
  107. conR.sendNewRoundStepMessage(peer)
  108. }
  109. }
  110. // Implements Reactor
  111. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  112. if !conR.IsRunning() {
  113. return
  114. }
  115. // TODO
  116. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  117. }
  118. // Implements Reactor
  119. // NOTE: We process these messages even when we're fast_syncing.
  120. // Messages affect either a peer state or the consensus state.
  121. // Peer state updates can happen in parallel, but processing of
  122. // proposals, block parts, and votes are ordered by the receiveRoutine
  123. func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  124. if !conR.IsRunning() {
  125. log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
  126. return
  127. }
  128. _, msg, err := DecodeMessage(msgBytes)
  129. if err != nil {
  130. log.Warn("Error decoding message", "src", src, "chId", chID, "msg", msg, "error", err, "bytes", msgBytes)
  131. // TODO punish peer?
  132. return
  133. }
  134. log.Info("Receive", "src", src, "chId", chID, "msg", msg)
  135. // Get peer states
  136. ps := src.Data.Get(types.PeerStateKey).(*PeerState)
  137. switch chID {
  138. case StateChannel:
  139. switch msg := msg.(type) {
  140. case *NewRoundStepMessage:
  141. ps.ApplyNewRoundStepMessage(msg)
  142. case *CommitStepMessage:
  143. ps.ApplyCommitStepMessage(msg)
  144. case *HasVoteMessage:
  145. ps.ApplyHasVoteMessage(msg)
  146. default:
  147. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  148. }
  149. case DataChannel:
  150. if conR.fastSync {
  151. log.Warn("Ignoring message received during fastSync", "msg", msg)
  152. return
  153. }
  154. switch msg := msg.(type) {
  155. case *ProposalMessage:
  156. ps.SetHasProposal(msg.Proposal)
  157. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  158. case *ProposalPOLMessage:
  159. ps.ApplyProposalPOLMessage(msg)
  160. case *BlockPartMessage:
  161. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  162. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  163. default:
  164. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  165. }
  166. case VoteChannel:
  167. if conR.fastSync {
  168. log.Warn("Ignoring message received during fastSync", "msg", msg)
  169. return
  170. }
  171. switch msg := msg.(type) {
  172. case *VoteMessage:
  173. cs := conR.conS
  174. cs.mtx.Lock()
  175. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  176. cs.mtx.Unlock()
  177. ps.EnsureVoteBitArrays(height, valSize)
  178. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  179. ps.SetHasVote(msg.Vote, msg.ValidatorIndex)
  180. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  181. default:
  182. // don't punish (leave room for soft upgrades)
  183. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  184. }
  185. default:
  186. log.Warn(Fmt("Unknown chId %X", chID))
  187. }
  188. if err != nil {
  189. log.Warn("Error in Receive()", "error", err)
  190. }
  191. }
  192. // Sets our private validator account for signing votes.
  193. func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
  194. conR.conS.SetPrivValidator(priv)
  195. }
  196. // implements events.Eventable
  197. func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
  198. conR.evsw = evsw
  199. conR.conS.SetEventSwitch(evsw)
  200. }
  201. //--------------------------------------
  202. // Listens for new steps and votes,
  203. // broadcasting the result to peers
  204. func (conR *ConsensusReactor) registerEventCallbacks() {
  205. conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) {
  206. rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
  207. conR.broadcastNewRoundStep(rs)
  208. })
  209. conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
  210. edv := data.(types.EventDataVote)
  211. conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
  212. })
  213. }
  214. func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
  215. nrsMsg, csMsg := makeRoundStepMessages(rs)
  216. if nrsMsg != nil {
  217. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  218. }
  219. if csMsg != nil {
  220. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
  221. }
  222. }
  223. // Broadcasts HasVoteMessage to peers that care.
  224. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  225. msg := &HasVoteMessage{
  226. Height: vote.Height,
  227. Round: vote.Round,
  228. Type: vote.Type,
  229. Index: index,
  230. }
  231. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
  232. /*
  233. // TODO: Make this broadcast more selective.
  234. for _, peer := range conR.Switch.Peers().List() {
  235. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  236. prs := ps.GetRoundState()
  237. if prs.Height == vote.Height {
  238. // TODO: Also filter on round?
  239. peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
  240. } else {
  241. // Height doesn't match
  242. // TODO: check a field, maybe CatchupCommitRound?
  243. // TODO: But that requires changing the struct field comment.
  244. }
  245. }
  246. */
  247. }
  248. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  249. nrsMsg = &NewRoundStepMessage{
  250. Height: rs.Height,
  251. Round: rs.Round,
  252. Step: rs.Step,
  253. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  254. LastCommitRound: rs.LastCommit.Round(),
  255. }
  256. if rs.Step == RoundStepCommit {
  257. csMsg = &CommitStepMessage{
  258. Height: rs.Height,
  259. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  260. BlockParts: rs.ProposalBlockParts.BitArray(),
  261. }
  262. }
  263. return
  264. }
  265. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  266. rs := conR.conS.GetRoundState()
  267. nrsMsg, csMsg := makeRoundStepMessages(rs)
  268. if nrsMsg != nil {
  269. peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  270. }
  271. if csMsg != nil {
  272. peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
  273. }
  274. }
  275. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  276. log := log.New("peer", peer)
  277. OUTER_LOOP:
  278. for {
  279. // Manage disconnects from self or peer.
  280. if !peer.IsRunning() || !conR.IsRunning() {
  281. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  282. return
  283. }
  284. rs := conR.conS.GetRoundState()
  285. prs := ps.GetRoundState()
  286. // Send proposal Block parts?
  287. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  288. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  289. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  290. part := rs.ProposalBlockParts.GetPart(index)
  291. msg := &BlockPartMessage{
  292. Height: rs.Height, // This tells peer that this part applies to us.
  293. Round: rs.Round, // This tells peer that this part applies to us.
  294. Part: part,
  295. }
  296. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  297. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  298. continue OUTER_LOOP
  299. }
  300. }
  301. // If the peer is on a previous height, help catch up.
  302. if (0 < prs.Height) && (prs.Height < rs.Height) {
  303. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  304. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  305. // Ensure that the peer's PartSetHeader is correct
  306. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  307. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  308. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  309. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  310. time.Sleep(peerGossipSleepDuration)
  311. continue OUTER_LOOP
  312. }
  313. // Load the part
  314. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  315. if part == nil {
  316. log.Warn("Could not load part", "index", index,
  317. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  318. time.Sleep(peerGossipSleepDuration)
  319. continue OUTER_LOOP
  320. }
  321. // Send the part
  322. msg := &BlockPartMessage{
  323. Height: prs.Height, // Not our height, so it doesn't matter.
  324. Round: prs.Round, // Not our height, so it doesn't matter.
  325. Part: part,
  326. }
  327. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  328. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  329. continue OUTER_LOOP
  330. } else {
  331. //log.Info("No parts to send in catch-up, sleeping")
  332. time.Sleep(peerGossipSleepDuration)
  333. continue OUTER_LOOP
  334. }
  335. }
  336. // If height and round don't match, sleep.
  337. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  338. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  339. time.Sleep(peerGossipSleepDuration)
  340. continue OUTER_LOOP
  341. }
  342. // By here, height and round match.
  343. // Proposal block parts were already matched and sent if any were wanted.
  344. // (These can match on hash so the round doesn't matter)
  345. // Now consider sending other things, like the Proposal itself.
  346. // Send Proposal && ProposalPOL BitArray?
  347. if rs.Proposal != nil && !prs.Proposal {
  348. // Proposal
  349. {
  350. msg := &ProposalMessage{Proposal: rs.Proposal}
  351. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  352. ps.SetHasProposal(rs.Proposal)
  353. }
  354. // ProposalPOL.
  355. // Peer must receive ProposalMessage first.
  356. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  357. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  358. if 0 <= rs.Proposal.POLRound {
  359. msg := &ProposalPOLMessage{
  360. Height: rs.Height,
  361. ProposalPOLRound: rs.Proposal.POLRound,
  362. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  363. }
  364. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  365. }
  366. continue OUTER_LOOP
  367. }
  368. // Nothing to do. Sleep.
  369. time.Sleep(peerGossipSleepDuration)
  370. continue OUTER_LOOP
  371. }
  372. }
  373. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  374. log := log.New("peer", peer)
  375. // Simple hack to throttle logs upon sleep.
  376. var sleeping = 0
  377. OUTER_LOOP:
  378. for {
  379. // Manage disconnects from self or peer.
  380. if !peer.IsRunning() || !conR.IsRunning() {
  381. log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  382. return
  383. }
  384. rs := conR.conS.GetRoundState()
  385. prs := ps.GetRoundState()
  386. switch sleeping {
  387. case 1: // First sleep
  388. sleeping = 2
  389. case 2: // No more sleep
  390. sleeping = 0
  391. }
  392. //log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  393. // "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  394. // If height matches, then send LastCommit, Prevotes, Precommits.
  395. if rs.Height == prs.Height {
  396. // If there are lastCommits to send...
  397. if prs.Step == RoundStepNewHeight {
  398. if ps.PickSendVote(rs.LastCommit) {
  399. log.Info("Picked rs.LastCommit to send")
  400. continue OUTER_LOOP
  401. }
  402. }
  403. // If there are prevotes to send...
  404. if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
  405. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  406. log.Info("Picked rs.Prevotes(prs.Round) to send")
  407. continue OUTER_LOOP
  408. }
  409. }
  410. // If there are precommits to send...
  411. if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
  412. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  413. log.Info("Picked rs.Precommits(prs.Round) to send")
  414. continue OUTER_LOOP
  415. }
  416. }
  417. // If there are POLPrevotes to send...
  418. if prs.ProposalPOLRound != -1 {
  419. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  420. if ps.PickSendVote(polPrevotes) {
  421. log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  422. continue OUTER_LOOP
  423. }
  424. }
  425. }
  426. }
  427. // Special catchup logic.
  428. // If peer is lagging by height 1, send LastCommit.
  429. if prs.Height != 0 && rs.Height == prs.Height+1 {
  430. if ps.PickSendVote(rs.LastCommit) {
  431. log.Info("Picked rs.LastCommit to send")
  432. continue OUTER_LOOP
  433. }
  434. }
  435. // Catchup logic
  436. // If peer is lagging by more than 1, send Commit.
  437. if prs.Height != 0 && rs.Height >= prs.Height+2 {
  438. // Load the block commit for prs.Height,
  439. // which contains precommit signatures for prs.Height.
  440. commit := conR.blockStore.LoadBlockCommit(prs.Height)
  441. log.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
  442. if ps.PickSendVote(commit) {
  443. log.Info("Picked Catchup commit to send")
  444. continue OUTER_LOOP
  445. }
  446. }
  447. if sleeping == 0 {
  448. // We sent nothing. Sleep...
  449. sleeping = 1
  450. log.Info("No votes to send, sleeping", "peer", peer,
  451. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  452. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  453. } else if sleeping == 2 {
  454. // Continued sleep...
  455. sleeping = 1
  456. }
  457. time.Sleep(peerGossipSleepDuration)
  458. continue OUTER_LOOP
  459. }
  460. }
  461. //-----------------------------------------------------------------------------
  462. // Read only when returned by PeerState.GetRoundState().
  463. type PeerRoundState struct {
  464. Height int // Height peer is at
  465. Round int // Round peer is at, -1 if unknown.
  466. Step RoundStepType // Step peer is at
  467. StartTime time.Time // Estimated start of round 0 at this height
  468. Proposal bool // True if peer has proposal for this round
  469. ProposalBlockPartsHeader types.PartSetHeader //
  470. ProposalBlockParts *BitArray //
  471. ProposalPOLRound int // Proposal's POL round. -1 if none.
  472. ProposalPOL *BitArray // nil until ProposalPOLMessage received.
  473. Prevotes *BitArray // All votes peer has for this round
  474. Precommits *BitArray // All precommits peer has for this round
  475. LastCommitRound int // Round of commit for last height. -1 if none.
  476. LastCommit *BitArray // All commit precommits of commit for last height.
  477. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none.
  478. CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound
  479. }
  480. //-----------------------------------------------------------------------------
  481. var (
  482. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  483. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  484. )
  485. type PeerState struct {
  486. Peer *p2p.Peer
  487. mtx sync.Mutex
  488. PeerRoundState
  489. }
  490. func NewPeerState(peer *p2p.Peer) *PeerState {
  491. return &PeerState{
  492. Peer: peer,
  493. PeerRoundState: PeerRoundState{
  494. Round: -1,
  495. ProposalPOLRound: -1,
  496. LastCommitRound: -1,
  497. CatchupCommitRound: -1,
  498. },
  499. }
  500. }
  501. // Returns an atomic snapshot of the PeerRoundState.
  502. // There's no point in mutating it since it won't change PeerState.
  503. func (ps *PeerState) GetRoundState() *PeerRoundState {
  504. ps.mtx.Lock()
  505. defer ps.mtx.Unlock()
  506. prs := ps.PeerRoundState // copy
  507. return &prs
  508. }
  509. // Returns an atomic snapshot of the PeerRoundState's height
  510. // used by the mempool to ensure peers are caught up before broadcasting new txs
  511. func (ps *PeerState) GetHeight() int {
  512. ps.mtx.Lock()
  513. defer ps.mtx.Unlock()
  514. return ps.PeerRoundState.Height
  515. }
  516. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  517. ps.mtx.Lock()
  518. defer ps.mtx.Unlock()
  519. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  520. return
  521. }
  522. if ps.Proposal {
  523. return
  524. }
  525. ps.Proposal = true
  526. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  527. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  528. ps.ProposalPOLRound = proposal.POLRound
  529. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  530. }
  531. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  532. ps.mtx.Lock()
  533. defer ps.mtx.Unlock()
  534. if ps.Height != height || ps.Round != round {
  535. return
  536. }
  537. ps.ProposalBlockParts.SetIndex(index, true)
  538. }
  539. // Convenience function to send vote to peer.
  540. // Returns true if vote was sent.
  541. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  542. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  543. msg := &VoteMessage{index, vote}
  544. ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
  545. return true
  546. }
  547. return false
  548. }
  549. // votes: Must be the correct Size() for the Height().
  550. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  551. ps.mtx.Lock()
  552. defer ps.mtx.Unlock()
  553. if votes.Size() == 0 {
  554. return 0, nil, false
  555. }
  556. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  557. // Lazily set data using 'votes'.
  558. if votes.IsCommit() {
  559. ps.ensureCatchupCommitRound(height, round, size)
  560. }
  561. ps.ensureVoteBitArrays(height, size)
  562. psVotes := ps.getVoteBitArray(height, round, type_)
  563. if psVotes == nil {
  564. return 0, nil, false // Not something worth sending
  565. }
  566. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  567. ps.setHasVote(height, round, type_, index)
  568. return index, votes.GetByIndex(index), true
  569. }
  570. return 0, nil, false
  571. }
  572. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  573. if ps.Height == height {
  574. if ps.Round == round {
  575. switch type_ {
  576. case types.VoteTypePrevote:
  577. return ps.Prevotes
  578. case types.VoteTypePrecommit:
  579. return ps.Precommits
  580. default:
  581. PanicSanity(Fmt("Unexpected vote type %X", type_))
  582. }
  583. }
  584. if ps.CatchupCommitRound == round {
  585. switch type_ {
  586. case types.VoteTypePrevote:
  587. return nil
  588. case types.VoteTypePrecommit:
  589. return ps.CatchupCommit
  590. default:
  591. PanicSanity(Fmt("Unexpected vote type %X", type_))
  592. }
  593. }
  594. return nil
  595. }
  596. if ps.Height == height+1 {
  597. if ps.LastCommitRound == round {
  598. switch type_ {
  599. case types.VoteTypePrevote:
  600. return nil
  601. case types.VoteTypePrecommit:
  602. return ps.LastCommit
  603. default:
  604. PanicSanity(Fmt("Unexpected vote type %X", type_))
  605. }
  606. }
  607. return nil
  608. }
  609. return nil
  610. }
  611. // 'round': A round for which we have a +2/3 commit.
  612. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  613. if ps.Height != height {
  614. return
  615. }
  616. /*
  617. NOTE: This is wrong, 'round' could change.
  618. e.g. if orig round is not the same as block LastCommit round.
  619. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  620. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  621. }
  622. */
  623. if ps.CatchupCommitRound == round {
  624. return // Nothing to do!
  625. }
  626. ps.CatchupCommitRound = round
  627. if round == ps.Round {
  628. ps.CatchupCommit = ps.Precommits
  629. } else {
  630. ps.CatchupCommit = NewBitArray(numValidators)
  631. }
  632. }
  633. // NOTE: It's important to make sure that numValidators actually matches
  634. // what the node sees as the number of validators for height.
  635. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  636. ps.mtx.Lock()
  637. defer ps.mtx.Unlock()
  638. ps.ensureVoteBitArrays(height, numValidators)
  639. }
  640. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  641. if ps.Height == height {
  642. if ps.Prevotes == nil {
  643. ps.Prevotes = NewBitArray(numValidators)
  644. }
  645. if ps.Precommits == nil {
  646. ps.Precommits = NewBitArray(numValidators)
  647. }
  648. if ps.CatchupCommit == nil {
  649. ps.CatchupCommit = NewBitArray(numValidators)
  650. }
  651. if ps.ProposalPOL == nil {
  652. ps.ProposalPOL = NewBitArray(numValidators)
  653. }
  654. } else if ps.Height == height+1 {
  655. if ps.LastCommit == nil {
  656. ps.LastCommit = NewBitArray(numValidators)
  657. }
  658. }
  659. }
  660. func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  661. ps.mtx.Lock()
  662. defer ps.mtx.Unlock()
  663. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  664. }
  665. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  666. log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
  667. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  668. PanicSanity("Invalid vote type")
  669. }
  670. if ps.Height == height {
  671. if ps.Round == round {
  672. switch type_ {
  673. case types.VoteTypePrevote:
  674. ps.Prevotes.SetIndex(index, true)
  675. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  676. case types.VoteTypePrecommit:
  677. ps.Precommits.SetIndex(index, true)
  678. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  679. }
  680. } else if ps.CatchupCommitRound == round {
  681. switch type_ {
  682. case types.VoteTypePrevote:
  683. case types.VoteTypePrecommit:
  684. ps.CatchupCommit.SetIndex(index, true)
  685. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  686. }
  687. } else if ps.ProposalPOLRound == round {
  688. switch type_ {
  689. case types.VoteTypePrevote:
  690. ps.ProposalPOL.SetIndex(index, true)
  691. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  692. case types.VoteTypePrecommit:
  693. }
  694. }
  695. } else if ps.Height == height+1 {
  696. if ps.LastCommitRound == round {
  697. switch type_ {
  698. case types.VoteTypePrevote:
  699. case types.VoteTypePrecommit:
  700. ps.LastCommit.SetIndex(index, true)
  701. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  702. }
  703. }
  704. } else {
  705. // Does not apply.
  706. }
  707. }
  708. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  709. ps.mtx.Lock()
  710. defer ps.mtx.Unlock()
  711. // Ignore duplicates or decreases
  712. if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 {
  713. return
  714. }
  715. // Just remember these values.
  716. psHeight := ps.Height
  717. psRound := ps.Round
  718. //psStep := ps.Step
  719. psCatchupCommitRound := ps.CatchupCommitRound
  720. psCatchupCommit := ps.CatchupCommit
  721. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  722. ps.Height = msg.Height
  723. ps.Round = msg.Round
  724. ps.Step = msg.Step
  725. ps.StartTime = startTime
  726. if psHeight != msg.Height || psRound != msg.Round {
  727. ps.Proposal = false
  728. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  729. ps.ProposalBlockParts = nil
  730. ps.ProposalPOLRound = -1
  731. ps.ProposalPOL = nil
  732. // We'll update the BitArray capacity later.
  733. ps.Prevotes = nil
  734. ps.Precommits = nil
  735. }
  736. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  737. // Peer caught up to CatchupCommitRound.
  738. // Preserve psCatchupCommit!
  739. // NOTE: We prefer to use prs.Precommits if
  740. // pr.Round matches pr.CatchupCommitRound.
  741. ps.Precommits = psCatchupCommit
  742. }
  743. if psHeight != msg.Height {
  744. // Shift Precommits to LastCommit.
  745. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  746. ps.LastCommitRound = msg.LastCommitRound
  747. ps.LastCommit = ps.Precommits
  748. } else {
  749. ps.LastCommitRound = msg.LastCommitRound
  750. ps.LastCommit = nil
  751. }
  752. // We'll update the BitArray capacity later.
  753. ps.CatchupCommitRound = -1
  754. ps.CatchupCommit = nil
  755. }
  756. }
  757. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  758. ps.mtx.Lock()
  759. defer ps.mtx.Unlock()
  760. if ps.Height != msg.Height {
  761. return
  762. }
  763. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  764. ps.ProposalBlockParts = msg.BlockParts
  765. }
  766. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  767. ps.mtx.Lock()
  768. defer ps.mtx.Unlock()
  769. if ps.Height != msg.Height {
  770. return
  771. }
  772. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  773. }
  774. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  775. ps.mtx.Lock()
  776. defer ps.mtx.Unlock()
  777. if ps.Height != msg.Height {
  778. return
  779. }
  780. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  781. return
  782. }
  783. // TODO: Merge onto existing ps.ProposalPOL?
  784. // We might have sent some prevotes in the meantime.
  785. ps.ProposalPOL = msg.ProposalPOL
  786. }
  787. //-----------------------------------------------------------------------------
  788. // Messages
  789. const (
  790. msgTypeNewRoundStep = byte(0x01)
  791. msgTypeCommitStep = byte(0x02)
  792. msgTypeProposal = byte(0x11)
  793. msgTypeProposalPOL = byte(0x12)
  794. msgTypeBlockPart = byte(0x13) // both block & POL
  795. msgTypeVote = byte(0x14)
  796. msgTypeHasVote = byte(0x15)
  797. )
  798. type ConsensusMessage interface{}
  799. var _ = wire.RegisterInterface(
  800. struct{ ConsensusMessage }{},
  801. wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  802. wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  803. wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  804. wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  805. wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  806. wire.ConcreteType{&VoteMessage{}, msgTypeVote},
  807. wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  808. )
  809. // TODO: check for unnecessary extra bytes at the end.
  810. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  811. msgType = bz[0]
  812. n := new(int)
  813. r := bytes.NewReader(bz)
  814. msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  815. return
  816. }
  817. //-------------------------------------
  818. // For every height/round/step transition
  819. type NewRoundStepMessage struct {
  820. Height int
  821. Round int
  822. Step RoundStepType
  823. SecondsSinceStartTime int
  824. LastCommitRound int
  825. }
  826. func (m *NewRoundStepMessage) String() string {
  827. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  828. m.Height, m.Round, m.Step, m.LastCommitRound)
  829. }
  830. //-------------------------------------
  831. type CommitStepMessage struct {
  832. Height int
  833. BlockPartsHeader types.PartSetHeader
  834. BlockParts *BitArray
  835. }
  836. func (m *CommitStepMessage) String() string {
  837. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  838. }
  839. //-------------------------------------
  840. type ProposalMessage struct {
  841. Proposal *types.Proposal
  842. }
  843. func (m *ProposalMessage) String() string {
  844. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  845. }
  846. //-------------------------------------
  847. type ProposalPOLMessage struct {
  848. Height int
  849. ProposalPOLRound int
  850. ProposalPOL *BitArray
  851. }
  852. func (m *ProposalPOLMessage) String() string {
  853. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  854. }
  855. //-------------------------------------
  856. type BlockPartMessage struct {
  857. Height int
  858. Round int
  859. Part *types.Part
  860. }
  861. func (m *BlockPartMessage) String() string {
  862. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  863. }
  864. //-------------------------------------
  865. type VoteMessage struct {
  866. ValidatorIndex int
  867. Vote *types.Vote
  868. }
  869. func (m *VoteMessage) String() string {
  870. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  871. }
  872. //-------------------------------------
  873. type HasVoteMessage struct {
  874. Height int
  875. Round int
  876. Type byte
  877. Index int
  878. }
  879. func (m *HasVoteMessage) String() string {
  880. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  881. }