You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1041 lines
30 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  const (
  	// Channel IDs registered by the consensus reactor (see GetChannels).
  	StateChannel = byte(0x20)
  	DataChannel  = byte(0x21)
  	VoteChannel  = byte(0x22)

  	// Key under which each peer's *PeerState is stored in peer.Data (see AddPeer).
  	PeerStateKey = "ConsensusReactor.peerState"

  	peerGossipSleepDuration      = 100 * time.Millisecond  // Time to sleep if there's nothing to send.
  	rebroadcastRoundStepDuration = 1000 * time.Millisecond // Interval between periodic round-step rebroadcasts.
  )
  //-----------------------------------------------------------------------------

  // ConsensusReactor is the p2p reactor for consensus: it relays round steps,
  // proposals, block parts and votes between peers and the local ConsensusState.
  // The reactor's underlying ConsensusState may change state at any time.
  // We atomically copy the RoundState struct before using it.
  type ConsensusReactor struct {
  	sw         *p2p.Switch
  	running    uint32        // 1 while started; only accessed via sync/atomic.
  	quit       chan struct{} // closed by Stop to tell background goroutines to exit.
  	blockStore *bc.BlockStore
  	conS       *ConsensusState

  	// if fast sync is running we don't really do anything
  	sync bool

  	evsw events.Fireable
  }
  40. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
  41. conR := &ConsensusReactor{
  42. quit: make(chan struct{}),
  43. blockStore: blockStore,
  44. conS: consensusState,
  45. sync: sync,
  46. }
  47. return conR
  48. }
  49. // Implements Reactor
  50. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  51. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  52. log.Info("Starting ConsensusReactor")
  53. conR.sw = sw
  54. if !conR.sync {
  55. conR.conS.Start()
  56. }
  57. go conR.broadcastNewRoundStepRoutine()
  58. go conR.rebroadcastRoundStepRoutine()
  59. }
  60. }
  61. // Implements Reactor
  62. func (conR *ConsensusReactor) Stop() {
  63. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  64. log.Info("Stopping ConsensusReactor")
  65. conR.conS.Stop()
  66. close(conR.quit)
  67. }
  68. }
  69. func (conR *ConsensusReactor) IsRunning() bool {
  70. return atomic.LoadUint32(&conR.running) == 1
  71. }
  72. // Implements Reactor
  73. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  74. // TODO optimize
  75. return []*p2p.ChannelDescriptor{
  76. &p2p.ChannelDescriptor{
  77. Id: StateChannel,
  78. Priority: 5,
  79. SendQueueCapacity: 100,
  80. },
  81. &p2p.ChannelDescriptor{
  82. Id: DataChannel,
  83. Priority: 5,
  84. SendQueueCapacity: 2,
  85. },
  86. &p2p.ChannelDescriptor{
  87. Id: VoteChannel,
  88. Priority: 5,
  89. SendQueueCapacity: 40,
  90. },
  91. }
  92. }
  93. // Implements Reactor
  94. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  95. if !conR.IsRunning() {
  96. return
  97. }
  98. // Create peerState for peer
  99. peerState := NewPeerState(peer)
  100. peer.Data.Set(PeerStateKey, peerState)
  101. // Begin gossip routines for this peer.
  102. go conR.gossipDataRoutine(peer, peerState)
  103. go conR.gossipVotesRoutine(peer, peerState)
  104. // Send our state to peer.
  105. conR.sendNewRoundStepMessage(peer)
  106. }
  107. // Implements Reactor
  108. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  109. if !conR.IsRunning() {
  110. return
  111. }
  112. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  113. }
  // Receive implements Reactor. It decodes msgBytes received from peer on
  // channel chId and dispatches on channel and message type:
  //   - StateChannel: updates our PeerState view of the peer.
  //   - DataChannel:  records what the peer has, then feeds proposals and block
  //                   parts into the ConsensusState.
  //   - VoteChannel:  adds the vote to the ConsensusState, publishes evidence on
  //                   conflicting signatures, and tells other peers we have it.
  // Decode errors and unknown channels/types are logged and dropped.
  // NOTE: We process these messages even when we're fast_syncing.
  // NOTE(review): the PeerState lookup is an unchecked type assertion; a peer
  // added while the reactor was stopped (AddPeer returns early) would have no
  // PeerState and panic here. Confirm that ordering cannot occur.
  func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  	log.Debug("Receive", "channel", chId, "peer", peer, "bytes", msgBytes)
  	if !conR.IsRunning() {
  		return
  	}

  	// Get round state
  	rs := conR.conS.GetRoundState()
  	ps := peer.Data.Get(PeerStateKey).(*PeerState)
  	_, msg_, err := DecodeMessage(msgBytes)
  	if err != nil {
  		log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  		return
  	}
  	log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)

  	switch chId {
  	case StateChannel:
  		switch msg := msg_.(type) {
  		case *NewRoundStepMessage:
  			ps.ApplyNewRoundStepMessage(msg, rs)
  		case *CommitStepMessage:
  			ps.ApplyCommitStepMessage(msg)
  		case *HasVoteMessage:
  			ps.ApplyHasVoteMessage(msg)
  		default:
  			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  		}

  	case DataChannel:
  		// NOTE(review): msg.Proposal / msg.Part come from the peer and are
  		// dereferenced without nil checks; confirm the decoder cannot yield nil.
  		switch msg := msg_.(type) {
  		case *ProposalMessage:
  			ps.SetHasProposal(msg.Proposal)
  			err = conR.conS.SetProposal(msg.Proposal)
  		case *ProposalPOLMessage:
  			ps.ApplyProposalPOLMessage(msg)
  		case *BlockPartMessage:
  			ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  			_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  		default:
  			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  		}

  	case VoteChannel:
  		switch msg := msg_.(type) {
  		case *VoteMessage:
  			vote := msg.Vote
  			// Pick the validator set for the vote's height: the current one, or
  			// the previous one for LastCommit precommits straggling in while we
  			// sit in RoundStepNewHeight. Anything else is dropped.
  			var validators *sm.ValidatorSet
  			if rs.Height == vote.Height {
  				validators = rs.Validators
  			} else if rs.Height == vote.Height+1 {
  				if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
  					return // Wrong height, not a LastCommit straggler commit.
  				}
  				validators = rs.LastValidators
  			} else {
  				return // Wrong height. Not necessarily a bad peer.
  			}

  			// We have vote/validators. Height may not be rs.Height

  			// NOTE(review): msg.ValidatorIndex is peer-supplied and not
  			// range-checked here; GetByIndex's behavior on a bad index is not
  			// visible from this file.
  			address, _ := validators.GetByIndex(msg.ValidatorIndex)
  			// NOTE: `:=` declares a new err scoped to this case. It is handled
  			// below and never reaches the final err check of Receive.
  			added, index, err := conR.conS.AddVote(address, vote, peer.Key)
  			if err != nil {
  				// If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  				if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  					log.Warn("Found conflicting vote. Publish evidence")
  					evidenceTx := &types.DupeoutTx{
  						Address: address,
  						VoteA:   *errDupe.VoteA,
  						VoteB:   *errDupe.VoteB,
  					}
  					conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  				} else {
  					// Probably an invalid signature. Bad peer.
  					log.Warn("Error attempting to add vote", "error", err)
  					// TODO: punish peer
  				}
  			}

  			// Make sure the peer's bit arrays exist for both heights, then
  			// record that the peer has this vote.
  			// NOTE(review): index is used even when AddVote returned an error;
  			// confirm it is meaningful in that case.
  			ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  			ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
  			ps.SetHasVote(vote, index)
  			if added {
  				// If rs.Height == vote.Height && rs.Round < vote.Round,
  				// the peer is sending us CatchupCommit precommits.
  				// We could make note of this and help filter in broadcastHasVoteMessage().
  				conR.broadcastHasVoteMessage(vote, index)
  			}

  		default:
  			log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  		}
  	default:
  		log.Warn(Fmt("Unknown channel %X", chId))
  	}

  	// Reports errors from SetProposal / AddProposalBlockPart.
  	if err != nil {
  		log.Warn("Error in Receive()", "error", err)
  	}
  }
  208. // Broadcasts HasVoteMessage to peers that care.
  209. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  210. msg := &HasVoteMessage{
  211. Height: vote.Height,
  212. Round: vote.Round,
  213. Type: vote.Type,
  214. Index: index,
  215. }
  216. conR.sw.Broadcast(StateChannel, msg)
  217. /*
  218. // TODO: Make this broadcast more selective.
  219. for _, peer := range conR.sw.Peers().List() {
  220. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  221. prs := ps.GetRoundState()
  222. if prs.Height == vote.Height {
  223. // TODO: Also filter on round?
  224. peer.TrySend(StateChannel, msg)
  225. } else {
  226. // Height doesn't match
  227. // TODO: check a field, maybe CatchupCommitRound?
  228. // TODO: But that requires changing the struct field comment.
  229. }
  230. }
  231. */
  232. }
  233. // Sets our private validator account for signing votes.
  234. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  235. conR.conS.SetPrivValidator(priv)
  236. }
  237. // Switch from the fast sync to the consensus:
  238. // reset the state, turn off fast sync, start the consensus-state-machine
  239. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  240. conR.conS.updateToState(state, false)
  241. conR.sync = false
  242. conR.conS.Start()
  243. }
  244. // implements events.Eventable
  245. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  246. conR.evsw = evsw
  247. conR.conS.SetFireable(evsw)
  248. }
  249. //--------------------------------------
  250. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  251. nrsMsg = &NewRoundStepMessage{
  252. Height: rs.Height,
  253. Round: rs.Round,
  254. Step: rs.Step,
  255. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  256. LastCommitRound: rs.LastCommit.Round(),
  257. }
  258. if rs.Step == RoundStepCommit {
  259. csMsg = &CommitStepMessage{
  260. Height: rs.Height,
  261. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  262. BlockParts: rs.ProposalBlockParts.BitArray(),
  263. }
  264. }
  265. return
  266. }
  267. // Listens for changes to the ConsensusState.Step by pulling
  268. // on conR.conS.NewStepCh().
  269. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  270. for {
  271. // Get RoundState with new Step or quit.
  272. var rs *RoundState
  273. select {
  274. case rs = <-conR.conS.NewStepCh():
  275. case <-conR.quit:
  276. return
  277. }
  278. nrsMsg, csMsg := makeRoundStepMessages(rs)
  279. if nrsMsg != nil {
  280. conR.sw.Broadcast(StateChannel, nrsMsg)
  281. }
  282. if csMsg != nil {
  283. conR.sw.Broadcast(StateChannel, csMsg)
  284. }
  285. }
  286. }
  287. // Periodically broadcast NewRoundStepMessage.
  288. // This is a hack. TODO remove the need for it?
  289. // The issue is with Start() happening after a NewRoundStep message
  290. // was received from a peer, for the bootstrapping set.
  291. func (conR *ConsensusReactor) rebroadcastRoundStepRoutine() {
  292. for {
  293. time.Sleep(rebroadcastRoundStepDuration)
  294. rs := conR.conS.GetRoundState()
  295. nrsMsg, csMsg := makeRoundStepMessages(rs)
  296. if nrsMsg != nil {
  297. conR.sw.Broadcast(StateChannel, nrsMsg)
  298. }
  299. if csMsg != nil {
  300. conR.sw.Broadcast(StateChannel, csMsg)
  301. }
  302. }
  303. }
  304. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  305. rs := conR.conS.GetRoundState()
  306. nrsMsg, csMsg := makeRoundStepMessages(rs)
  307. if nrsMsg != nil {
  308. peer.Send(StateChannel, nrsMsg)
  309. }
  310. if csMsg != nil {
  311. peer.Send(StateChannel, csMsg)
  312. }
  313. }
  // gossipDataRoutine runs for the lifetime of a peer connection. Each loop
  // iteration sends the peer at most one item, checked in this order:
  //   1. a proposal block part the peer lacks, when our ProposalBlockParts has
  //      the part-set header the peer reported;
  //   2. if the peer is at a lower (non-zero) height, a block part for that
  //      height loaded from the block store;
  //   3. if height and round match, the Proposal itself, plus a
  //      ProposalPOLMessage when the proposal has a POL round.
  // When there is nothing to send, it sleeps for peerGossipSleepDuration.
  // It returns once either the peer or the reactor is no longer running.
  func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  	log := log.New("peer", peer.Key)

  OUTER_LOOP:
  	for {
  		// Manage disconnects from self or peer.
  		if !peer.IsRunning() || !conR.IsRunning() {
  			log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  			return
  		}
  		rs := conR.conS.GetRoundState()
  		prs := ps.GetRoundState()

  		// Send proposal Block parts?
  		if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  			//log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  			// prs is a shallow copy, so prs.ProposalBlockParts is the same
  			// *BitArray that ps mutates under its lock; Copy() presumably takes
  			// a private snapshot before the subtraction.
  			if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  				part := rs.ProposalBlockParts.GetPart(index)
  				msg := &BlockPartMessage{
  					Height: rs.Height, // This tells peer that this part applies to us.
  					Round:  rs.Round,  // This tells peer that this part applies to us.
  					Part:   part,
  				}
  				peer.Send(DataChannel, msg)
  				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  				continue OUTER_LOOP
  			}
  		}

  		// If the peer is on a previous height, help catch up.
  		if (0 < prs.Height) && (prs.Height < rs.Height) {
  			//log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  			if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  				// Ensure that the peer's PartSetHeader is correct
  				blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  				if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  					log.Debug("Peer ProposalBlockPartsHeader mismatch, sleeping",
  						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  					time.Sleep(peerGossipSleepDuration)
  					continue OUTER_LOOP
  				}
  				// Load the part
  				part := conR.blockStore.LoadBlockPart(prs.Height, index)
  				if part == nil {
  					log.Warn("Could not load part", "index", index,
  						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  					time.Sleep(peerGossipSleepDuration)
  					continue OUTER_LOOP
  				}
  				// Send the part
  				msg := &BlockPartMessage{
  					Height: prs.Height, // Not our height, so it doesn't matter.
  					Round:  prs.Round,  // Not our height, so it doesn't matter.
  					Part:   part,
  				}
  				peer.Send(DataChannel, msg)
  				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  				continue OUTER_LOOP
  			} else {
  				//log.Debug("No parts to send in catch-up, sleeping")
  				time.Sleep(peerGossipSleepDuration)
  				continue OUTER_LOOP
  			}
  		}

  		// If height and round don't match, sleep.
  		if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  			//log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  			time.Sleep(peerGossipSleepDuration)
  			continue OUTER_LOOP
  		}

  		// By here, height and round match.
  		// Proposal block parts were already matched and sent if any were wanted.
  		// (These can match on hash so the round doesn't matter)
  		// Now consider sending other things, like the Proposal itself.

  		// Send Proposal && ProposalPOL BitArray?
  		if rs.Proposal != nil && !prs.Proposal {
  			// Proposal
  			{
  				msg := &ProposalMessage{Proposal: rs.Proposal}
  				peer.Send(DataChannel, msg)
  				ps.SetHasProposal(rs.Proposal)
  			}

  			// ProposalPOL.
  			// Peer must receive ProposalMessage first.
  			// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  			// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  			if 0 <= rs.Proposal.POLRound {
  				msg := &ProposalPOLMessage{
  					Height:           rs.Height,
  					ProposalPOLRound: rs.Proposal.POLRound,
  					ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  				}
  				peer.Send(DataChannel, msg)
  			}
  			continue OUTER_LOOP
  		}

  		// Nothing to do. Sleep.
  		time.Sleep(peerGossipSleepDuration)
  		continue OUTER_LOOP
  	}
  }
  412. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  413. log := log.New("peer", peer.Key)
  414. // Simple hack to throttle logs upon sleep.
  415. var sleeping = 0
  416. OUTER_LOOP:
  417. for {
  418. // Manage disconnects from self or peer.
  419. if !peer.IsRunning() || !conR.IsRunning() {
  420. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  421. return
  422. }
  423. rs := conR.conS.GetRoundState()
  424. prs := ps.GetRoundState()
  425. switch sleeping {
  426. case 1: // First sleep
  427. sleeping = 2
  428. case 2: // No more sleep
  429. sleeping = 0
  430. }
  431. log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  432. "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  433. // If height matches, then send LastCommit, Prevotes, Precommits.
  434. if rs.Height == prs.Height {
  435. // If there are lastCommits to send...
  436. if prs.Step == RoundStepNewHeight {
  437. if ps.PickSendVote(rs.LastCommit) {
  438. log.Debug("Picked rs.LastCommit to send")
  439. continue OUTER_LOOP
  440. }
  441. }
  442. // If there are prevotes to send...
  443. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  444. if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
  445. log.Debug("Picked rs.Prevotes(rs.Round) to send")
  446. continue OUTER_LOOP
  447. }
  448. }
  449. // If there are precommits to send...
  450. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  451. if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
  452. log.Debug("Picked rs.Precommits(rs.Round) to send")
  453. continue OUTER_LOOP
  454. }
  455. }
  456. // If there are prevotes to send for the last round...
  457. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
  458. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  459. log.Debug("Picked rs.Prevotes(prs.Round) to send")
  460. continue OUTER_LOOP
  461. }
  462. }
  463. // If there are precommits to send for the last round...
  464. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
  465. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  466. log.Debug("Picked rs.Precommits(prs.Round) to send")
  467. continue OUTER_LOOP
  468. }
  469. }
  470. // If there are POLPrevotes to send...
  471. if 0 <= prs.ProposalPOLRound {
  472. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  473. if ps.PickSendVote(polPrevotes) {
  474. log.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  475. continue OUTER_LOOP
  476. }
  477. }
  478. }
  479. }
  480. // Special catchup logic.
  481. // If peer is lagging by height 1, send LastCommit.
  482. if prs.Height != 0 && rs.Height == prs.Height+1 {
  483. if ps.PickSendVote(rs.LastCommit) {
  484. log.Debug("Picked rs.LastCommit to send")
  485. continue OUTER_LOOP
  486. }
  487. }
  488. // Catchup logic
  489. // If peer is lagging by more than 1, send Validation.
  490. if prs.Height != 0 && rs.Height <= prs.Height+2 {
  491. // Load the block validation for prs.Height,
  492. // which contains precommit signatures for prs.Height.
  493. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  494. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  495. if ps.PickSendVote(validation) {
  496. log.Debug("Picked Catchup validation to send")
  497. continue OUTER_LOOP
  498. }
  499. }
  500. if sleeping == 0 {
  501. // We sent nothing. Sleep...
  502. sleeping = 1
  503. log.Debug("No votes to send, sleeping", "peer", peer,
  504. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  505. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  506. } else if sleeping == 2 {
  507. // Continued sleep...
  508. sleeping = 1
  509. }
  510. time.Sleep(peerGossipSleepDuration)
  511. continue OUTER_LOOP
  512. }
  513. }
  //-----------------------------------------------------------------------------

  // PeerRoundState is our view of one peer's consensus progress. It is built
  // from the messages the peer sends us and from what we have sent it.
  // Read only when returned by PeerState.GetRoundState().
  type PeerRoundState struct {
  	Height                   int                 // Height peer is at
  	Round                    int                 // Round peer is at
  	Step                     RoundStepType       // Step peer is at
  	StartTime                time.Time           // Estimated start of round 0 at this height
  	Proposal                 bool                // True if peer has proposal for this round
  	ProposalBlockPartsHeader types.PartSetHeader // Part-set header of the block the peer is collecting
  	ProposalBlockParts       *BitArray           // Which of those block parts the peer has
  	ProposalPOLRound         int                 // -1 if none
  	ProposalPOL              *BitArray           // Prevotes of ProposalPOLRound the peer has; nil until a ProposalPOLMessage arrives or ensureVoteBitArrays allocates it
  	Prevotes                 *BitArray           // All prevotes peer has for this round
  	Precommits               *BitArray           // All precommits peer has for this round
  	LastCommitRound          int                 // Round of commit for last height.
  	LastCommit               *BitArray           // All commit precommits of commit for last height.
  	CatchupCommitRound       int                 // Round that we believe commit round is.
  	CatchupCommit            *BitArray           // All commit precommits peer has for this height
  }
  //-----------------------------------------------------------------------------

  // Errors describing invalid peer state transitions.
  // NOTE(review): neither is referenced in this section of the file; confirm
  // whether they are used elsewhere before relying on them.
  var (
  	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  )
  // PeerState tracks our view of one connected peer. The embedded
  // PeerRoundState is guarded by mtx: methods that read or write it take the
  // lock, and the lowercase helpers are called with it already held.
  type PeerState struct {
  	Peer *p2p.Peer

  	mtx sync.Mutex // guards PeerRoundState
  	PeerRoundState
  }
  543. func NewPeerState(peer *p2p.Peer) *PeerState {
  544. return &PeerState{Peer: peer}
  545. }
  546. // Returns an atomic snapshot of the PeerRoundState.
  547. // There's no point in mutating it since it won't change PeerState.
  548. func (ps *PeerState) GetRoundState() *PeerRoundState {
  549. ps.mtx.Lock()
  550. defer ps.mtx.Unlock()
  551. prs := ps.PeerRoundState // copy
  552. return &prs
  553. }
  554. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  555. ps.mtx.Lock()
  556. defer ps.mtx.Unlock()
  557. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  558. return
  559. }
  560. if ps.Proposal {
  561. return
  562. }
  563. ps.Proposal = true
  564. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  565. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  566. ps.ProposalPOLRound = proposal.POLRound
  567. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  568. }
  569. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  570. ps.mtx.Lock()
  571. defer ps.mtx.Unlock()
  572. if ps.Height != height || ps.Round != round {
  573. return
  574. }
  575. ps.ProposalBlockParts.SetIndex(index, true)
  576. }
  577. // Convenience function to send vote to peer.
  578. // Returns true if vote was sent.
  579. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  580. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  581. msg := &VoteMessage{index, vote}
  582. ps.Peer.Send(VoteChannel, msg)
  583. return true
  584. }
  585. return false
  586. }
  587. // votes: Must be the correct Size() for the Height().
  588. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  589. ps.mtx.Lock()
  590. defer ps.mtx.Unlock()
  591. if votes.Size() == 0 {
  592. return 0, nil, false
  593. }
  594. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  595. // Lazily set data using 'votes'.
  596. if votes.IsCommit() {
  597. ps.ensureCatchupCommitRound(height, round, size)
  598. }
  599. ps.ensureVoteBitArrays(height, size)
  600. psVotes := ps.getVoteBitArray(height, round, type_)
  601. if psVotes == nil {
  602. return 0, nil, false // Not something worth sending
  603. }
  604. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  605. ps.setHasVote(height, round, type_, index)
  606. return index, votes.GetByIndex(index), true
  607. }
  608. return 0, nil, false
  609. }
  610. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  611. if ps.Height == height {
  612. if ps.Round == round {
  613. switch type_ {
  614. case types.VoteTypePrevote:
  615. return ps.Prevotes
  616. case types.VoteTypePrecommit:
  617. return ps.Precommits
  618. default:
  619. panic(Fmt("Unexpected vote type %X", type_))
  620. }
  621. }
  622. if ps.CatchupCommitRound == round {
  623. switch type_ {
  624. case types.VoteTypePrevote:
  625. return nil
  626. case types.VoteTypePrecommit:
  627. return ps.CatchupCommit
  628. default:
  629. panic(Fmt("Unexpected vote type %X", type_))
  630. }
  631. }
  632. return nil
  633. }
  634. if ps.Height == height+1 {
  635. if ps.LastCommitRound == round {
  636. switch type_ {
  637. case types.VoteTypePrevote:
  638. return nil
  639. case types.VoteTypePrecommit:
  640. return ps.LastCommit
  641. default:
  642. panic(Fmt("Unexpected vote type %X", type_))
  643. }
  644. }
  645. return nil
  646. }
  647. return nil
  648. }
  649. // NOTE: 'round' is what we know to be the commit round for height.
  650. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  651. if ps.Height != height {
  652. return
  653. }
  654. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  655. panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  656. }
  657. if ps.CatchupCommitRound == round {
  658. return // Nothing to do!
  659. }
  660. ps.CatchupCommitRound = round
  661. if round == ps.Round {
  662. ps.CatchupCommit = ps.Precommits
  663. } else {
  664. ps.CatchupCommit = NewBitArray(numValidators)
  665. }
  666. }
  667. // NOTE: It's important to make sure that numValidators actually matches
  668. // what the node sees as the number of validators for height.
  669. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  670. ps.mtx.Lock()
  671. defer ps.mtx.Unlock()
  672. ps.ensureVoteBitArrays(height, numValidators)
  673. }
  674. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  675. if ps.Height == height {
  676. if ps.Prevotes == nil {
  677. ps.Prevotes = NewBitArray(numValidators)
  678. }
  679. if ps.Precommits == nil {
  680. ps.Precommits = NewBitArray(numValidators)
  681. }
  682. if ps.CatchupCommit == nil {
  683. ps.CatchupCommit = NewBitArray(numValidators)
  684. }
  685. if ps.ProposalPOL == nil {
  686. ps.ProposalPOL = NewBitArray(numValidators)
  687. }
  688. } else if ps.Height == height+1 {
  689. if ps.LastCommit == nil {
  690. ps.LastCommit = NewBitArray(numValidators)
  691. }
  692. }
  693. }
  694. func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  695. ps.mtx.Lock()
  696. defer ps.mtx.Unlock()
  697. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  698. }
  699. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  700. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  701. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  702. panic("Invalid vote type") // SANITY
  703. }
  704. if ps.Height == height {
  705. if ps.Round == round {
  706. switch type_ {
  707. case types.VoteTypePrevote:
  708. ps.Prevotes.SetIndex(index, true)
  709. log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  710. case types.VoteTypePrecommit:
  711. ps.Precommits.SetIndex(index, true)
  712. log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  713. }
  714. } else if ps.CatchupCommitRound == round {
  715. switch type_ {
  716. case types.VoteTypePrevote:
  717. case types.VoteTypePrecommit:
  718. ps.CatchupCommit.SetIndex(index, true)
  719. log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  720. }
  721. } else if ps.ProposalPOLRound == round {
  722. switch type_ {
  723. case types.VoteTypePrevote:
  724. ps.ProposalPOL.SetIndex(index, true)
  725. log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  726. case types.VoteTypePrecommit:
  727. }
  728. }
  729. } else if ps.Height == height+1 {
  730. if ps.LastCommitRound == round {
  731. switch type_ {
  732. case types.VoteTypePrevote:
  733. case types.VoteTypePrecommit:
  734. ps.LastCommit.SetIndex(index, true)
  735. log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  736. }
  737. }
  738. } else {
  739. // Does not apply.
  740. }
  741. }
  742. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  743. ps.mtx.Lock()
  744. defer ps.mtx.Unlock()
  745. // Ignore duplicate messages.
  746. // TODO: This is only necessary because rebroadcastRoundStepRoutine.
  747. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  748. return
  749. }
  750. // Just remember these values.
  751. psHeight := ps.Height
  752. psRound := ps.Round
  753. //psStep := ps.Step
  754. psCatchupCommitRound := ps.CatchupCommitRound
  755. psCatchupCommit := ps.CatchupCommit
  756. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  757. ps.Height = msg.Height
  758. ps.Round = msg.Round
  759. ps.Step = msg.Step
  760. ps.StartTime = startTime
  761. if psHeight != msg.Height || psRound != msg.Round {
  762. ps.Proposal = false
  763. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  764. ps.ProposalBlockParts = nil
  765. ps.ProposalPOLRound = -1
  766. ps.ProposalPOL = nil
  767. // We'll update the BitArray capacity later.
  768. ps.Prevotes = nil
  769. ps.Precommits = nil
  770. }
  771. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  772. // Peer caught up to CatchupCommitRound.
  773. // Preserve psCatchupCommit!
  774. // NOTE: We prefer to use prs.Precommits if
  775. // pr.Round matches pr.CatchupCommitRound.
  776. ps.Precommits = psCatchupCommit
  777. }
  778. if psHeight != msg.Height {
  779. // Shift Precommits to LastCommit.
  780. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  781. ps.LastCommitRound = msg.LastCommitRound
  782. ps.LastCommit = ps.Precommits
  783. } else {
  784. ps.LastCommitRound = msg.LastCommitRound
  785. ps.LastCommit = nil
  786. }
  787. // We'll update the BitArray capacity later.
  788. ps.CatchupCommitRound = -1
  789. ps.CatchupCommit = nil
  790. }
  791. }
  792. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  793. ps.mtx.Lock()
  794. defer ps.mtx.Unlock()
  795. if ps.Height != msg.Height {
  796. return
  797. }
  798. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  799. ps.ProposalBlockParts = msg.BlockParts
  800. }
  801. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  802. ps.mtx.Lock()
  803. defer ps.mtx.Unlock()
  804. if ps.Height != msg.Height {
  805. return
  806. }
  807. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  808. }
  809. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  810. ps.mtx.Lock()
  811. defer ps.mtx.Unlock()
  812. if ps.Height != msg.Height {
  813. return
  814. }
  815. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  816. return
  817. }
  818. // TODO: Merge onto existing ps.ProposalPOL?
  819. // We might have sent some prevotes in the meantime.
  820. ps.ProposalPOL = msg.ProposalPOL
  821. }
  //-----------------------------------------------------------------------------
  // Messages

  // Type bytes identifying each consensus message. They are bound to their
  // concrete Go types by the RegisterInterface call below, and DecodeMessage
  // reports the leading one as msgType.
  const (
  	msgTypeNewRoundStep = byte(0x01)
  	msgTypeCommitStep   = byte(0x02)
  	msgTypeProposal     = byte(0x11)
  	msgTypeProposalPOL  = byte(0x12)
  	msgTypeBlockPart    = byte(0x13) // proposal block parts (POL bit arrays use msgTypeProposalPOL)
  	msgTypeVote         = byte(0x14)
  	msgTypeHasVote      = byte(0x15)
  )

  // ConsensusMessage is the union of all consensus reactor messages.
  type ConsensusMessage interface{}

  // Register each concrete message type with its type byte so that
  // binary.ReadBinary can decode a ConsensusMessage (see DecodeMessage).
  var _ = binary.RegisterInterface(
  	struct{ ConsensusMessage }{},
  	binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  	binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  	binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  	binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  	binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  	binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  	binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  )
  844. // TODO: check for unnecessary extra bytes at the end.
  845. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  846. msgType = bz[0]
  847. n := new(int64)
  848. r := bytes.NewReader(bz)
  849. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  850. return
  851. }
  852. //-------------------------------------
  853. // For every height/round/step transition
  854. type NewRoundStepMessage struct {
  855. Height int
  856. Round int
  857. Step RoundStepType
  858. SecondsSinceStartTime int
  859. LastCommitRound int
  860. }
  861. func (m *NewRoundStepMessage) String() string {
  862. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  863. m.Height, m.Round, m.Step, m.LastCommitRound)
  864. }
  865. //-------------------------------------
  866. type CommitStepMessage struct {
  867. Height int
  868. BlockPartsHeader types.PartSetHeader
  869. BlockParts *BitArray
  870. }
  871. func (m *CommitStepMessage) String() string {
  872. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  873. }
  874. //-------------------------------------
  875. type ProposalMessage struct {
  876. Proposal *Proposal
  877. }
  878. func (m *ProposalMessage) String() string {
  879. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  880. }
  881. //-------------------------------------
  882. type ProposalPOLMessage struct {
  883. Height int
  884. ProposalPOLRound int
  885. ProposalPOL *BitArray
  886. }
  887. func (m *ProposalPOLMessage) String() string {
  888. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  889. }
  890. //-------------------------------------
  891. type BlockPartMessage struct {
  892. Height int
  893. Round int
  894. Part *types.Part
  895. }
  896. func (m *BlockPartMessage) String() string {
  897. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  898. }
  899. //-------------------------------------
  900. type VoteMessage struct {
  901. ValidatorIndex int
  902. Vote *types.Vote
  903. }
  904. func (m *VoteMessage) String() string {
  905. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  906. }
  //-------------------------------------

  // HasVoteMessage announces that the sender now has the vote at Index for
  // (Height, Round, Type).
  type HasVoteMessage struct {
  	Height int
  	Round  int
  	Type   byte
  	Index  int
  }

  // String renders the message for logging as
  // "[HasVote VI:<index> V:{<height>/<round, 2 digits>/<type>}]".
  // The index is printed once; the old format repeated it as a trailing
  // "VI:" field.
  func (m *HasVoteMessage) String() string {
  	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
  }