package consensus

import (
	"bytes"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	"github.com/tendermint/tendermint/binary"
	bc "github.com/tendermint/tendermint/blockchain"
	. "github.com/tendermint/tendermint/common"
	. "github.com/tendermint/tendermint/consensus/types"
	"github.com/tendermint/tendermint/events"
	"github.com/tendermint/tendermint/p2p"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

const (
	StateChannel = byte(0x20)
	DataChannel  = byte(0x21)
	VoteChannel  = byte(0x22)

	PeerStateKey = "ConsensusReactor.peerState"

	peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
)

//-----------------------------------------------------------------------------

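// ConsensusReactor hooks the consensus state machine into the p2p switch.
// It gossips round-step, proposal, block-part, and vote information to peers
// over the State, Data, and Vote channels defined above.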
type ConsensusReactor struct {
	sw         *p2p.Switch
	running    uint32
	quit       chan struct{}
	blockStore *bc.BlockStore
	conS       *ConsensusState
	fastSync   bool

	evsw events.Fireable
}

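// NewConsensusReactor returns a reactor wrapping the given ConsensusState and
// BlockStore. When fastSync is true, the consensus state machine is not
// started until SwitchToConsensus() is called.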
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
	conR := &ConsensusReactor{
		quit:       make(chan struct{}),
		blockStore: blockStore,
		conS:       consensusState,
		fastSync:   fastSync,
	}
	return conR
}

// Implements Reactor
func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
	if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
		log.Info("Starting ConsensusReactor", "fastSync", conR.fastSync)
		conR.sw = sw
		if !conR.fastSync {
			conR.conS.Start()
		}
		go conR.broadcastNewRoundStepRoutine()
	}
}

// Implements Reactor
func (conR *ConsensusReactor) Stop() {
	if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
		log.Info("Stopping ConsensusReactor")
		conR.conS.Stop()
		close(conR.quit)
	}
}

func (conR *ConsensusReactor) IsRunning() bool {
	return atomic.LoadUint32(&conR.running) == 1
}

// Implements Reactor
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
	// TODO optimize
	return []*p2p.ChannelDescriptor{
		&p2p.ChannelDescriptor{
			Id:                StateChannel,
			Priority:          5,
			SendQueueCapacity: 100,
		},
		&p2p.ChannelDescriptor{
			Id:                DataChannel,
			Priority:          5,
			SendQueueCapacity: 2,
		},
		&p2p.ChannelDescriptor{
			Id:                VoteChannel,
			Priority:          5,
			SendQueueCapacity: 40,
		},
	}
}

// Implements Reactor
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
	if !conR.IsRunning() {
		return
	}

	// Create peerState for peer
	peerState := NewPeerState(peer)
	peer.Data.Set(PeerStateKey, peerState)

	// Begin gossip routines for this peer.
	go conR.gossipDataRoutine(peer, peerState)
	go conR.gossipVotesRoutine(peer, peerState)

	// Send our state to peer.
	// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
	if !conR.fastSync {
		conR.sendNewRoundStepMessage(peer)
	}
}

// Implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}

// Implements Reactor
// NOTE: We process these messages even when we're fast_syncing.
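// Receive dispatches an incoming message by channel: state updates on
// StateChannel, proposals and block parts on DataChannel, and votes on
// VoteChannel. DataChannel and VoteChannel messages are ignored while
// fast-syncing.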
func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
	log.Debug("Receive", "channel", chId, "peer", peer, "bytes", msgBytes)
	if !conR.IsRunning() {
		return
	}

	// Get round state
	rs := conR.conS.GetRoundState()
	ps := peer.Data.Get(PeerStateKey).(*PeerState)
	_, msg_, err := DecodeMessage(msgBytes)
	if err != nil {
		log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
		return
	}
	log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "rsHeight", rs.Height) //, "bytes", msgBytes)

	switch chId {
	case StateChannel:
		switch msg := msg_.(type) {
		case *NewRoundStepMessage:
			ps.ApplyNewRoundStepMessage(msg, rs)
		case *CommitStepMessage:
			ps.ApplyCommitStepMessage(msg)
		case *HasVoteMessage:
			ps.ApplyHasVoteMessage(msg)
		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case DataChannel:
		if conR.fastSync {
			log.Warn("Ignoring message received during fastSync", "msg", msg_)
			return
		}
		switch msg := msg_.(type) {
		case *ProposalMessage:
			ps.SetHasProposal(msg.Proposal)
			err = conR.conS.SetProposal(msg.Proposal)
		case *ProposalPOLMessage:
			ps.ApplyProposalPOLMessage(msg)
		case *BlockPartMessage:
			ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
			_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case VoteChannel:
		if conR.fastSync {
			log.Warn("Ignoring message received during fastSync", "msg", msg_)
			return
		}
		switch msg := msg_.(type) {
		case *VoteMessage:
			vote := msg.Vote
			var validators *sm.ValidatorSet
			if rs.Height == vote.Height {
				validators = rs.Validators
			} else if rs.Height == vote.Height+1 {
				if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
					return // Wrong height, not a LastCommit straggler commit.
				}
				validators = rs.LastValidators
			} else {
				return // Wrong height. Not necessarily a bad peer.
			}

			// We have vote/validators. Height may not be rs.Height
			address, _ := validators.GetByIndex(msg.ValidatorIndex)
			added, index, err := conR.conS.AddVote(address, vote, peer.Key)
			if err != nil {
				// If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
				if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
					log.Warn("Found conflicting vote. Publish evidence")
					evidenceTx := &types.DupeoutTx{
						Address: address,
						VoteA:   *errDupe.VoteA,
						VoteB:   *errDupe.VoteB,
					}
					conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
				} else {
					// Probably an invalid signature. Bad peer.
					log.Warn("Error attempting to add vote", "error", err)
					// TODO: punish peer
				}
			}
			ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
			ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
			ps.SetHasVote(vote, index)
			if added {
				// If rs.Height == vote.Height && rs.Round < vote.Round,
				// the peer is sending us CatchupCommit precommits.
				// We could make note of this and help filter in broadcastHasVoteMessage().
				conR.broadcastHasVoteMessage(vote, index)
			}
		default:
			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	default:
		log.Warn(Fmt("Unknown channel %X", chId))
	}

	if err != nil {
		log.Warn("Error in Receive()", "error", err)
	}
}

// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
	msg := &HasVoteMessage{
		Height: vote.Height,
		Round:  vote.Round,
		Type:   vote.Type,
		Index:  index,
	}
	conR.sw.Broadcast(StateChannel, msg)
	/*
		// TODO: Make this broadcast more selective.
		for _, peer := range conR.sw.Peers().List() {
			ps := peer.Data.Get(PeerStateKey).(*PeerState)
			prs := ps.GetRoundState()
			if prs.Height == vote.Height {
				// TODO: Also filter on round?
				peer.TrySend(StateChannel, msg)
			} else {
				// Height doesn't match
				// TODO: check a field, maybe CatchupCommitRound?
				// TODO: But that requires changing the struct field comment.
			}
		}
	*/
}

// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
	conR.conS.SetPrivValidator(priv)
}

// Switch from the fast_sync to the consensus:
// reset the state, turn off fast_sync, start the consensus-state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
	log.Info("SwitchToConsensus")
	// NOTE: The line below causes broadcastNewRoundStepRoutine() to
	// broadcast a NewRoundStepMessage.
	conR.conS.updateToState(state, false)
	conR.fastSync = false
	conR.conS.Start()
}

// implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
	conR.evsw = evsw
	conR.conS.SetFireable(evsw)
}

//--------------------------------------

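// makeRoundStepMessages builds the NewRoundStepMessage for the given
// RoundState, plus a CommitStepMessage when the state is in RoundStepCommit.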
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
	nrsMsg = &NewRoundStepMessage{
		Height: rs.Height,
		Round:  rs.Round,
		Step:   rs.Step,
		SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
		LastCommitRound:       rs.LastCommit.Round(),
	}
	if rs.Step == RoundStepCommit {
		csMsg = &CommitStepMessage{
			Height:           rs.Height,
			BlockPartsHeader: rs.ProposalBlockParts.Header(),
			BlockParts:       rs.ProposalBlockParts.BitArray(),
		}
	}
	return
}

// Listens for changes to the ConsensusState.Step by pulling
// on conR.conS.NewStepCh().
func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
	for {
		// Get RoundState with new Step or quit.
		var rs *RoundState
		select {
		case rs = <-conR.conS.NewStepCh():
		case <-conR.quit:
			return
		}

		nrsMsg, csMsg := makeRoundStepMessages(rs)
		if nrsMsg != nil {
			conR.sw.Broadcast(StateChannel, nrsMsg)
		}
		if csMsg != nil {
			conR.sw.Broadcast(StateChannel, csMsg)
		}
	}
}

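// sendNewRoundStepMessage sends our current round-step (and commit-step, if
// applicable) to a single peer, e.g. right after it connects.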
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
	rs := conR.conS.GetRoundState()
	nrsMsg, csMsg := makeRoundStepMessages(rs)
	if nrsMsg != nil {
		peer.Send(StateChannel, nrsMsg)
	}
	if csMsg != nil {
		peer.Send(StateChannel, csMsg)
	}
}

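// gossipDataRoutine gossips proposal block parts (and the Proposal itself)
// to the peer, sending catch-up parts from the block store when the peer is
// on an earlier height. Runs until the peer or the reactor stops.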
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
	log := log.New("peer", peer.Key)

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		// Send proposal Block parts?
		if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
			//log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
			if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
				part := rs.ProposalBlockParts.GetPart(index)
				msg := &BlockPartMessage{
					Height: rs.Height, // This tells peer that this part applies to us.
					Round:  rs.Round,  // This tells peer that this part applies to us.
					Part:   part,
				}
				peer.Send(DataChannel, msg)
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			}
		}

		// If the peer is on a previous height, help catch up.
		if (0 < prs.Height) && (prs.Height < rs.Height) {
			//log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
			if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
				// Ensure that the peer's PartSetHeader is correct
				blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
				if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
					log.Debug("Peer ProposalBlockPartsHeader mismatch, sleeping",
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Load the part
				part := conR.blockStore.LoadBlockPart(prs.Height, index)
				if part == nil {
					log.Warn("Could not load part", "index", index,
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Send the part
				msg := &BlockPartMessage{
					Height: prs.Height, // Not our height, so it doesn't matter.
					Round:  prs.Round,  // Not our height, so it doesn't matter.
					Part:   part,
				}
				peer.Send(DataChannel, msg)
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			} else {
				//log.Debug("No parts to send in catch-up, sleeping")
				time.Sleep(peerGossipSleepDuration)
				continue OUTER_LOOP
			}
		}

		// If height and round don't match, sleep.
		if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
			//log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
			time.Sleep(peerGossipSleepDuration)
			continue OUTER_LOOP
		}

		// By here, height and round match.
		// Proposal block parts were already matched and sent if any were wanted.
		// (These can match on hash so the round doesn't matter)
		// Now consider sending other things, like the Proposal itself.

		// Send Proposal && ProposalPOL BitArray?
		if rs.Proposal != nil && !prs.Proposal {
			// Proposal
			{
				msg := &ProposalMessage{Proposal: rs.Proposal}
				peer.Send(DataChannel, msg)
				ps.SetHasProposal(rs.Proposal)
			}
			// ProposalPOL.
			// Peer must receive ProposalMessage first.
			// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
			// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
			if 0 <= rs.Proposal.POLRound {
				msg := &ProposalPOLMessage{
					Height:           rs.Height,
					ProposalPOLRound: rs.Proposal.POLRound,
					ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
				}
				peer.Send(DataChannel, msg)
			}
			continue OUTER_LOOP
		}

		// Nothing to do. Sleep.
		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}

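// gossipVotesRoutine gossips votes to the peer: last-commit precommits,
// current-round prevotes/precommits, POL prevotes, and catch-up commits for
// peers on earlier heights. Runs until the peer or the reactor stops.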
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
	log := log.New("peer", peer.Key)

	// Simple hack to throttle logs upon sleep.
	var sleeping = 0

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		switch sleeping {
		case 1: // First sleep
			sleeping = 2
		case 2: // No more sleep
			sleeping = 0
		}

		log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
			"prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

		// If height matches, then send LastCommit, Prevotes, Precommits.
		if rs.Height == prs.Height {
			// If there are lastCommits to send...
			if prs.Step == RoundStepNewHeight {
				if ps.PickSendVote(rs.LastCommit) {
					log.Debug("Picked rs.LastCommit to send")
					continue OUTER_LOOP
				}
			}
			// If there are prevotes to send...
			if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
				if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
					log.Debug("Picked rs.Prevotes(rs.Round) to send")
					continue OUTER_LOOP
				}
			}
			// If there are precommits to send...
			if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
				if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
					log.Debug("Picked rs.Precommits(rs.Round) to send")
					continue OUTER_LOOP
				}
			}
			// If there are prevotes to send for the last round...
			if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
				if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
					log.Debug("Picked rs.Prevotes(prs.Round) to send")
					continue OUTER_LOOP
				}
			}
			// If there are precommits to send for the last round...
			if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
				if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
					log.Debug("Picked rs.Precommits(prs.Round) to send")
					continue OUTER_LOOP
				}
			}
			// If there are POLPrevotes to send...
			if 0 <= prs.ProposalPOLRound {
				if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
					if ps.PickSendVote(polPrevotes) {
						log.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
						continue OUTER_LOOP
					}
				}
			}
		}

		// Special catchup logic.
		// If peer is lagging by height 1, send LastCommit.
		if prs.Height != 0 && rs.Height == prs.Height+1 {
			if ps.PickSendVote(rs.LastCommit) {
				log.Debug("Picked rs.LastCommit to send")
				continue OUTER_LOOP
			}
		}

		// Catchup logic
		// If peer is lagging by more than 1, send Validation.
		if prs.Height != 0 && rs.Height >= prs.Height+2 {
			// Load the block validation for prs.Height,
			// which contains precommit signatures for prs.Height.
			validation := conR.blockStore.LoadBlockValidation(prs.Height)
			log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
			if ps.PickSendVote(validation) {
				log.Debug("Picked Catchup validation to send")
				continue OUTER_LOOP
			}
		}

		if sleeping == 0 {
			// We sent nothing. Sleep...
			sleeping = 1
			log.Debug("No votes to send, sleeping", "peer", peer,
				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
		} else if sleeping == 2 {
			// Continued sleep...
			sleeping = 1
		}

		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}

//-----------------------------------------------------------------------------

// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
	Height                   int                 // Height peer is at
	Round                    int                 // Round peer is at
	Step                     RoundStepType       // Step peer is at
	StartTime                time.Time           // Estimated start of round 0 at this height
	Proposal                 bool                // True if peer has proposal for this round
	ProposalBlockPartsHeader types.PartSetHeader //
	ProposalBlockParts       *BitArray           //
	ProposalPOLRound         int                 // -1 if none
	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
	Prevotes                 *BitArray           // All votes peer has for this round
	Precommits               *BitArray           // All precommits peer has for this round
	LastCommitRound          int                 // Round of commit for last height.
	LastCommit               *BitArray           // All commit precommits of commit for last height.
	CatchupCommitRound       int                 // Round that we believe commit round is.
	CatchupCommit            *BitArray           // All commit precommits peer has for this height
}

//-----------------------------------------------------------------------------

var (
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)

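// PeerState tracks what we know about a peer's consensus state. The embedded
// PeerRoundState is guarded by mtx; use GetRoundState() to read a snapshot.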
type PeerState struct {
	Peer *p2p.Peer

	mtx sync.Mutex
	PeerRoundState
}

func NewPeerState(peer *p2p.Peer) *PeerState {
	return &PeerState{Peer: peer}
}

// Returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *PeerRoundState {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	prs := ps.PeerRoundState // copy
	return &prs
}

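// SetHasProposal marks the peer as having the Proposal for its current
// height/round and initializes the block-part bit array from the proposal's
// header. Proposals for other heights/rounds are ignored.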
func (ps *PeerState) SetHasProposal(proposal *Proposal) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != proposal.Height || ps.Round != proposal.Round {
		return
	}
	if ps.Proposal {
		return
	}

	ps.Proposal = true
	ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
	ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
	ps.ProposalPOLRound = proposal.POLRound
	ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}

func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != height || ps.Round != round {
		return
	}

	ps.ProposalBlockParts.SetIndex(index, true)
}

// Convenience function to send vote to peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
	if index, vote, ok := ps.PickVoteToSend(votes); ok {
		msg := &VoteMessage{index, vote}
		ps.Peer.Send(VoteChannel, msg)
		return true
	}
	return false
}

// votes: Must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if votes.Size() == 0 {
		return 0, nil, false
	}

	height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()

	// Lazily set data using 'votes'.
	if votes.IsCommit() {
		ps.ensureCatchupCommitRound(height, round, size)
	}
	ps.ensureVoteBitArrays(height, size)

	psVotes := ps.getVoteBitArray(height, round, type_)
	if psVotes == nil {
		return 0, nil, false // Not something worth sending
	}
	if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
		ps.setHasVote(height, round, type_, index)
		return index, votes.GetByIndex(index), true
	}
	return 0, nil, false
}

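// getVoteBitArray returns the bit array tracking which votes of the given
// height/round/type the peer is known to have, or nil if we don't track that
// combination. Votes for the peer's previous height are tracked only as
// LastCommit precommits.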
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
	if ps.Height == height {
		if ps.Round == round {
			switch type_ {
			case types.VoteTypePrevote:
				return ps.Prevotes
			case types.VoteTypePrecommit:
				return ps.Precommits
			default:
				panic(Fmt("Unexpected vote type %X", type_))
			}
		}
		if ps.CatchupCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return nil
			case types.VoteTypePrecommit:
				return ps.CatchupCommit
			default:
				panic(Fmt("Unexpected vote type %X", type_))
			}
		}
		return nil
	}
	if ps.Height == height+1 {
		if ps.LastCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return nil
			case types.VoteTypePrecommit:
				return ps.LastCommit
			default:
				panic(Fmt("Unexpected vote type %X", type_))
			}
		}
		return nil
	}
	return nil
}

// NOTE: 'round' is what we know to be the commit round for height.
func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
	if ps.Height != height {
		return
	}
	if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
		panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
	}
	if ps.CatchupCommitRound == round {
		return // Nothing to do!
	}
	ps.CatchupCommitRound = round
	if round == ps.Round {
		ps.CatchupCommit = ps.Precommits
	} else {
		ps.CatchupCommit = NewBitArray(numValidators)
	}
}

// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.ensureVoteBitArrays(height, numValidators)
}

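// ensureVoteBitArrays lazily allocates the vote bit arrays for the peer's
// current height (Prevotes, Precommits, CatchupCommit, ProposalPOL) or, for
// the previous height, the LastCommit bit array. Caller must hold ps.mtx.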
func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
	if ps.Height == height {
		if ps.Prevotes == nil {
			ps.Prevotes = NewBitArray(numValidators)
		}
		if ps.Precommits == nil {
			ps.Precommits = NewBitArray(numValidators)
		}
		if ps.CatchupCommit == nil {
			ps.CatchupCommit = NewBitArray(numValidators)
		}
		if ps.ProposalPOL == nil {
			ps.ProposalPOL = NewBitArray(numValidators)
		}
	} else if ps.Height == height+1 {
		if ps.LastCommit == nil {
			ps.LastCommit = NewBitArray(numValidators)
		}
	}
}

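// SetHasVote records that the peer has the given vote (by validator index).
// setHasVote is the unlocked variant; it routes the bit to the matching
// Prevotes, Precommits, CatchupCommit, ProposalPOL, or LastCommit bit array.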
func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
}

func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
	log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
	if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
		panic("Invalid vote type") // SANITY
	}

	if ps.Height == height {
		if ps.Round == round {
			switch type_ {
			case types.VoteTypePrevote:
				ps.Prevotes.SetIndex(index, true)
				log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
			case types.VoteTypePrecommit:
				ps.Precommits.SetIndex(index, true)
				log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
			}
		} else if ps.CatchupCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
			case types.VoteTypePrecommit:
				ps.CatchupCommit.SetIndex(index, true)
				log.Debug("SetHasVote(CatchupCommit)", "catchupCommit", ps.CatchupCommit, "index", index)
			}
		} else if ps.ProposalPOLRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				ps.ProposalPOL.SetIndex(index, true)
				log.Debug("SetHasVote(ProposalPOL)", "proposalPOL", ps.ProposalPOL, "index", index)
			case types.VoteTypePrecommit:
			}
		}
	} else if ps.Height == height+1 {
		if ps.LastCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
			case types.VoteTypePrecommit:
				ps.LastCommit.SetIndex(index, true)
				log.Debug("SetHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
			}
		}
	} else {
		// Does not apply.
	}
}

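// ApplyNewRoundStepMessage updates the peer's height/round/step and resets
// per-round state (proposal, vote bit arrays) when the round or height
// changes, shifting Precommits into LastCommit on a height change.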
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	// Ignore duplicate messages.
	if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
		return
	}

	// Just remember these values.
	psHeight := ps.Height
	psRound := ps.Round
	//psStep := ps.Step
	psCatchupCommitRound := ps.CatchupCommitRound
	psCatchupCommit := ps.CatchupCommit

	startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
	ps.Height = msg.Height
	ps.Round = msg.Round
	ps.Step = msg.Step
	ps.StartTime = startTime
	if psHeight != msg.Height || psRound != msg.Round {
		ps.Proposal = false
		ps.ProposalBlockPartsHeader = types.PartSetHeader{}
		ps.ProposalBlockParts = nil
		ps.ProposalPOLRound = -1
		ps.ProposalPOL = nil
		// We'll update the BitArray capacity later.
		ps.Prevotes = nil
		ps.Precommits = nil
	}
	if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
		// Peer caught up to CatchupCommitRound.
		// Preserve psCatchupCommit!
		// NOTE: We prefer to use prs.Precommits if
		// pr.Round matches pr.CatchupCommitRound.
		ps.Precommits = psCatchupCommit
	}
	if psHeight != msg.Height {
		// Shift Precommits to LastCommit.
		if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = ps.Precommits
		} else {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = nil
		}
		// We'll update the BitArray capacity later.
		ps.CatchupCommitRound = -1
		ps.CatchupCommit = nil
	}
}

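// ApplyCommitStepMessage updates the peer's proposal block-parts header and
// bit array for the commit step at the peer's current height.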
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
	ps.ProposalBlockParts = msg.BlockParts
}

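// ApplyHasVoteMessage records that the peer has a single vote, identified by
// height/round/type and validator index.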
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}

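// ApplyProposalPOLMessage replaces the peer's ProposalPOL bit array for the
// advertised POL round at the peer's current height.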
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}
	if ps.ProposalPOLRound != msg.ProposalPOLRound {
		return
	}

	// TODO: Merge onto existing ps.ProposalPOL?
	// We might have sent some prevotes in the meantime.
	ps.ProposalPOL = msg.ProposalPOL
}

//-----------------------------------------------------------------------------
// Messages

const (
	msgTypeNewRoundStep = byte(0x01)
	msgTypeCommitStep   = byte(0x02)
	msgTypeProposal     = byte(0x11)
	msgTypeProposalPOL  = byte(0x12)
	msgTypeBlockPart    = byte(0x13) // both block & POL
	msgTypeVote         = byte(0x14)
	msgTypeHasVote      = byte(0x15)
)

type ConsensusMessage interface{}

var _ = binary.RegisterInterface(
	struct{ ConsensusMessage }{},
	binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
	binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
	binary.ConcreteType{&VoteMessage{}, msgTypeVote},
	binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
)

// TODO: check for unnecessary extra bytes at the end.
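// DecodeMessage decodes a wire-encoded ConsensusMessage: the first byte is
// the concrete message type (see the msgType* constants above), and the
// registered interface decoder reconstructs the concrete message from bz.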
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
	msgType = bz[0]
	n := new(int64)
	r := bytes.NewReader(bz)
	msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
	return
}

//-------------------------------------

// For every height/round/step transition
type NewRoundStepMessage struct {
	Height                int
	Round                 int
	Step                  RoundStepType
	SecondsSinceStartTime int
	LastCommitRound       int
}

func (m *NewRoundStepMessage) String() string {
	return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
		m.Height, m.Round, m.Step, m.LastCommitRound)
}

//-------------------------------------

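// CommitStepMessage is sent when entering the commit step; it advertises the
// block's PartSetHeader and which parts the sender already has.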
type CommitStepMessage struct {
	Height           int
	BlockPartsHeader types.PartSetHeader
	BlockParts       *BitArray
}

func (m *CommitStepMessage) String() string {
	return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
}

//-------------------------------------

type ProposalMessage struct {
	Proposal *Proposal
}

func (m *ProposalMessage) String() string {
	return fmt.Sprintf("[Proposal %v]", m.Proposal)
}

//-------------------------------------

type ProposalPOLMessage struct {
	Height           int
	ProposalPOLRound int
	ProposalPOL      *BitArray
}

func (m *ProposalPOLMessage) String() string {
	return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}

//-------------------------------------

type BlockPartMessage struct {
	Height int
	Round  int
	Part   *types.Part
}

func (m *BlockPartMessage) String() string {
	return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}

//-------------------------------------

type VoteMessage struct {
	ValidatorIndex int
	Vote           *types.Vote
}

func (m *VoteMessage) String() string {
	return fmt.Sprintf("[Vote VI:%v V:%v]", m.ValidatorIndex, m.Vote)
}

//-------------------------------------

type HasVoteMessage struct {
	Height int
	Round  int
	Type   byte
	Index  int
}

func (m *HasVoteMessage) String() string {
	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
}