You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

954 lines
28 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
const (
	// StateChannel carries peer round-state messages: NewRoundStepMessage,
	// CommitStepMessage and HasVoteMessage.
	StateChannel = byte(0x20)
	// DataChannel carries ProposalMessage, ProposalPOLMessage and BlockPartMessage.
	DataChannel = byte(0x21)
	// VoteChannel carries VoteMessage.
	VoteChannel = byte(0x22)

	// PeerStateKey is the key under which a peer's *PeerState is stored in peer.Data.
	PeerStateKey = "ConsensusReactor.peerState"

	peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
)
//-----------------------------------------------------------------------------

// ConsensusReactor wires a ConsensusState to the p2p switch: it receives
// consensus messages from peers and runs per-peer gossip routines for block
// data and votes.
//
// The reactor's underlying ConsensusState may change state at any time.
// We atomically copy the RoundState struct before using it.
type ConsensusReactor struct {
	sw      *p2p.Switch   // Assigned in Start(); used to broadcast to peers.
	running uint32        // 1 while running, 0 otherwise; only touched through sync/atomic.
	quit    chan struct{} // Closed by Stop() to end broadcastNewRoundStepRoutine.

	blockStore *bc.BlockStore
	conS       *ConsensusState

	// if fast sync is running we don't really do anything
	sync bool

	evsw events.Fireable // Assigned in SetFireable().
}
  39. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
  40. conR := &ConsensusReactor{
  41. quit: make(chan struct{}),
  42. blockStore: blockStore,
  43. conS: consensusState,
  44. sync: sync,
  45. }
  46. return conR
  47. }
  48. // Implements Reactor
  49. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  50. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  51. log.Info("Starting ConsensusReactor")
  52. conR.sw = sw
  53. if !conR.sync {
  54. conR.conS.Start()
  55. }
  56. go conR.broadcastNewRoundStepRoutine()
  57. }
  58. }
  59. // Implements Reactor
  60. func (conR *ConsensusReactor) Stop() {
  61. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  62. log.Info("Stopping ConsensusReactor")
  63. conR.conS.Stop()
  64. close(conR.quit)
  65. }
  66. }
  67. func (conR *ConsensusReactor) IsRunning() bool {
  68. return atomic.LoadUint32(&conR.running) == 1
  69. }
  70. // Implements Reactor
  71. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  72. // TODO optimize
  73. return []*p2p.ChannelDescriptor{
  74. &p2p.ChannelDescriptor{
  75. Id: StateChannel,
  76. Priority: 5,
  77. SendQueueCapacity: 100,
  78. },
  79. &p2p.ChannelDescriptor{
  80. Id: DataChannel,
  81. Priority: 5,
  82. SendQueueCapacity: 2,
  83. },
  84. &p2p.ChannelDescriptor{
  85. Id: VoteChannel,
  86. Priority: 5,
  87. SendQueueCapacity: 40,
  88. },
  89. }
  90. }
  91. // Implements Reactor
  92. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  93. if !conR.IsRunning() {
  94. return
  95. }
  96. // Create peerState for peer
  97. peerState := NewPeerState(peer)
  98. peer.Data.Set(PeerStateKey, peerState)
  99. // Begin gossip routines for this peer.
  100. go conR.gossipDataRoutine(peer, peerState)
  101. go conR.gossipVotesRoutine(peer, peerState)
  102. // Send our state to peer.
  103. conR.sendNewRoundStepMessage(peer)
  104. }
