You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1008 lines
30 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "time"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/go-events"
  11. "github.com/tendermint/go-p2p"
  12. "github.com/tendermint/go-wire"
  13. bc "github.com/tendermint/tendermint/blockchain"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  // Channel IDs served by the consensus reactor, plus gossip tuning constants.
  const (
  	// StateChannel carries NewRoundStep, CommitStep and HasVote messages (see Receive).
  	StateChannel = byte(0x20)
  	// DataChannel carries Proposal, ProposalPOL and BlockPart messages (see Receive).
  	DataChannel = byte(0x21)
  	// VoteChannel carries Vote messages (see Receive).
  	VoteChannel = byte(0x22)

  	peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  	maxConsensusMessageSize = 1048576                // 1MB; NOTE: keep in sync with types.PartSet sizes.
  )
  24. //-----------------------------------------------------------------------------
  // ConsensusReactor is the p2p reactor for consensus: it feeds peer messages
  // into the ConsensusState and gossips proposals, block parts and votes to peers.
  type ConsensusReactor struct {
  	p2p.BaseReactor // QuitService + p2p.Switch

  	blockStore *bc.BlockStore      // read by the gossip routines to serve lagging peers
  	conS       *ConsensusState     // the consensus state machine driven by this reactor
  	fastSync   bool                // while true, Data/Vote channel messages are ignored (see Receive)
  	evsw       *events.EventSwitch // set by SetEventSwitch; used by registerEventCallbacks
  }
  32. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
  33. conR := &ConsensusReactor{
  34. blockStore: blockStore,
  35. conS: consensusState,
  36. fastSync: fastSync,
  37. }
  38. conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
  39. return conR
  40. }
  41. func (conR *ConsensusReactor) OnStart() error {
  42. log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
  43. conR.BaseReactor.OnStart()
  44. // callbacks for broadcasting new steps and votes to peers
  45. // upon their respective events (ie. uses evsw)
  46. conR.registerEventCallbacks()
  47. if !conR.fastSync {
  48. _, err := conR.conS.Start()
  49. if err != nil {
  50. return err
  51. }
  52. }
  53. return nil
  54. }
  // OnStop stops the base reactor first and then the consensus state.
  func (conR *ConsensusReactor) OnStop() {
  	conR.BaseReactor.OnStop()
  	conR.conS.Stop()
  }
  59. // Switch from the fast_sync to the consensus:
  60. // reset the state, turn off fast_sync, start the consensus-state-machine
  61. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  62. log.Notice("SwitchToConsensus")
  63. conR.conS.reconstructLastCommit(state)
  64. // NOTE: The line below causes broadcastNewRoundStepRoutine() to
  65. // broadcast a NewRoundStepMessage.
  66. conR.conS.updateToState(state)
  67. conR.fastSync = false
  68. conR.conS.Start()
  69. }
  70. // Implements Reactor
  71. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  72. // TODO optimize
  73. return []*p2p.ChannelDescriptor{
  74. &p2p.ChannelDescriptor{
  75. ID: StateChannel,
  76. Priority: 5,
  77. SendQueueCapacity: 100,
  78. },
  79. &p2p.ChannelDescriptor{
  80. ID: DataChannel, // maybe split between gossiping current block and catchup stuff
  81. Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
  82. SendQueueCapacity: 100,
  83. RecvBufferCapacity: 50 * 4096,
  84. },
  85. &p2p.ChannelDescriptor{
  86. ID: VoteChannel,
  87. Priority: 5,
  88. SendQueueCapacity: 100,
  89. RecvBufferCapacity: 100 * 100,
  90. },
  91. }
  92. }
  93. // Implements Reactor
  94. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  95. if !conR.IsRunning() {
  96. return
  97. }
  98. // Create peerState for peer
  99. peerState := NewPeerState(peer)
  100. peer.Data.Set(types.PeerStateKey, peerState)
  101. // Begin gossip routines for this peer.
  102. go conR.gossipDataRoutine(peer, peerState)
  103. go conR.gossipVotesRoutine(peer, peerState)
  104. // Send our state to peer.
  105. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  106. if !conR.fastSync {
  107. conR.sendNewRoundStepMessage(peer)
  108. }
  109. }
  // RemovePeer implements Reactor. It is currently a no-op: nothing is torn
  // down here. The gossip routines stop on their own once peer.IsRunning()
  // reports false.
  func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  	if !conR.IsRunning() {
  		return
  	}
  	// TODO
  	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  }
  118. // Implements Reactor
  119. // NOTE: We process these messages even when we're fast_syncing.
  120. // Messages affect either a peer state or the consensus state.
  121. // Peer state updates can happen in parallel, but processing of
  122. // proposals, block parts, and votes are ordered by the receiveRoutine
  123. // NOTE: blocks on consensus state for proposals, block parts, and votes
  124. func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  125. if !conR.IsRunning() {
  126. log.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
  127. return
  128. }
  129. _, msg, err := DecodeMessage(msgBytes)
  130. if err != nil {
  131. log.Warn("Error decoding message", "src", src, "chId", chID, "msg", msg, "error", err, "bytes", msgBytes)
  132. // TODO punish peer?
  133. return
  134. }
  135. log.Info("Receive", "src", src, "chId", chID, "msg", msg)
  136. // Get peer states
  137. ps := src.Data.Get(types.PeerStateKey).(*PeerState)
  138. switch chID {
  139. case StateChannel:
  140. switch msg := msg.(type) {
  141. case *NewRoundStepMessage:
  142. ps.ApplyNewRoundStepMessage(msg)
  143. case *CommitStepMessage:
  144. ps.ApplyCommitStepMessage(msg)
  145. case *HasVoteMessage:
  146. ps.ApplyHasVoteMessage(msg)
  147. default:
  148. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  149. }
  150. case DataChannel:
  151. if conR.fastSync {
  152. log.Warn("Ignoring message received during fastSync", "msg", msg)
  153. return
  154. }
  155. switch msg := msg.(type) {
  156. case *ProposalMessage:
  157. ps.SetHasProposal(msg.Proposal)
  158. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  159. case *ProposalPOLMessage:
  160. ps.ApplyProposalPOLMessage(msg)
  161. case *BlockPartMessage:
  162. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  163. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  164. default:
  165. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  166. }
  167. case VoteChannel:
  168. if conR.fastSync {
  169. log.Warn("Ignoring message received during fastSync", "msg", msg)
  170. return
  171. }
  172. switch msg := msg.(type) {
  173. case *VoteMessage:
  174. cs := conR.conS
  175. cs.mtx.Lock()
  176. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  177. cs.mtx.Unlock()
  178. ps.EnsureVoteBitArrays(height, valSize)
  179. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  180. ps.SetHasVote(msg.Vote, msg.ValidatorIndex)
  181. conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
  182. default:
  183. // don't punish (leave room for soft upgrades)
  184. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  185. }
  186. default:
  187. log.Warn(Fmt("Unknown chId %X", chID))
  188. }
  189. if err != nil {
  190. log.Warn("Error in Receive()", "error", err)
  191. }
  192. }
  // SetPrivValidator sets our private validator account for signing votes
  // by forwarding it to the consensus state.
  func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
  	conR.conS.SetPrivValidator(priv)
  }
  // SetEventSwitch implements events.Eventable. The switch is stored on the
  // reactor (used by registerEventCallbacks) and also handed to the consensus state.
  func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
  	conR.evsw = evsw
  	conR.conS.SetEventSwitch(evsw)
  }
  202. //--------------------------------------
  203. // Listens for new steps and votes,
  204. // broadcasting the result to peers
  205. func (conR *ConsensusReactor) registerEventCallbacks() {
  206. conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) {
  207. rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
  208. conR.broadcastNewRoundStep(rs)
  209. })
  210. conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
  211. edv := data.(types.EventDataVote)
  212. conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
  213. })
  214. }
  215. func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
  216. nrsMsg, csMsg := makeRoundStepMessages(rs)
  217. if nrsMsg != nil {
  218. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  219. }
  220. if csMsg != nil {
  221. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
  222. }
  223. }
  224. // Broadcasts HasVoteMessage to peers that care.
  225. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  226. msg := &HasVoteMessage{
  227. Height: vote.Height,
  228. Round: vote.Round,
  229. Type: vote.Type,
  230. Index: index,
  231. }
  232. conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
  233. /*
  234. // TODO: Make this broadcast more selective.
  235. for _, peer := range conR.Switch.Peers().List() {
  236. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  237. prs := ps.GetRoundState()
  238. if prs.Height == vote.Height {
  239. // TODO: Also filter on round?
  240. peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
  241. } else {
  242. // Height doesn't match
  243. // TODO: check a field, maybe CatchupCommitRound?
  244. // TODO: But that requires changing the struct field comment.
  245. }
  246. }
  247. */
  248. }
  249. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  250. nrsMsg = &NewRoundStepMessage{
  251. Height: rs.Height,
  252. Round: rs.Round,
  253. Step: rs.Step,
  254. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  255. LastCommitRound: rs.LastCommit.Round(),
  256. }
  257. if rs.Step == RoundStepCommit {
  258. csMsg = &CommitStepMessage{
  259. Height: rs.Height,
  260. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  261. BlockParts: rs.ProposalBlockParts.BitArray(),
  262. }
  263. }
  264. return
  265. }
  266. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  267. rs := conR.conS.GetRoundState()
  268. nrsMsg, csMsg := makeRoundStepMessages(rs)
  269. if nrsMsg != nil {
  270. peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
  271. }
  272. if csMsg != nil {
  273. peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
  274. }
  275. }
  276. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  277. log := log.New("peer", peer)
  278. OUTER_LOOP:
  279. for {
  280. // Manage disconnects from self or peer.
  281. if !peer.IsRunning() || !conR.IsRunning() {
  282. log.Notice(Fmt("Stopping gossipDataRoutine for %v.", peer))
  283. return
  284. }
  285. rs := conR.conS.GetRoundState()
  286. prs := ps.GetRoundState()
  287. // Send proposal Block parts?
  288. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  289. //log.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  290. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  291. part := rs.ProposalBlockParts.GetPart(index)
  292. msg := &BlockPartMessage{
  293. Height: rs.Height, // This tells peer that this part applies to us.
  294. Round: rs.Round, // This tells peer that this part applies to us.
  295. Part: part,
  296. }
  297. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  298. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  299. continue OUTER_LOOP
  300. }
  301. }
  302. // If the peer is on a previous height, help catch up.
  303. if (0 < prs.Height) && (prs.Height < rs.Height) {
  304. //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  305. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  306. // Ensure that the peer's PartSetHeader is correct
  307. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  308. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  309. log.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
  310. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  311. time.Sleep(peerGossipSleepDuration)
  312. continue OUTER_LOOP
  313. }
  314. // Load the part
  315. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  316. if part == nil {
  317. log.Warn("Could not load part", "index", index,
  318. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  319. time.Sleep(peerGossipSleepDuration)
  320. continue OUTER_LOOP
  321. }
  322. // Send the part
  323. msg := &BlockPartMessage{
  324. Height: prs.Height, // Not our height, so it doesn't matter.
  325. Round: prs.Round, // Not our height, so it doesn't matter.
  326. Part: part,
  327. }
  328. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  329. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  330. continue OUTER_LOOP
  331. } else {
  332. //log.Info("No parts to send in catch-up, sleeping")
  333. time.Sleep(peerGossipSleepDuration)
  334. continue OUTER_LOOP
  335. }
  336. }
  337. // If height and round don't match, sleep.
  338. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  339. //log.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  340. time.Sleep(peerGossipSleepDuration)
  341. continue OUTER_LOOP
  342. }
  343. // By here, height and round match.
  344. // Proposal block parts were already matched and sent if any were wanted.
  345. // (These can match on hash so the round doesn't matter)
  346. // Now consider sending other things, like the Proposal itself.
  347. // Send Proposal && ProposalPOL BitArray?
  348. if rs.Proposal != nil && !prs.Proposal {
  349. // Proposal
  350. {
  351. msg := &ProposalMessage{Proposal: rs.Proposal}
  352. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  353. ps.SetHasProposal(rs.Proposal)
  354. }
  355. // ProposalPOL.
  356. // Peer must receive ProposalMessage first.
  357. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  358. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  359. if 0 <= rs.Proposal.POLRound {
  360. msg := &ProposalPOLMessage{
  361. Height: rs.Height,
  362. ProposalPOLRound: rs.Proposal.POLRound,
  363. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  364. }
  365. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  366. }
  367. continue OUTER_LOOP
  368. }
  369. // Nothing to do. Sleep.
  370. time.Sleep(peerGossipSleepDuration)
  371. continue OUTER_LOOP
  372. }
  373. }
  // gossipVotesRoutine runs for the lifetime of one peer and sends it votes it
  // is missing. Each iteration sends at most one vote (via ps.PickSendVote) and
  // then restarts the loop; if nothing could be sent it sleeps for
  // peerGossipSleepDuration. It returns once the peer or the reactor stops running.
  func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  	log := log.New("peer", peer)

  	// Simple hack to throttle logs upon sleep.
  	// 0: not sleeping (the next idle iteration logs), 1: slept at the end of
  	// the previous iteration, 2: that sleep was seen at the top of this one.
  	var sleeping = 0

  OUTER_LOOP:
  	for {
  		// Manage disconnects from self or peer.
  		if !peer.IsRunning() || !conR.IsRunning() {
  			log.Notice(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  			return
  		}
  		rs := conR.conS.GetRoundState()
  		prs := ps.GetRoundState()

  		// Advance the throttle state: 1 -> 2 on the iteration right after a
  		// sleep, 2 -> 0 on an iteration that follows a successful send.
  		switch sleeping {
  		case 1: // First sleep
  			sleeping = 2
  		case 2: // No more sleep
  			sleeping = 0
  		}

  		//log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  		// "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

  		// If height matches, then send LastCommit, Prevotes, Precommits.
  		if rs.Height == prs.Height {
  			// If there are lastCommits to send...
  			if prs.Step == RoundStepNewHeight {
  				if ps.PickSendVote(rs.LastCommit) {
  					log.Info("Picked rs.LastCommit to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are prevotes to send...
  			if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
  				if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  					log.Info("Picked rs.Prevotes(prs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are precommits to send...
  			if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
  				if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  					log.Info("Picked rs.Precommits(prs.Round) to send")
  					continue OUTER_LOOP
  				}
  			}
  			// If there are POLPrevotes to send...
  			if prs.ProposalPOLRound != -1 {
  				if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  					if ps.PickSendVote(polPrevotes) {
  						log.Info("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  						continue OUTER_LOOP
  					}
  				}
  			}
  		}

  		// Special catchup logic.
  		// If peer is lagging by height 1, send LastCommit.
  		if prs.Height != 0 && rs.Height == prs.Height+1 {
  			if ps.PickSendVote(rs.LastCommit) {
  				log.Info("Picked rs.LastCommit to send")
  				continue OUTER_LOOP
  			}
  		}

  		// Catchup logic
  		// If peer is lagging by more than 1, send Commit.
  		if prs.Height != 0 && rs.Height >= prs.Height+2 {
  			// Load the block commit for prs.Height,
  			// which contains precommit signatures for prs.Height.
  			// NOTE(review): the loaded commit is not checked before use;
  			// confirm how a missing commit is represented and handled.
  			commit := conR.blockStore.LoadBlockCommit(prs.Height)
  			log.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
  			if ps.PickSendVote(commit) {
  				log.Info("Picked Catchup commit to send")
  				continue OUTER_LOOP
  			}
  		}

  		if sleeping == 0 {
  			// We sent nothing. Sleep...
  			// This is the first idle iteration of a stretch, so log it.
  			sleeping = 1
  			log.Info("No votes to send, sleeping", "peer", peer,
  				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  		} else if sleeping == 2 {
  			// Continued sleep...
  			// Still idle: stay quiet and keep the throttle armed.
  			sleeping = 1
  		}

  		time.Sleep(peerGossipSleepDuration)
  		continue OUTER_LOOP
  	}
  }
  462. //-----------------------------------------------------------------------------
  // PeerRoundState is our view of what a peer knows about the current
  // height/round. Read only when returned by PeerState.GetRoundState().
  // The *BitArray fields are pointers, so a copy of this struct shares them
  // with the original.
  type PeerRoundState struct {
  	Height                   int                 // Height peer is at
  	Round                    int                 // Round peer is at, -1 if unknown.
  	Step                     RoundStepType       // Step peer is at
  	StartTime                time.Time           // Estimated start of round 0 at this height
  	Proposal                 bool                // True if peer has proposal for this round
  	ProposalBlockPartsHeader types.PartSetHeader // Part-set header of the peer's proposal block
  	ProposalBlockParts       *BitArray           // Block parts the peer is known to have
  	ProposalPOLRound         int                 // Proposal's POL round. -1 if none.
  	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
  	Prevotes                 *BitArray           // All votes peer has for this round
  	Precommits               *BitArray           // All precommits peer has for this round
  	LastCommitRound          int                 // Round of commit for last height. -1 if none.
  	LastCommit               *BitArray           // All commit precommits of commit for last height.
  	CatchupCommitRound       int                 // Round that we have commit for. Not necessarily unique. -1 if none.
  	CatchupCommit            *BitArray           // All commit precommits peer has for this height & CatchupCommitRound
  }
  481. //-----------------------------------------------------------------------------
  // Sentinel errors for invalid peer state updates.
  // NOTE(review): neither is referenced in the visible part of this file.
  var (
  	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  )
  // PeerState holds our view of one peer's consensus state. Exported methods
  // take mtx; the lower-case helpers expect the caller to hold it.
  type PeerState struct {
  	Peer *p2p.Peer // the peer this state describes; used to send votes

  	mtx            sync.Mutex // guards the embedded PeerRoundState
  	PeerRoundState            // embedded; fields are promoted onto PeerState
  }
  491. func NewPeerState(peer *p2p.Peer) *PeerState {
  492. return &PeerState{
  493. Peer: peer,
  494. PeerRoundState: PeerRoundState{
  495. Round: -1,
  496. ProposalPOLRound: -1,
  497. LastCommitRound: -1,
  498. CatchupCommitRound: -1,
  499. },
  500. }
  501. }
  502. // Returns an atomic snapshot of the PeerRoundState.
  503. // There's no point in mutating it since it won't change PeerState.
  504. func (ps *PeerState) GetRoundState() *PeerRoundState {
  505. ps.mtx.Lock()
  506. defer ps.mtx.Unlock()
  507. prs := ps.PeerRoundState // copy
  508. return &prs
  509. }
  510. // Returns an atomic snapshot of the PeerRoundState's height
  511. // used by the mempool to ensure peers are caught up before broadcasting new txs
  512. func (ps *PeerState) GetHeight() int {
  513. ps.mtx.Lock()
  514. defer ps.mtx.Unlock()
  515. return ps.PeerRoundState.Height
  516. }
  517. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  518. ps.mtx.Lock()
  519. defer ps.mtx.Unlock()
  520. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  521. return
  522. }
  523. if ps.Proposal {
  524. return
  525. }
  526. ps.Proposal = true
  527. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  528. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  529. ps.ProposalPOLRound = proposal.POLRound
  530. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  531. }
  532. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  533. ps.mtx.Lock()
  534. defer ps.mtx.Unlock()
  535. if ps.Height != height || ps.Round != round {
  536. return
  537. }
  538. ps.ProposalBlockParts.SetIndex(index, true)
  539. }
  540. // Convenience function to send vote to peer.
  541. // Returns true if vote was sent.
  542. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  543. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  544. msg := &VoteMessage{index, vote}
  545. ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
  546. return true
  547. }
  548. return false
  549. }
  550. // votes: Must be the correct Size() for the Height().
  551. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  552. ps.mtx.Lock()
  553. defer ps.mtx.Unlock()
  554. if votes.Size() == 0 {
  555. return 0, nil, false
  556. }
  557. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  558. // Lazily set data using 'votes'.
  559. if votes.IsCommit() {
  560. ps.ensureCatchupCommitRound(height, round, size)
  561. }
  562. ps.ensureVoteBitArrays(height, size)
  563. psVotes := ps.getVoteBitArray(height, round, type_)
  564. if psVotes == nil {
  565. return 0, nil, false // Not something worth sending
  566. }
  567. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  568. ps.setHasVote(height, round, type_, index)
  569. return index, votes.GetByIndex(index), true
  570. }
  571. return 0, nil, false
  572. }
  573. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  574. if ps.Height == height {
  575. if ps.Round == round {
  576. switch type_ {
  577. case types.VoteTypePrevote:
  578. return ps.Prevotes
  579. case types.VoteTypePrecommit:
  580. return ps.Precommits
  581. default:
  582. PanicSanity(Fmt("Unexpected vote type %X", type_))
  583. }
  584. }
  585. if ps.CatchupCommitRound == round {
  586. switch type_ {
  587. case types.VoteTypePrevote:
  588. return nil
  589. case types.VoteTypePrecommit:
  590. return ps.CatchupCommit
  591. default:
  592. PanicSanity(Fmt("Unexpected vote type %X", type_))
  593. }
  594. }
  595. return nil
  596. }
  597. if ps.Height == height+1 {
  598. if ps.LastCommitRound == round {
  599. switch type_ {
  600. case types.VoteTypePrevote:
  601. return nil
  602. case types.VoteTypePrecommit:
  603. return ps.LastCommit
  604. default:
  605. PanicSanity(Fmt("Unexpected vote type %X", type_))
  606. }
  607. }
  608. return nil
  609. }
  610. return nil
  611. }
  612. // 'round': A round for which we have a +2/3 commit.
  613. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  614. if ps.Height != height {
  615. return
  616. }
  617. /*
  618. NOTE: This is wrong, 'round' could change.
  619. e.g. if orig round is not the same as block LastCommit round.
  620. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  621. PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  622. }
  623. */
  624. if ps.CatchupCommitRound == round {
  625. return // Nothing to do!
  626. }
  627. ps.CatchupCommitRound = round
  628. if round == ps.Round {
  629. ps.CatchupCommit = ps.Precommits
  630. } else {
  631. ps.CatchupCommit = NewBitArray(numValidators)
  632. }
  633. }
  // EnsureVoteBitArrays is the locking wrapper around ensureVoteBitArrays: it
  // allocates any missing vote bit arrays for height with numValidators bits.
  //
  // NOTE: It's important to make sure that numValidators actually matches
  // what the node sees as the number of validators for height.
  func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  	ps.mtx.Lock()
  	defer ps.mtx.Unlock()
  	ps.ensureVoteBitArrays(height, numValidators)
  }
  641. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  642. if ps.Height == height {
  643. if ps.Prevotes == nil {
  644. ps.Prevotes = NewBitArray(numValidators)
  645. }
  646. if ps.Precommits == nil {
  647. ps.Precommits = NewBitArray(numValidators)
  648. }
  649. if ps.CatchupCommit == nil {
  650. ps.CatchupCommit = NewBitArray(numValidators)
  651. }
  652. if ps.ProposalPOL == nil {
  653. ps.ProposalPOL = NewBitArray(numValidators)
  654. }
  655. } else if ps.Height == height+1 {
  656. if ps.LastCommit == nil {
  657. ps.LastCommit = NewBitArray(numValidators)
  658. }
  659. }
  660. }
  // SetHasVote records that the peer has the given vote at validator index
  // `index`. It is the locking wrapper around setHasVote.
  func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  	ps.mtx.Lock()
  	defer ps.mtx.Unlock()
  	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  }
  666. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  667. log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
  668. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  669. PanicSanity("Invalid vote type")
  670. }
  671. if ps.Height == height {
  672. if ps.Round == round {
  673. switch type_ {
  674. case types.VoteTypePrevote:
  675. ps.Prevotes.SetIndex(index, true)
  676. log.Info("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  677. case types.VoteTypePrecommit:
  678. ps.Precommits.SetIndex(index, true)
  679. log.Info("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  680. }
  681. } else if ps.CatchupCommitRound == round {
  682. switch type_ {
  683. case types.VoteTypePrevote:
  684. case types.VoteTypePrecommit:
  685. ps.CatchupCommit.SetIndex(index, true)
  686. log.Info("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  687. }
  688. } else if ps.ProposalPOLRound == round {
  689. switch type_ {
  690. case types.VoteTypePrevote:
  691. ps.ProposalPOL.SetIndex(index, true)
  692. log.Info("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  693. case types.VoteTypePrecommit:
  694. }
  695. }
  696. } else if ps.Height == height+1 {
  697. if ps.LastCommitRound == round {
  698. switch type_ {
  699. case types.VoteTypePrevote:
  700. case types.VoteTypePrecommit:
  701. ps.LastCommit.SetIndex(index, true)
  702. log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  703. }
  704. }
  705. } else {
  706. // Does not apply.
  707. }
  708. }
  // ApplyNewRoundStepMessage advances our view of the peer to the
  // height/round/step announced in msg. Messages that do not move the peer
  // forward (per CompareHRS) are ignored. On a round or height change the
  // per-round proposal and vote tracking is reset; on a height change the
  // last-commit and catch-up-commit tracking is updated as well.
  func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  	ps.mtx.Lock()
  	defer ps.mtx.Unlock()

  	// Ignore duplicates or decreases
  	if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 {
  		return
  	}

  	// Just remember these values.
  	// They are overwritten below but are needed to decide what to carry over.
  	psHeight := ps.Height
  	psRound := ps.Round
  	//psStep := ps.Step
  	psCatchupCommitRound := ps.CatchupCommitRound
  	psCatchupCommit := ps.CatchupCommit

  	// Estimate the peer's round start from the elapsed seconds it reported.
  	startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  	ps.Height = msg.Height
  	ps.Round = msg.Round
  	ps.Step = msg.Step
  	ps.StartTime = startTime
  	if psHeight != msg.Height || psRound != msg.Round {
  		// New round (or height): forget per-round proposal and vote state.
  		ps.Proposal = false
  		ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  		ps.ProposalBlockParts = nil
  		ps.ProposalPOLRound = -1
  		ps.ProposalPOL = nil
  		// We'll update the BitArray capacity later.
  		ps.Prevotes = nil
  		ps.Precommits = nil
  	}
  	if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  		// Peer caught up to CatchupCommitRound.
  		// Preserve psCatchupCommit!
  		// NOTE: We prefer to use prs.Precommits if
  		// pr.Round matches pr.CatchupCommitRound.
  		ps.Precommits = psCatchupCommit
  	}
  	if psHeight != msg.Height {
  		// Shift Precommits to LastCommit.
  		// NOTE(review): ps.Precommits was already set to nil by the reset
  		// above whenever the height changed, so the first branch stores nil
  		// as well — confirm whether the old Precommits were meant to be kept.
  		if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  			ps.LastCommitRound = msg.LastCommitRound
  			ps.LastCommit = ps.Precommits
  		} else {
  			ps.LastCommitRound = msg.LastCommitRound
  			ps.LastCommit = nil
  		}
  		// We'll update the BitArray capacity later.
  		ps.CatchupCommitRound = -1
  		ps.CatchupCommit = nil
  	}
  }
  758. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  759. ps.mtx.Lock()
  760. defer ps.mtx.Unlock()
  761. if ps.Height != msg.Height {
  762. return
  763. }
  764. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  765. ps.ProposalBlockParts = msg.BlockParts
  766. }
  767. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  768. ps.mtx.Lock()
  769. defer ps.mtx.Unlock()
  770. if ps.Height != msg.Height {
  771. return
  772. }
  773. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  774. }
  775. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  776. ps.mtx.Lock()
  777. defer ps.mtx.Unlock()
  778. if ps.Height != msg.Height {
  779. return
  780. }
  781. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  782. return
  783. }
  784. // TODO: Merge onto existing ps.ProposalPOL?
  785. // We might have sent some prevotes in the meantime.
  786. ps.ProposalPOL = msg.ProposalPOL
  787. }
  788. //-----------------------------------------------------------------------------
  // Messages

  // Type bytes under which each concrete consensus message is registered
  // with go-wire below.
  const (
  	msgTypeNewRoundStep = byte(0x01)
  	msgTypeCommitStep   = byte(0x02)
  	msgTypeProposal     = byte(0x11)
  	msgTypeProposalPOL  = byte(0x12)
  	msgTypeBlockPart    = byte(0x13) // both block & POL
  	msgTypeVote         = byte(0x14)
  	msgTypeHasVote      = byte(0x15)
  )

  // ConsensusMessage is the interface under which all consensus reactor
  // messages are wire-encoded. Messages are sent wrapped in
  // struct{ ConsensusMessage }.
  type ConsensusMessage interface{}

  // Register every concrete message type with its type byte.
  var _ = wire.RegisterInterface(
  	struct{ ConsensusMessage }{},
  	wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  	wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  	wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  	wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  	wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  	wire.ConcreteType{&VoteMessage{}, msgTypeVote},
  	wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  )
  810. // TODO: check for unnecessary extra bytes at the end.
  811. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  812. msgType = bz[0]
  813. n := new(int)
  814. r := bytes.NewReader(bz)
  815. msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  816. return
  817. }
  818. //-------------------------------------
  819. // For every height/round/step transition
  820. type NewRoundStepMessage struct {
  821. Height int
  822. Round int
  823. Step RoundStepType
  824. SecondsSinceStartTime int
  825. LastCommitRound int
  826. }
  827. func (m *NewRoundStepMessage) String() string {
  828. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  829. m.Height, m.Round, m.Step, m.LastCommitRound)
  830. }
  831. //-------------------------------------
  832. type CommitStepMessage struct {
  833. Height int
  834. BlockPartsHeader types.PartSetHeader
  835. BlockParts *BitArray
  836. }
  837. func (m *CommitStepMessage) String() string {
  838. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  839. }
  840. //-------------------------------------
  841. type ProposalMessage struct {
  842. Proposal *types.Proposal
  843. }
  844. func (m *ProposalMessage) String() string {
  845. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  846. }
  847. //-------------------------------------
  848. type ProposalPOLMessage struct {
  849. Height int
  850. ProposalPOLRound int
  851. ProposalPOL *BitArray
  852. }
  853. func (m *ProposalPOLMessage) String() string {
  854. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  855. }
  856. //-------------------------------------
  857. type BlockPartMessage struct {
  858. Height int
  859. Round int
  860. Part *types.Part
  861. }
  862. func (m *BlockPartMessage) String() string {
  863. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  864. }
  865. //-------------------------------------
  866. type VoteMessage struct {
  867. ValidatorIndex int
  868. Vote *types.Vote
  869. }
  870. func (m *VoteMessage) String() string {
  871. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  872. }
  873. //-------------------------------------
  // HasVoteMessage announces that the sender has the vote of the given type
  // from validator Index at Height/Round.
  type HasVoteMessage struct {
  	Height int
  	Round  int
  	Type   byte
  	Index  int
  }

  // String renders the validator index, then height/round/type (round
  // zero-padded to two digits), then the index again.
  func (m *HasVoteMessage) String() string {
  	const layout = "[HasVote VI:%v V:{%v/%02d/%v} VI:%v]"
  	return fmt.Sprintf(layout, m.Index, m.Height, m.Round, m.Type, m.Index)
  }
  882. }