package consensus

import (
    "bytes"
    "errors"
    "fmt"
    "reflect"
    "sync"
    "sync/atomic"
    "time"

    "github.com/tendermint/tendermint/binary"
    bc "github.com/tendermint/tendermint/blockchain"
    . "github.com/tendermint/tendermint/common"
    . "github.com/tendermint/tendermint/consensus/types"
    "github.com/tendermint/tendermint/events"
    "github.com/tendermint/tendermint/p2p"
    sm "github.com/tendermint/tendermint/state"
    "github.com/tendermint/tendermint/types"
)

const (
    StateChannel = byte(0x20)
    DataChannel  = byte(0x21)
    VoteChannel  = byte(0x22)

    PeerStateKey = "ConsensusReactor.peerState"

    peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
)

//-----------------------------------------------------------------------------

type ConsensusReactor struct {
    sw      *p2p.Switch
    running uint32
    quit    chan struct{}

    blockStore *bc.BlockStore
    conS       *ConsensusState
    fastSync   bool
    evsw       events.Fireable
}

func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
    conR := &ConsensusReactor{
        quit:       make(chan struct{}),
        blockStore: blockStore,
        conS:       consensusState,
        fastSync:   fastSync,
    }
    return conR
}

// Implements Reactor
func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
    if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
        log.Info("Starting ConsensusReactor", "fastSync", conR.fastSync)
        conR.sw = sw
        if !conR.fastSync {
            conR.conS.Start()
        }
        go conR.broadcastNewRoundStepRoutine()
    }
}

// Implements Reactor
func (conR *ConsensusReactor) Stop() {
    if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
        log.Info("Stopping ConsensusReactor")
        conR.conS.Stop()
        close(conR.quit)
    }
}

func (conR *ConsensusReactor) IsRunning() bool {
    return atomic.LoadUint32(&conR.running) == 1
}

// Implements Reactor
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
    // TODO optimize
    return []*p2p.ChannelDescriptor{
        &p2p.ChannelDescriptor{
            Id:                StateChannel,
            Priority:          5,
            SendQueueCapacity: 100,
        },
        &p2p.ChannelDescriptor{
            Id:                DataChannel,
            Priority:          5,
            SendQueueCapacity: 2,
        },
        &p2p.ChannelDescriptor{
            Id:                VoteChannel,
            Priority:          5,
            SendQueueCapacity: 40,
        },
    }
}

// Implements Reactor
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
    if !conR.IsRunning() {
        return
    }

    // Create peerState for peer
    peerState := NewPeerState(peer)
    peer.Data.Set(PeerStateKey, peerState)

    // Begin gossip routines for this peer.
    go conR.gossipDataRoutine(peer, peerState)
    go conR.gossipVotesRoutine(peer, peerState)

    // Send our state to peer.
    // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
    if !conR.fastSync {
        conR.sendNewRoundStepMessage(peer)
    }
}

// Implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
    if !conR.IsRunning() {
        return
    }
    // TODO
    //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}

// Implements Reactor
// NOTE: We process these messages even when we're fast_syncing.
func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
    log.Debug("Receive", "channel", chId, "peer", peer, "bytes", msgBytes)
    if !conR.IsRunning() {
        return
    }

    // Get round state
    rs := conR.conS.GetRoundState()
    ps := peer.Data.Get(PeerStateKey).(*PeerState)
    _, msg, err := DecodeMessage(msgBytes)
    if err != nil {
        log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
        return
    }
    log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg, "rsHeight", rs.Height) //, "bytes", msgBytes)

    switch chId {
    case StateChannel:
        switch msg := msg.(type) {
        case *NewRoundStepMessage:
            ps.ApplyNewRoundStepMessage(msg, rs)
        case *CommitStepMessage:
            ps.ApplyCommitStepMessage(msg)
        case *HasVoteMessage:
            ps.ApplyHasVoteMessage(msg)
        default:
            log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
        }

    case DataChannel:
        if conR.fastSync {
            log.Warn("Ignoring message received during fastSync", "msg", msg)
            return
        }
        switch msg := msg.(type) {
        case *ProposalMessage:
            ps.SetHasProposal(msg.Proposal)
            err = conR.conS.SetProposal(msg.Proposal)
        case *ProposalPOLMessage:
            ps.ApplyProposalPOLMessage(msg)
        case *BlockPartMessage:
            ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
            _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
        default:
            log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
        }

    case VoteChannel:
        if conR.fastSync {
            log.Warn("Ignoring message received during fastSync", "msg", msg)
            return
        }
        switch msg := msg.(type) {
        case *VoteMessage:
            vote := msg.Vote
            var validators *sm.ValidatorSet
            if rs.Height == vote.Height {
                validators = rs.Validators
            } else if rs.Height == vote.Height+1 {
                if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
                    return // Wrong height, not a LastCommit straggler commit.
                }
                validators = rs.LastValidators
            } else {
                return // Wrong height. Not necessarily a bad peer.
            }

            // We have vote/validators. Height may not be rs.Height
            address, _ := validators.GetByIndex(msg.ValidatorIndex)
            added, index, err := conR.conS.AddVote(address, vote, peer.Key)
            if err != nil {
                // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
                if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
                    log.Warn("Found conflicting vote. Publish evidence")
                    evidenceTx := &types.DupeoutTx{
                        Address: address,
                        VoteA:   *errDupe.VoteA,
                        VoteB:   *errDupe.VoteB,
                    }
                    conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
                } else {
                    // Probably an invalid signature. Bad peer.
                    log.Warn("Error attempting to add vote", "error", err)
                    // TODO: punish peer
                }
            }
            ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
            ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
            ps.SetHasVote(vote, index)
            if added {
                // If rs.Height == vote.Height && rs.Round < vote.Round,
                // the peer is sending us CatchupCommit precommits.
                // We could make note of this and help filter in broadcastHasVoteMessage().
                conR.broadcastHasVoteMessage(vote, index)
            }

        default:
            log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
        }

    default:
        log.Warn(Fmt("Unknown channel %X", chId))
    }

    if err != nil {
        log.Warn("Error in Receive()", "error", err)
    }
}

// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
    msg := &HasVoteMessage{
        Height: vote.Height,
        Round:  vote.Round,
        Type:   vote.Type,
        Index:  index,
    }
    conR.sw.Broadcast(StateChannel, msg)
    /*
        // TODO: Make this broadcast more selective.
        for _, peer := range conR.sw.Peers().List() {
            ps := peer.Data.Get(PeerStateKey).(*PeerState)
            prs := ps.GetRoundState()
            if prs.Height == vote.Height {
                // TODO: Also filter on round?
                peer.TrySend(StateChannel, msg)
            } else {
                // Height doesn't match
                // TODO: check a field, maybe CatchupCommitRound?
                // TODO: But that requires changing the struct field comment.
            }
        }
    */
}

// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
    conR.conS.SetPrivValidator(priv)
}

// Switch from the fast_sync to the consensus:
// reset the state, turn off fast_sync, start the consensus-state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
    log.Info("SwitchToConsensus")
    // NOTE: The line below causes broadcastNewRoundStepRoutine() to
    // broadcast a NewRoundStepMessage.
    conR.conS.updateToState(state, false)
    conR.fastSync = false
    conR.conS.Start()
}

// implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
    conR.evsw = evsw
    conR.conS.SetFireable(evsw)
}

//--------------------------------------
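// Builds a NewRoundStepMessage from the given RoundState, plus a
// CommitStepMessage when the step is RoundStepCommit.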
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
    nrsMsg = &NewRoundStepMessage{
        Height: rs.Height,
        Round:  rs.Round,
        Step:   rs.Step,
        SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
        LastCommitRound:       rs.LastCommit.Round(),
    }
    if rs.Step == RoundStepCommit {
        csMsg = &CommitStepMessage{
            Height:           rs.Height,
            BlockPartsHeader: rs.ProposalBlockParts.Header(),
            BlockParts:       rs.ProposalBlockParts.BitArray(),
        }
    }
    return
}

// Listens for changes to the ConsensusState.Step by pulling
// on conR.conS.NewStepCh().
func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
    for {
        // Get RoundState with new Step or quit.
        var rs *RoundState
        select {
        case rs = <-conR.conS.NewStepCh():
        case <-conR.quit:
            return
        }

        nrsMsg, csMsg := makeRoundStepMessages(rs)
        if nrsMsg != nil {
            conR.sw.Broadcast(StateChannel, nrsMsg)
        }
        if csMsg != nil {
            conR.sw.Broadcast(StateChannel, csMsg)
        }
    }
}
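// Sends our current NewRoundStepMessage (and CommitStepMessage, if in the
// commit step) to the given peer.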
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
    rs := conR.conS.GetRoundState()
    nrsMsg, csMsg := makeRoundStepMessages(rs)
    if nrsMsg != nil {
        peer.Send(StateChannel, nrsMsg)
    }
    if csMsg != nil {
        peer.Send(StateChannel, csMsg)
    }
}
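// Gossips proposal data to the peer: block parts for the current
// height/round, old block parts for peers that are catching up, and the
// Proposal itself along with its POL bit array.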
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
    log := log.New("peer", peer.Key)

OUTER_LOOP:
    for {
        // Manage disconnects from self or peer.
        if !peer.IsRunning() || !conR.IsRunning() {
            log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
            return
        }
        rs := conR.conS.GetRoundState()
        prs := ps.GetRoundState()

        // Send proposal Block parts?
        if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
            //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
            if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
                part := rs.ProposalBlockParts.GetPart(index)
                msg := &BlockPartMessage{
                    Height: rs.Height, // This tells peer that this part applies to us.
                    Round:  rs.Round,  // This tells peer that this part applies to us.
                    Part:   part,
                }
                peer.Send(DataChannel, msg)
                ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
                continue OUTER_LOOP
            }
        }

        // If the peer is on a previous height, help catch up.
        if (0 < prs.Height) && (prs.Height < rs.Height) {
            //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
            if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
                // Ensure that the peer's PartSetHeader is correct
                blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
                if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
                    log.Debug("Peer ProposalBlockPartsHeader mismatch, sleeping",
                        "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
                    time.Sleep(peerGossipSleepDuration)
                    continue OUTER_LOOP
                }
                // Load the part
                part := conR.blockStore.LoadBlockPart(prs.Height, index)
                if part == nil {
                    log.Warn("Could not load part", "index", index,
                        "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
                    time.Sleep(peerGossipSleepDuration)
                    continue OUTER_LOOP
                }
                // Send the part
                msg := &BlockPartMessage{
                    Height: prs.Height, // Not our height, so it doesn't matter.
                    Round:  prs.Round,  // Not our height, so it doesn't matter.
                    Part:   part,
                }
                peer.Send(DataChannel, msg)
                ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
                continue OUTER_LOOP
            } else {
                //log.Debug("No parts to send in catch-up, sleeping")
                time.Sleep(peerGossipSleepDuration)
                continue OUTER_LOOP
            }
        }

        // If height and round don't match, sleep.
        if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
            //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
            time.Sleep(peerGossipSleepDuration)
            continue OUTER_LOOP
        }

        // By here, height and round match.
        // Proposal block parts were already matched and sent if any were wanted.
        // (These can match on hash so the round doesn't matter)
        // Now consider sending other things, like the Proposal itself.

        // Send Proposal && ProposalPOL BitArray?
        if rs.Proposal != nil && !prs.Proposal {
            // Proposal
            {
                msg := &ProposalMessage{Proposal: rs.Proposal}
                peer.Send(DataChannel, msg)
                ps.SetHasProposal(rs.Proposal)
            }
            // ProposalPOL.
            // Peer must receive ProposalMessage first.
            // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
            // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
            if 0 <= rs.Proposal.POLRound {
                msg := &ProposalPOLMessage{
                    Height:           rs.Height,
                    ProposalPOLRound: rs.Proposal.POLRound,
                    ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
                }
                peer.Send(DataChannel, msg)
            }
            continue OUTER_LOOP
        }

        // Nothing to do. Sleep.
        time.Sleep(peerGossipSleepDuration)
        continue OUTER_LOOP
    }
}
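// Gossips votes to the peer: LastCommit, prevotes, and precommits for the
// current height, and block validation precommits for peers lagging by more
// than one height. Picks one missing vote per iteration.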
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
    log := log.New("peer", peer.Key)

    // Simple hack to throttle logs upon sleep.
    var sleeping = 0

OUTER_LOOP:
    for {
        // Manage disconnects from self or peer.
        if !peer.IsRunning() || !conR.IsRunning() {
            log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
            return
        }
        rs := conR.conS.GetRoundState()
        prs := ps.GetRoundState()

        switch sleeping {
        case 1: // First sleep
            sleeping = 2
        case 2: // No more sleep
            sleeping = 0
        }

        log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
            "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

        // If height matches, then send LastCommit, Prevotes, Precommits.
        if rs.Height == prs.Height {
            // If there are lastCommits to send...
            if prs.Step == RoundStepNewHeight {
                if ps.PickSendVote(rs.LastCommit) {
                    log.Debug("Picked rs.LastCommit to send")
                    continue OUTER_LOOP
                }
            }
            // If there are prevotes to send...
            if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
                if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
                    log.Debug("Picked rs.Prevotes(rs.Round) to send")
                    continue OUTER_LOOP
                }
            }
            // If there are precommits to send...
            if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
                if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
                    log.Debug("Picked rs.Precommits(rs.Round) to send")
                    continue OUTER_LOOP
                }
            }
            // If there are prevotes to send for the last round...
            if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
                if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
                    log.Debug("Picked rs.Prevotes(prs.Round) to send")
                    continue OUTER_LOOP
                }
            }
            // If there are precommits to send for the last round...
            if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
                if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
                    log.Debug("Picked rs.Precommits(prs.Round) to send")
                    continue OUTER_LOOP
                }
            }
            // If there are POLPrevotes to send...
            if 0 <= prs.ProposalPOLRound {
                if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
                    if ps.PickSendVote(polPrevotes) {
                        log.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
                        continue OUTER_LOOP
                    }
                }
            }
        }

        // Special catchup logic.
        // If peer is lagging by height 1, send LastCommit.
        if prs.Height != 0 && rs.Height == prs.Height+1 {
            if ps.PickSendVote(rs.LastCommit) {
                log.Debug("Picked rs.LastCommit to send")
                continue OUTER_LOOP
            }
        }

        // Catchup logic
        // If peer is lagging by more than 1, send Validation.
        if prs.Height != 0 && rs.Height >= prs.Height+2 {
            // Load the block validation for prs.Height,
            // which contains precommit signatures for prs.Height.
            validation := conR.blockStore.LoadBlockValidation(prs.Height)
            log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
            if ps.PickSendVote(validation) {
                log.Debug("Picked Catchup validation to send")
                continue OUTER_LOOP
            }
        }

        if sleeping == 0 {
            // We sent nothing. Sleep...
            sleeping = 1
            log.Debug("No votes to send, sleeping", "peer", peer,
                "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
                "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
        } else if sleeping == 2 {
            // Continued sleep...
            sleeping = 1
        }

        time.Sleep(peerGossipSleepDuration)
        continue OUTER_LOOP
    }
}

//-----------------------------------------------------------------------------

// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
    Height                   int                 // Height peer is at
    Round                    int                 // Round peer is at
    Step                     RoundStepType       // Step peer is at
    StartTime                time.Time           // Estimated start of round 0 at this height
    Proposal                 bool                // True if peer has proposal for this round
    ProposalBlockPartsHeader types.PartSetHeader //
    ProposalBlockParts       *BitArray           //
    ProposalPOLRound         int                 // -1 if none
    ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
    Prevotes                 *BitArray           // All votes peer has for this round
    Precommits               *BitArray           // All precommits peer has for this round
    LastCommitRound          int                 // Round of commit for last height.
    LastCommit               *BitArray           // All commit precommits of commit for last height.
    CatchupCommitRound       int                 // Round that we believe commit round is.
    CatchupCommit            *BitArray           // All commit precommits peer has for this height
}

//-----------------------------------------------------------------------------

var (
    ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
    ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)

type PeerState struct {
    Peer *p2p.Peer

    mtx sync.Mutex
    PeerRoundState
}

func NewPeerState(peer *p2p.Peer) *PeerState {
    return &PeerState{Peer: peer}
}

// Returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *PeerRoundState {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    prs := ps.PeerRoundState // copy
    return &prs
}
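// Records that the peer has the proposal for its current height/round and
// initializes ProposalBlockParts from the proposal's PartSetHeader.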
func (ps *PeerState) SetHasProposal(proposal *Proposal) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    if ps.Height != proposal.Height || ps.Round != proposal.Round {
        return
    }
    if ps.Proposal {
        return
    }

    ps.Proposal = true
    ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
    ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
    ps.ProposalPOLRound = proposal.POLRound
    ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}

func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    if ps.Height != height || ps.Round != round {
        return
    }

    ps.ProposalBlockParts.SetIndex(index, true)
}

// Convenience function to send vote to peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
    if index, vote, ok := ps.PickVoteToSend(votes); ok {
        msg := &VoteMessage{index, vote}
        ps.Peer.Send(VoteChannel, msg)
        return true
    }
    return false
}

// votes: Must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    if votes.Size() == 0 {
        return 0, nil, false
    }

    height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()

    // Lazily set data using 'votes'.
    if votes.IsCommit() {
        ps.ensureCatchupCommitRound(height, round, size)
    }
    ps.ensureVoteBitArrays(height, size)

    psVotes := ps.getVoteBitArray(height, round, type_)
    if psVotes == nil {
        return 0, nil, false // Not something worth sending
    }
    if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
        ps.setHasVote(height, round, type_, index)
        return index, votes.GetByIndex(index), true
    }
    return 0, nil, false
}
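// Returns the bit array tracking which votes of the given height/round/type
// the peer is known to have, or nil if that combination isn't tracked.
// NOTE: caller must hold ps.mtx.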
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
    if ps.Height == height {
        if ps.Round == round {
            switch type_ {
            case types.VoteTypePrevote:
                return ps.Prevotes
            case types.VoteTypePrecommit:
                return ps.Precommits
            default:
                panic(Fmt("Unexpected vote type %X", type_))
            }
        }
        if ps.CatchupCommitRound == round {
            switch type_ {
            case types.VoteTypePrevote:
                return nil
            case types.VoteTypePrecommit:
                return ps.CatchupCommit
            default:
                panic(Fmt("Unexpected vote type %X", type_))
            }
        }
        return nil
    }
    if ps.Height == height+1 {
        if ps.LastCommitRound == round {
            switch type_ {
            case types.VoteTypePrevote:
                return nil
            case types.VoteTypePrecommit:
                return ps.LastCommit
            default:
                panic(Fmt("Unexpected vote type %X", type_))
            }
        }
        return nil
    }
    return nil
}

// NOTE: 'round' is what we know to be the commit round for height.
func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
    if ps.Height != height {
        return
    }
    if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
        panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
    }
    if ps.CatchupCommitRound == round {
        return // Nothing to do!
    }
    ps.CatchupCommitRound = round
    if round == ps.Round {
        ps.CatchupCommit = ps.Precommits
    } else {
        ps.CatchupCommit = NewBitArray(numValidators)
    }
}

// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()
    ps.ensureVoteBitArrays(height, numValidators)
}

func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
    if ps.Height == height {
        if ps.Prevotes == nil {
            ps.Prevotes = NewBitArray(numValidators)
        }
        if ps.Precommits == nil {
            ps.Precommits = NewBitArray(numValidators)
        }
        if ps.CatchupCommit == nil {
            ps.CatchupCommit = NewBitArray(numValidators)
        }
        if ps.ProposalPOL == nil {
            ps.ProposalPOL = NewBitArray(numValidators)
        }
    } else if ps.Height == height+1 {
        if ps.LastCommit == nil {
            ps.LastCommit = NewBitArray(numValidators)
        }
    }
}

func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
}
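// Marks the vote at the given index as known by the peer in the appropriate
// bit array (Prevotes, Precommits, CatchupCommit, ProposalPOL, or LastCommit).
// NOTE: caller must hold ps.mtx.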
func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
    log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
    if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
        panic("Invalid vote type") // SANITY
    }

    if ps.Height == height {
        if ps.Round == round {
            switch type_ {
            case types.VoteTypePrevote:
                ps.Prevotes.SetIndex(index, true)
                log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
            case types.VoteTypePrecommit:
                ps.Precommits.SetIndex(index, true)
                log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
            }
        } else if ps.CatchupCommitRound == round {
            switch type_ {
            case types.VoteTypePrevote:
            case types.VoteTypePrecommit:
                ps.CatchupCommit.SetIndex(index, true)
                log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
            }
        } else if ps.ProposalPOLRound == round {
            switch type_ {
            case types.VoteTypePrevote:
                ps.ProposalPOL.SetIndex(index, true)
                log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
            case types.VoteTypePrecommit:
            }
        }
    } else if ps.Height == height+1 {
        if ps.LastCommitRound == round {
            switch type_ {
            case types.VoteTypePrevote:
            case types.VoteTypePrecommit:
                ps.LastCommit.SetIndex(index, true)
                log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
            }
        }
    } else {
        // Does not apply.
    }
}
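// Updates the peer's round state from a NewRoundStepMessage, resetting
// per-round data whenever the peer's height or round changes.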
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    // Ignore duplicate messages.
    if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
        return
    }

    // Just remember these values.
    psHeight := ps.Height
    psRound := ps.Round
    //psStep := ps.Step
    psCatchupCommitRound := ps.CatchupCommitRound
    psCatchupCommit := ps.CatchupCommit

    startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
    ps.Height = msg.Height
    ps.Round = msg.Round
    ps.Step = msg.Step
    ps.StartTime = startTime
    if psHeight != msg.Height || psRound != msg.Round {
        ps.Proposal = false
        ps.ProposalBlockPartsHeader = types.PartSetHeader{}
        ps.ProposalBlockParts = nil
        ps.ProposalPOLRound = -1
        ps.ProposalPOL = nil
        // We'll update the BitArray capacity later.
        ps.Prevotes = nil
        ps.Precommits = nil
    }
    if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
        // Peer caught up to CatchupCommitRound.
        // Preserve psCatchupCommit!
        // NOTE: We prefer to use prs.Precommits if
        // pr.Round matches pr.CatchupCommitRound.
        ps.Precommits = psCatchupCommit
    }
    if psHeight != msg.Height {
        // Shift Precommits to LastCommit.
        if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
            ps.LastCommitRound = msg.LastCommitRound
            ps.LastCommit = ps.Precommits
        } else {
            ps.LastCommitRound = msg.LastCommitRound
            ps.LastCommit = nil
        }
        // We'll update the BitArray capacity later.
        ps.CatchupCommitRound = -1
        ps.CatchupCommit = nil
    }
}
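// Records the peer's proposal block parts header and bit array for the
// commit step at the peer's current height.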
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    if ps.Height != msg.Height {
        return
    }

    ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
    ps.ProposalBlockParts = msg.BlockParts
}

func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    if ps.Height != msg.Height {
        return
    }

    ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}

func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
    ps.mtx.Lock()
    defer ps.mtx.Unlock()

    if ps.Height != msg.Height {
        return
    }
    if ps.ProposalPOLRound != msg.ProposalPOLRound {
        return
    }

    // TODO: Merge onto existing ps.ProposalPOL?
    // We might have sent some prevotes in the meantime.
    ps.ProposalPOL = msg.ProposalPOL
}

//-----------------------------------------------------------------------------
// Messages

const (
    msgTypeNewRoundStep = byte(0x01)
    msgTypeCommitStep   = byte(0x02)
    msgTypeProposal     = byte(0x11)
    msgTypeProposalPOL  = byte(0x12)
    msgTypeBlockPart    = byte(0x13) // both block & POL
    msgTypeVote         = byte(0x14)
    msgTypeHasVote      = byte(0x15)
)

type ConsensusMessage interface{}

var _ = binary.RegisterInterface(
    struct{ ConsensusMessage }{},
    binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
    binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
    binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
    binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
    binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
    binary.ConcreteType{&VoteMessage{}, msgTypeVote},
    binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
)
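// Decodes a ConsensusMessage from wire bytes. The first byte identifies the
// concrete message type registered above.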
// TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
    msgType = bz[0]
    n := new(int64)
    r := bytes.NewReader(bz)
    msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
    return
}

//-------------------------------------

// For every height/round/step transition
type NewRoundStepMessage struct {
    Height                int
    Round                 int
    Step                  RoundStepType
    SecondsSinceStartTime int
    LastCommitRound       int
}

func (m *NewRoundStepMessage) String() string {
    return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
        m.Height, m.Round, m.Step, m.LastCommitRound)
}

//-------------------------------------

type CommitStepMessage struct {
    Height           int
    BlockPartsHeader types.PartSetHeader
    BlockParts       *BitArray
}

func (m *CommitStepMessage) String() string {
    return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
}

//-------------------------------------

type ProposalMessage struct {
    Proposal *Proposal
}

func (m *ProposalMessage) String() string {
    return fmt.Sprintf("[Proposal %v]", m.Proposal)
}

//-------------------------------------

type ProposalPOLMessage struct {
    Height           int
    ProposalPOLRound int
    ProposalPOL      *BitArray
}

func (m *ProposalPOLMessage) String() string {
    return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}

//-------------------------------------

type BlockPartMessage struct {
    Height int
    Round  int
    Part   *types.Part
}

func (m *BlockPartMessage) String() string {
    return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}

//-------------------------------------

type VoteMessage struct {
    ValidatorIndex int
    Vote           *types.Vote
}

func (m *VoteMessage) String() string {
    return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
}

//-------------------------------------

type HasVoteMessage struct {
    Height int
    Round  int
    Type   byte
    Index  int
}

func (m *HasVoteMessage) String() string {
    return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)