// RemovePeer implements Reactor. Apart from the running check it currently
// does nothing: the PeerState disconnect call below is commented out. The
// per-peer gossip routines exit on their own once peer.IsRunning() is false.
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}

	//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
}
  112. // Implements Reactor
  113. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  114. if conR.sync || !conR.IsRunning() {
  115. return
  116. }
  117. // Get round state
  118. rs := conR.conS.GetRoundState()
  119. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  120. _, msg_, err := DecodeMessage(msgBytes)
  121. if err != nil {
  122. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  123. return
  124. }
  125. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)
  126. switch chId {
  127. case StateChannel:
  128. switch msg := msg_.(type) {
  129. case *NewRoundStepMessage:
  130. ps.ApplyNewRoundStepMessage(msg, rs)
  131. case *CommitStepMessage:
  132. ps.ApplyCommitStepMessage(msg)
  133. case *HasVoteMessage:
  134. ps.ApplyHasVoteMessage(msg)
  135. default:
  136. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  137. }
  138. case DataChannel:
  139. switch msg := msg_.(type) {
  140. case *ProposalMessage:
  141. ps.SetHasProposal(msg.Proposal)
  142. err = conR.conS.SetProposal(msg.Proposal)
  143. case *ProposalPOLMessage:
  144. ps.ApplyProposalPOLMessage(msg)
  145. case *BlockPartMessage:
  146. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  147. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  148. default:
  149. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  150. }
  151. case VoteChannel:
  152. switch msg := msg_.(type) {
  153. case *VoteMessage:
  154. vote := msg.Vote
  155. if rs.Height != vote.Height {
  156. if rs.Height == vote.Height+1 {
  157. if rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit {
  158. goto VOTE_PASS // *ducks*
  159. }
  160. }
  161. return // Wrong height. Not necessarily a bad peer.
  162. }
  163. VOTE_PASS:
  164. address, _ := rs.Validators.GetByIndex(msg.ValidatorIndex)
  165. added, index, err := conR.conS.AddVote(address, vote, peer.Key)
  166. if err != nil {
  167. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  168. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  169. log.Warn("Found conflicting vote. Publish evidence")
  170. evidenceTx := &types.DupeoutTx{
  171. Address: address,
  172. VoteA: *errDupe.VoteA,
  173. VoteB: *errDupe.VoteB,
  174. }
  175. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  176. } else {
  177. // Probably an invalid signature. Bad peer.
  178. log.Warn("Error attempting to add vote", "error", err)
  179. // TODO: punish peer
  180. }
  181. }
  182. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size(), nil)
  183. ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size(), nil)
  184. ps.SetHasVote(vote, index)
  185. if added {
  186. // If rs.Height == vote.Height && rs.Round < vote.Round,
  187. // the peer is sending us CatchupCommit precommits.
  188. // We could make note of this and help filter in broadcastHasVoteMessage().
  189. conR.broadcastHasVoteMessage(vote, index)
  190. }
  191. default:
  192. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  193. }
  194. default:
  195. log.Warn(Fmt("Unknown channel %X", chId))
  196. }
  197. if err != nil {
  198. log.Warn("Error in Receive()", "error", err)
  199. }
  200. }
  201. // Broadcasts HasVoteMessage to peers that care.
  202. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  203. msg := &HasVoteMessage{
  204. Height: vote.Height,
  205. Round: vote.Round,
  206. Type: vote.Type,
  207. Index: index,
  208. }
  209. conR.sw.Broadcast(StateChannel, msg)
  210. /*
  211. // TODO: Make this broadcast more selective.
  212. for _, peer := range conR.sw.Peers().List() {
  213. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  214. prs := ps.GetRoundState()
  215. if prs.Height == vote.Height {
  216. // TODO: Also filter on round?
  217. peer.TrySend(StateChannel, msg)
  218. } else {
  219. // Height doesn't match
  220. // TODO: check a field, maybe CatchupCommitRound?
  221. // TODO: But that requires changing the struct field comment.
  222. }
  223. }
  224. */
  225. }
// SetPrivValidator sets our private validator account for signing votes.
// It simply forwards priv to the underlying ConsensusState.
func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
	conR.conS.SetPrivValidator(priv)
}
// SwitchToConsensus switches from fast sync to consensus:
// reset the state, turn off fast sync, start the consensus-state-machine.
//
// NOTE(review): conR.sync is written here without any synchronization while
// Receive() reads it; confirm that callers serialize this with message
// delivery.
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
	conR.conS.updateToState(state, false)
	conR.sync = false // From here on Receive() stops dropping messages.
	conR.conS.Start()
}
// SetFireable implements events.Eventable. The event switch is stored on the
// reactor and also handed to the underlying ConsensusState.
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
	conR.evsw = evsw
	conR.conS.SetFireable(evsw)
}
  242. //--------------------------------------
  243. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  244. nrsMsg = &NewRoundStepMessage{
  245. Height: rs.Height,
  246. Round: rs.Round,
  247. Step: rs.Step,
  248. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  249. LastCommitRound: rs.LastCommit.Round(),
  250. }
  251. if rs.Step == RoundStepCommit {
  252. csMsg = &CommitStepMessage{
  253. Height: rs.Height,
  254. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  255. BlockParts: rs.ProposalBlockParts.BitArray(),
  256. }
  257. }
  258. return
  259. }
  260. // Listens for changes to the ConsensusState.Step by pulling
  261. // on conR.conS.NewStepCh().
  262. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  263. for {
  264. // Get RoundState with new Step or quit.
  265. var rs *RoundState
  266. select {
  267. case rs = <-conR.conS.NewStepCh():
  268. case <-conR.quit:
  269. return
  270. }
  271. nrsMsg, csMsg := makeRoundStepMessages(rs)
  272. if nrsMsg != nil {
  273. conR.sw.Broadcast(StateChannel, nrsMsg)
  274. }
  275. if csMsg != nil {
  276. conR.sw.Broadcast(StateChannel, csMsg)
  277. }
  278. }
  279. }
  280. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  281. rs := conR.conS.GetRoundState()
  282. nrsMsg, csMsg := makeRoundStepMessages(rs)
  283. if nrsMsg != nil {
  284. peer.Send(StateChannel, nrsMsg)
  285. }
  286. if csMsg != nil {
  287. peer.Send(StateChannel, csMsg)
  288. }
  289. }
// gossipDataRoutine is the per-peer loop (started by AddPeer) that sends block
// data to one peer. Each iteration takes a snapshot of our RoundState and of
// the peer's PeerRoundState and then does at most one of the following, in
// this order:
//  1. send a proposal block part the peer lacks, if our part-set header
//     matches the peer's;
//  2. if the peer is on an earlier height, send a part of that height's block
//     loaded from the block store;
//  3. if height and round match, send the Proposal (and its ProposalPOL bit
//     array when POLRound >= 0).
//
// When there is nothing to send it sleeps for peerGossipSleepDuration.
// The loop exits when either the peer or the reactor stops running.
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		// Send proposal Block parts?
		if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
			//log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
			// Pick a random part that we have and the peer (as far as we know) does not.
			if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
				part := rs.ProposalBlockParts.GetPart(index)
				msg := &BlockPartMessage{
					Height: rs.Height, // This tells peer that this part applies to us.
					Round:  rs.Round,  // This tells peer that this part applies to us.
					Part:   part,
				}
				peer.Send(DataChannel, msg)
				// Recorded against the peer's own height/round, since
				// SetHasProposalBlockPart ignores any other height/round.
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			}
		}

		// If the peer is on a previous height, help catch up.
		if (0 < prs.Height) && (prs.Height < rs.Height) {
			//log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
			if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
				// Ensure that the peer's PartSetHeader is correct
				// NOTE(review): blockMeta is dereferenced without a nil check;
				// confirm LoadBlockMeta cannot return nil for prs.Height.
				blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
				if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
					log.Debug("Peer ProposalBlockPartsHeader mismatch, sleeping",
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Load the part
				part := conR.blockStore.LoadBlockPart(prs.Height, index)
				if part == nil {
					log.Warn("Could not load part", "index", index,
						"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
					time.Sleep(peerGossipSleepDuration)
					continue OUTER_LOOP
				}
				// Send the part
				msg := &BlockPartMessage{
					Height: prs.Height, // Not our height, so it doesn't matter.
					Round:  prs.Round,  // Not our height, so it doesn't matter.
					Part:   part,
				}
				peer.Send(DataChannel, msg)
				ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				continue OUTER_LOOP
			} else {
				//log.Debug("No parts to send in catch-up, sleeping")
				time.Sleep(peerGossipSleepDuration)
				continue OUTER_LOOP
			}
		}

		// If height and round don't match, sleep.
		if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
			//log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
			time.Sleep(peerGossipSleepDuration)
			continue OUTER_LOOP
		}

		// By here, height and round match.
		// Proposal block parts were already matched and sent if any were wanted.
		// (These can match on hash so the round doesn't matter)
		// Now consider sending other things, like the Proposal itself.

		// Send Proposal && ProposalPOL BitArray?
		if rs.Proposal != nil && !prs.Proposal {
			// Proposal
			{
				msg := &ProposalMessage{Proposal: rs.Proposal}
				peer.Send(DataChannel, msg)
				ps.SetHasProposal(rs.Proposal)
			}
			// ProposalPOL.
			// Peer must receive ProposalMessage first.
			// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
			// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
			if 0 <= rs.Proposal.POLRound {
				msg := &ProposalPOLMessage{
					Height:           rs.Height,
					ProposalPOLRound: rs.Proposal.POLRound,
					ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
				}
				peer.Send(DataChannel, msg)
			}
			continue OUTER_LOOP
		}

		// Nothing to do. Sleep.
		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}
// gossipVotesRoutine is the per-peer loop (started by AddPeer) that sends
// votes to one peer. Each iteration takes a snapshot of our RoundState and of
// the peer's PeerRoundState and sends at most one vote the peer appears to be
// missing, trying in this order:
//  1. same height: LastCommit, prevotes, precommits, then POL prevotes;
//  2. peer exactly one height behind: our LastCommit precommits;
//  3. peer two or more heights behind: precommits from the stored block
//     validation of the peer's height.
//
// When nothing was sent it sleeps for peerGossipSleepDuration.
// The loop exits when either the peer or the reactor stops running.
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {

	// Simple hack to throttle logs upon sleep.
	// 0: not sleeping, 1: slept on the previous iteration, 2: transitional.
	// The "No votes to send" debug line is only logged when entering an idle
	// streak (sleeping == 0), not on every idle iteration.
	var sleeping = 0

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		switch sleeping {
		case 1: // First sleep
			sleeping = 2
		case 2: // No more sleep
			sleeping = 0
		}

		// prsVoteSet: a pointer to a VoteSet field of prs.
		// Returns true when useful work was done.
		trySendVote := func(voteSet *VoteSet, prsVoteSet **BitArray) (sent bool) {
			if voteSet == nil {
				return false
			}
			if *prsVoteSet == nil {
				// Allocates the bit arrays on ps and copies them into the prs
				// snapshot, which is what makes *prsVoteSet non-nil.
				ps.EnsureVoteBitArrays(voteSet.Height(), voteSet.Size(), prs)
				// We could return true here (useful work was done)
				// or, we can continue since prsVoteSet is no longer nil.
				if *prsVoteSet == nil {
					panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
				}
			}
			// TODO: give priority to our vote.
			if index, ok := voteSet.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
				vote := voteSet.GetByIndex(index)
				msg := &VoteMessage{index, vote}
				peer.Send(VoteChannel, msg)
				ps.SetHasVote(vote, index)
				return true
			}
			return false
		}

		// prsVoteSet: a pointer to a VoteSet field of prs.
		// Returns true when useful work was done.
		trySendPrecommitFromValidation := func(validation *types.Validation, prsVoteSet **BitArray) (sent bool) {
			if validation == nil {
				return false
			} else if *prsVoteSet == nil {
				ps.EnsureVoteBitArrays(validation.Height(), len(validation.Precommits), prs)
				// We could return true here (useful work was done)
				// or, we can continue since prsVoteSet is no longer nil.
				if *prsVoteSet == nil {
					panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
				}
			}
			if index, ok := validation.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
				precommit := validation.Precommits[index]
				log.Debug("Picked precommit to send", "index", index, "precommit", precommit)
				msg := &VoteMessage{index, precommit}
				peer.Send(VoteChannel, msg)
				ps.SetHasVote(precommit, index)
				return true
			}
			return false
		}

		// If height matches, then send LastCommit, Prevotes, Precommits.
		if rs.Height == prs.Height {
			// If there are lastCommits to send...
			if prs.Step == RoundStepNewHeight {
				if trySendVote(rs.LastCommit, &prs.LastCommit) {
					continue OUTER_LOOP
				}
			}
			// If there are prevotes to send...
			if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
				if trySendVote(rs.Votes.Prevotes(rs.Round), &prs.Prevotes) {
					continue OUTER_LOOP
				}
			}
			// If there are precommits to send...
			if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
				if trySendVote(rs.Votes.Precommits(rs.Round), &prs.Precommits) {
					continue OUTER_LOOP
				}
			}
			// If there are POLPrevotes to send...
			if 0 <= prs.ProposalPOLRound {
				if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
					if trySendVote(polPrevotes, &prs.ProposalPOL) {
						continue OUTER_LOOP
					}
				}
			}
		}

		// Special catchup logic.
		// If peer is lagging by height 1, send LastCommit.
		if prs.Height != 0 && prs.Height == rs.Height-1 {
			if prs.Round == rs.LastCommit.Round() {
				// NOTE: We prefer to use prs.Precommits if
				// prs.Round matches prs.CatchupCommitRound.
				if trySendVote(rs.LastCommit, &prs.Precommits) {
					continue OUTER_LOOP
				}
			} else {
				ps.EnsureCatchupCommitRound(prs.Height, rs.LastCommit.Round())
				if trySendVote(rs.LastCommit, &prs.CatchupCommit) {
					continue OUTER_LOOP
				}
			}
		}

		// Catchup logic
		// If peer is lagging by more than 1, send Validation.
		if prs.Height != 0 && prs.Height <= rs.Height-2 {
			// Load the block validation for prs.Height,
			// which contains precommit signatures for prs.Height.
			validation := conR.blockStore.LoadBlockValidation(prs.Height)
			log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
			// NOTE(review): validation.Round() is called before the nil check
			// inside trySendPrecommitFromValidation; confirm LoadBlockValidation
			// cannot return nil here or that Round() tolerates a nil receiver.
			ps.EnsureCatchupCommitRound(prs.Height, validation.Round())
			if trySendPrecommitFromValidation(validation, &prs.CatchupCommit) {
				continue OUTER_LOOP
			}
		}

		if sleeping == 0 {
			// We sent nothing. Sleep...
			sleeping = 1
			// NOTE(review): Prevotes()/Precommits() results are used without a
			// nil check here, unlike the POL path above; confirm BitArray()
			// tolerates a nil vote set.
			log.Debug("No votes to send, sleeping", "peer", peer,
				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
		} else if sleeping == 2 {
			// Continued sleep...
			sleeping = 1
		}

		time.Sleep(peerGossipSleepDuration)
		continue OUTER_LOOP
	}
}
  523. //-----------------------------------------------------------------------------
// PeerRoundState is our best knowledge of where a peer is in consensus and of
// which proposal data and votes it already has. It is embedded in PeerState.
//
// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
	Height                   int                 // Height peer is at
	Round                    int                 // Round peer is at
	Step                     RoundStepType       // Step peer is at
	StartTime                time.Time           // Estimated start of round 0 at this height
	Proposal                 bool                // True if peer has proposal for this round
	ProposalBlockPartsHeader types.PartSetHeader // Part-set header of the peer's proposal block
	ProposalBlockParts       *BitArray           // Block parts the peer has; nil after a height/round change
	ProposalPOLRound         int                 // -1 if none
	ProposalPOL              *BitArray           // nil until ProposalPOLMessage received.
	Prevotes                 *BitArray           // All votes peer has for this round
	Precommits               *BitArray           // All precommits peer has for this round
	LastCommitRound          int                 // Round of commit for last height.
	LastCommit               *BitArray           // All commit precommits of commit for last height.
	CatchupCommitRound       int                 // Round that we believe commit round is. -1 when unset.
	CatchupCommit            *BitArray           // All commit precommits peer has for this height
}
  542. //-----------------------------------------------------------------------------
// Sentinel errors for invalid peer state updates.
// NOTE(review): neither is referenced in the visible part of this file;
// confirm they are still used elsewhere.
var (
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)
// PeerState holds the mutable, mutex-guarded PeerRoundState for one peer.
// Exported methods take mtx themselves; use GetRoundState() to obtain a copy
// that is safe to read without the lock.
type PeerState struct {
	Key string // Copied from peer.Key in NewPeerState.

	mtx sync.Mutex // Guards the embedded PeerRoundState.
	PeerRoundState
}
  552. func NewPeerState(peer *p2p.Peer) *PeerState {
  553. return &PeerState{Key: peer.Key}
  554. }
  555. // Returns an atomic snapshot of the PeerRoundState.
  556. // There's no point in mutating it since it won't change PeerState.
  557. func (ps *PeerState) GetRoundState() *PeerRoundState {
  558. ps.mtx.Lock()
  559. defer ps.mtx.Unlock()
  560. prs := ps.PeerRoundState // copy
  561. return &prs
  562. }
  563. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  564. ps.mtx.Lock()
  565. defer ps.mtx.Unlock()
  566. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  567. return
  568. }
  569. if ps.Proposal {
  570. return
  571. }
  572. ps.Proposal = true
  573. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  574. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  575. ps.ProposalPOLRound = proposal.POLRound
  576. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  577. }
  578. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  579. ps.mtx.Lock()
  580. defer ps.mtx.Unlock()
  581. if ps.Height != height || ps.Round != round {
  582. return
  583. }
  584. ps.ProposalBlockParts.SetIndex(index, true)
  585. }
  586. // prs: If given, will also update this PeerRoundState copy.
  587. // NOTE: It's important to make sure that numValidators actually matches
  588. // what the node sees as the number of validators for height.
  589. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int, prs *PeerRoundState) {
  590. ps.mtx.Lock()
  591. defer ps.mtx.Unlock()
  592. if ps.Height == height {
  593. if ps.Prevotes == nil {
  594. ps.Prevotes = NewBitArray(numValidators)
  595. }
  596. if ps.Precommits == nil {
  597. ps.Precommits = NewBitArray(numValidators)
  598. }
  599. if ps.CatchupCommit == nil {
  600. ps.CatchupCommit = NewBitArray(numValidators)
  601. }
  602. if ps.ProposalPOL == nil {
  603. ps.ProposalPOL = NewBitArray(numValidators)
  604. }
  605. } else if ps.Height == height+1 {
  606. if ps.LastCommit == nil {
  607. ps.LastCommit = NewBitArray(numValidators)
  608. }
  609. }
  610. // Also, update prs if given.
  611. if prs != nil {
  612. prs.Prevotes = ps.Prevotes
  613. prs.Precommits = ps.Precommits
  614. prs.CatchupCommit = ps.CatchupCommit
  615. prs.ProposalPOL = ps.ProposalPOL
  616. prs.LastCommit = ps.LastCommit
  617. }
  618. }
// SetHasVote records that the peer has vote, which sits at validator index
// index. It takes the lock and delegates to setHasVote.
func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
}
  624. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  625. if ps.Height == height+1 && ps.LastCommitRound == round && type_ == types.VoteTypePrecommit {
  626. // Special case for LastCommit.
  627. ps.LastCommit.SetIndex(index, true)
  628. log.Debug("setHasVote", "LastCommit", ps.LastCommit, "index", index)
  629. return
  630. } else if ps.Height != height {
  631. // Does not apply.
  632. return
  633. }
  634. // By here, ps.Height is height.
  635. switch type_ {
  636. case types.VoteTypePrevote:
  637. if ps.ProposalPOLRound == round {
  638. ps.ProposalPOL.SetIndex(index, true)
  639. }
  640. ps.Prevotes.SetIndex(index, true)
  641. log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index)
  642. case types.VoteTypePrecommit:
  643. if ps.CatchupCommitRound == round {
  644. ps.CatchupCommit.SetIndex(index, true)
  645. }
  646. ps.Precommits.SetIndex(index, true)
  647. log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index)
  648. default:
  649. panic("Invalid vote type")
  650. }
  651. }
  652. // NOTE: 'round' is what we know to be the commit round for height.
  653. func (ps *PeerState) EnsureCatchupCommitRound(height, round int) {
  654. ps.mtx.Lock()
  655. defer ps.mtx.Unlock()
  656. if ps.Height != height {
  657. return
  658. }
  659. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  660. panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  661. }
  662. if ps.CatchupCommitRound == round {
  663. return // Nothing to do!
  664. }
  665. ps.CatchupCommitRound = round
  666. ps.CatchupCommit = nil
  667. }
  668. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  669. ps.mtx.Lock()
  670. defer ps.mtx.Unlock()
  671. // Just remember these values.
  672. psHeight := ps.Height
  673. psRound := ps.Round
  674. //psStep := ps.Step
  675. psCatchupCommitRound := ps.CatchupCommitRound
  676. psCatchupCommit := ps.CatchupCommit
  677. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  678. ps.Height = msg.Height
  679. ps.Round = msg.Round
  680. ps.Step = msg.Step
  681. ps.StartTime = startTime
  682. if psHeight != msg.Height || psRound != msg.Round {
  683. ps.Proposal = false
  684. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  685. ps.ProposalBlockParts = nil
  686. ps.ProposalPOLRound = -1
  687. ps.ProposalPOL = nil
  688. // We'll update the BitArray capacity later.
  689. ps.Prevotes = nil
  690. ps.Precommits = nil
  691. }
  692. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  693. // Peer caught up to CatchupCommitRound.
  694. // Preserve psCatchupCommit!
  695. // NOTE: We prefer to use prs.Precommits if
  696. // pr.Round matches pr.CatchupCommitRound.
  697. ps.Precommits = psCatchupCommit
  698. }
  699. if psHeight != msg.Height {
  700. // Shift Precommits to LastCommit.
  701. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  702. ps.LastCommitRound = msg.LastCommitRound
  703. ps.LastCommit = ps.Precommits
  704. } else {
  705. ps.LastCommitRound = msg.LastCommitRound
  706. ps.LastCommit = nil
  707. }
  708. // We'll update the BitArray capacity later.
  709. ps.CatchupCommitRound = -1
  710. ps.CatchupCommit = nil
  711. }
  712. }
  713. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  714. ps.mtx.Lock()
  715. defer ps.mtx.Unlock()
  716. if ps.Height != msg.Height {
  717. return
  718. }
  719. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  720. ps.ProposalBlockParts = msg.BlockParts
  721. }
  722. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  723. ps.mtx.Lock()
  724. defer ps.mtx.Unlock()
  725. if ps.Height != msg.Height {
  726. return
  727. }
  728. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  729. }
  730. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  731. ps.mtx.Lock()
  732. defer ps.mtx.Unlock()
  733. if ps.Height != msg.Height {
  734. return
  735. }
  736. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  737. return
  738. }
  739. // TODO: Merge onto existing ps.ProposalPOL?
  740. // We might have sent some prevotes in the meantime.
  741. ps.ProposalPOL = msg.ProposalPOL
  742. }
  743. //-----------------------------------------------------------------------------
// Messages

// Type bytes under which each concrete ConsensusMessage is registered with
// the binary package (see the RegisterInterface call in this file).
const (
	msgTypeNewRoundStep = byte(0x01)
	msgTypeCommitStep   = byte(0x02)
	msgTypeProposal     = byte(0x11)
	msgTypeProposalPOL  = byte(0x12)
	msgTypeBlockPart    = byte(0x13) // both block & POL
	msgTypeVote         = byte(0x14)
	msgTypeHasVote      = byte(0x15)
)
// ConsensusMessage is the interface type under which all consensus wire
// messages are registered and decoded. It has no methods.
type ConsensusMessage interface{}

// Registers every concrete message type together with its type byte so that
// DecodeMessage can read any of them through struct{ ConsensusMessage }.
var _ = binary.RegisterInterface(
	struct{ ConsensusMessage }{},
	binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
	binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
	binary.ConcreteType{&VoteMessage{}, msgTypeVote},
	binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
)
  765. // TODO: check for unnecessary extra bytes at the end.
  766. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  767. msgType = bz[0]
  768. n := new(int64)
  769. r := bytes.NewReader(bz)
  770. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  771. return
  772. }
  773. //-------------------------------------
  774. // For every height/round/step transition
  775. type NewRoundStepMessage struct {
  776. Height int
  777. Round int
  778. Step RoundStepType
  779. SecondsSinceStartTime int
  780. LastCommitRound int
  781. }
  782. func (m *NewRoundStepMessage) String() string {
  783. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  784. m.Height, m.Round, m.Step, m.LastCommitRound)
  785. }
  786. //-------------------------------------
  787. type CommitStepMessage struct {
  788. Height int
  789. BlockPartsHeader types.PartSetHeader
  790. BlockParts *BitArray
  791. }
  792. func (m *CommitStepMessage) String() string {
  793. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  794. }
  795. //-------------------------------------
  796. type ProposalMessage struct {
  797. Proposal *Proposal
  798. }
  799. func (m *ProposalMessage) String() string {
  800. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  801. }
  802. //-------------------------------------
  803. type ProposalPOLMessage struct {
  804. Height int
  805. ProposalPOLRound int
  806. ProposalPOL *BitArray
  807. }
  808. func (m *ProposalPOLMessage) String() string {
  809. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  810. }
  811. //-------------------------------------
  812. type BlockPartMessage struct {
  813. Height int
  814. Round int
  815. Part *types.Part
  816. }
  817. func (m *BlockPartMessage) String() string {
  818. return fmt.Sprintf("[BlockPart H:%v R:%v T:%X P:%v]", m.Height, m.Round, m.Part)
  819. }
  820. //-------------------------------------
  821. type VoteMessage struct {
  822. ValidatorIndex int
  823. Vote *types.Vote
  824. }
  825. func (m *VoteMessage) String() string {
  826. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  827. }
  828. //-------------------------------------
  829. type HasVoteMessage struct {
  830. Height int
  831. Round int
  832. Type byte
  833. Index int
  834. }
  835. func (m *HasVoteMessage) String() string {
  836. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  837. }