You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1043 lines
30 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. const (
  20. StateChannel = byte(0x20)
  21. DataChannel = byte(0x21)
  22. VoteChannel = byte(0x22)
  23. PeerStateKey = "ConsensusReactor.peerState"
  24. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  25. rebroadcastRoundStepDuration = 1000 * time.Millisecond // Time to sleep if there's nothing to send.
  26. )
  27. //-----------------------------------------------------------------------------
  28. // The reactor's underlying ConsensusState may change state at any time.
  29. // We atomically copy the RoundState struct before using it.
  30. type ConsensusReactor struct {
  31. sw *p2p.Switch
  32. running uint32
  33. quit chan struct{}
  34. blockStore *bc.BlockStore
  35. conS *ConsensusState
  36. // if fast sync is running we don't really do anything
  37. sync bool
  38. evsw events.Fireable
  39. }
  40. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
  41. conR := &ConsensusReactor{
  42. quit: make(chan struct{}),
  43. blockStore: blockStore,
  44. conS: consensusState,
  45. sync: sync,
  46. }
  47. return conR
  48. }
  49. // Implements Reactor
  50. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  51. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  52. log.Info("Starting ConsensusReactor")
  53. conR.sw = sw
  54. if !conR.sync {
  55. conR.conS.Start()
  56. }
  57. go conR.broadcastNewRoundStepRoutine()
  58. // go conR.rebroadcastRoundStepRoutine()
  59. }
  60. }
  61. // Implements Reactor
  62. func (conR *ConsensusReactor) Stop() {
  63. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  64. log.Info("Stopping ConsensusReactor")
  65. conR.conS.Stop()
  66. close(conR.quit)
  67. }
  68. }
  69. func (conR *ConsensusReactor) IsRunning() bool {
  70. return atomic.LoadUint32(&conR.running) == 1
  71. }
  72. // Implements Reactor
  73. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  74. // TODO optimize
  75. return []*p2p.ChannelDescriptor{
  76. &p2p.ChannelDescriptor{
  77. Id: StateChannel,
  78. Priority: 5,
  79. SendQueueCapacity: 100,
  80. },
  81. &p2p.ChannelDescriptor{
  82. Id: DataChannel,
  83. Priority: 5,
  84. SendQueueCapacity: 2,
  85. },
  86. &p2p.ChannelDescriptor{
  87. Id: VoteChannel,
  88. Priority: 5,
  89. SendQueueCapacity: 40,
  90. },
  91. }
  92. }
  93. // Implements Reactor
  94. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  95. if !conR.IsRunning() {
  96. return
  97. }
  98. // Create peerState for peer
  99. peerState := NewPeerState(peer)
  100. peer.Data.Set(PeerStateKey, peerState)
  101. // Begin gossip routines for this peer.
  102. go conR.gossipDataRoutine(peer, peerState)
  103. go conR.gossipVotesRoutine(peer, peerState)
  104. // Send our state to peer.
  105. conR.sendNewRoundStepMessage(peer)
  106. }
  107. // Implements Reactor
  108. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  109. if !conR.IsRunning() {
  110. return
  111. }
  112. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  113. }
  114. // Implements Reactor
  115. // NOTE: We process these messages even when we're fast_syncing.
  116. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  117. log.Debug("Receive", "channel", chId, "peer", peer, "bytes", msgBytes)
  118. if !conR.IsRunning() {
  119. return
  120. }
  121. // Get round state
  122. rs := conR.conS.GetRoundState()
  123. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  124. _, msg_, err := DecodeMessage(msgBytes)
  125. if err != nil {
  126. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  127. return
  128. }
  129. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)
  130. switch chId {
  131. case StateChannel:
  132. switch msg := msg_.(type) {
  133. case *NewRoundStepMessage:
  134. ps.ApplyNewRoundStepMessage(msg, rs)
  135. case *CommitStepMessage:
  136. ps.ApplyCommitStepMessage(msg)
  137. case *HasVoteMessage:
  138. ps.ApplyHasVoteMessage(msg)
  139. default:
  140. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  141. }
  142. case DataChannel:
  143. switch msg := msg_.(type) {
  144. case *ProposalMessage:
  145. ps.SetHasProposal(msg.Proposal)
  146. err = conR.conS.SetProposal(msg.Proposal)
  147. case *ProposalPOLMessage:
  148. ps.ApplyProposalPOLMessage(msg)
  149. case *BlockPartMessage:
  150. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  151. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  152. default:
  153. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  154. }
  155. case VoteChannel:
  156. switch msg := msg_.(type) {
  157. case *VoteMessage:
  158. vote := msg.Vote
  159. var validators *sm.ValidatorSet
  160. if rs.Height == vote.Height {
  161. validators = rs.Validators
  162. } else if rs.Height == vote.Height+1 {
  163. if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
  164. return // Wrong height, not a LastCommit straggler commit.
  165. }
  166. validators = rs.LastValidators
  167. } else {
  168. return // Wrong height. Not necessarily a bad peer.
  169. }
  170. // We have vote/validators. Height may not be rs.Height
  171. address, _ := validators.GetByIndex(msg.ValidatorIndex)
  172. added, index, err := conR.conS.AddVote(address, vote, peer.Key)
  173. if err != nil {
  174. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  175. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  176. log.Warn("Found conflicting vote. Publish evidence")
  177. evidenceTx := &types.DupeoutTx{
  178. Address: address,
  179. VoteA: *errDupe.VoteA,
  180. VoteB: *errDupe.VoteB,
  181. }
  182. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  183. } else {
  184. // Probably an invalid signature. Bad peer.
  185. log.Warn("Error attempting to add vote", "error", err)
  186. // TODO: punish peer
  187. }
  188. }
  189. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  190. ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
  191. ps.SetHasVote(vote, index)
  192. if added {
  193. // If rs.Height == vote.Height && rs.Round < vote.Round,
  194. // the peer is sending us CatchupCommit precommits.
  195. // We could make note of this and help filter in broadcastHasVoteMessage().
  196. conR.broadcastHasVoteMessage(vote, index)
  197. }
  198. default:
  199. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  200. }
  201. default:
  202. log.Warn(Fmt("Unknown channel %X", chId))
  203. }
  204. if err != nil {
  205. log.Warn("Error in Receive()", "error", err)
  206. }
  207. }
  208. // Broadcasts HasVoteMessage to peers that care.
  209. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  210. msg := &HasVoteMessage{
  211. Height: vote.Height,
  212. Round: vote.Round,
  213. Type: vote.Type,
  214. Index: index,
  215. }
  216. conR.sw.Broadcast(StateChannel, msg)
  217. /*
  218. // TODO: Make this broadcast more selective.
  219. for _, peer := range conR.sw.Peers().List() {
  220. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  221. prs := ps.GetRoundState()
  222. if prs.Height == vote.Height {
  223. // TODO: Also filter on round?
  224. peer.TrySend(StateChannel, msg)
  225. } else {
  226. // Height doesn't match
  227. // TODO: check a field, maybe CatchupCommitRound?
  228. // TODO: But that requires changing the struct field comment.
  229. }
  230. }
  231. */
  232. }
  233. // Sets our private validator account for signing votes.
  234. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  235. conR.conS.SetPrivValidator(priv)
  236. }
  237. // Switch from the fast sync to the consensus:
  238. // reset the state, turn off fast sync, start the consensus-state-machine
  239. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  240. conR.conS.updateToState(state, false)
  241. conR.sync = false
  242. conR.conS.Start()
  243. }
  244. // implements events.Eventable
  245. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  246. conR.evsw = evsw
  247. conR.conS.SetFireable(evsw)
  248. }
  249. //--------------------------------------
  250. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  251. nrsMsg = &NewRoundStepMessage{
  252. Height: rs.Height,
  253. Round: rs.Round,
  254. Step: rs.Step,
  255. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  256. LastCommitRound: rs.LastCommit.Round(),
  257. }
  258. if rs.Step == RoundStepCommit {
  259. csMsg = &CommitStepMessage{
  260. Height: rs.Height,
  261. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  262. BlockParts: rs.ProposalBlockParts.BitArray(),
  263. }
  264. }
  265. return
  266. }
  267. // Listens for changes to the ConsensusState.Step by pulling
  268. // on conR.conS.NewStepCh().
  269. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  270. for {
  271. // Get RoundState with new Step or quit.
  272. var rs *RoundState
  273. select {
  274. case rs = <-conR.conS.NewStepCh():
  275. case <-conR.quit:
  276. return
  277. }
  278. nrsMsg, csMsg := makeRoundStepMessages(rs)
  279. if nrsMsg != nil {
  280. conR.sw.Broadcast(StateChannel, nrsMsg)
  281. }
  282. if csMsg != nil {
  283. conR.sw.Broadcast(StateChannel, csMsg)
  284. }
  285. }
  286. }
  287. /* TODO delete
  288. // Periodically broadcast NewRoundStepMessage.
  289. // This is a hack. TODO remove the need for it?
  290. // The issue is with Start() happening after a NewRoundStep message
  291. // was received from a peer, for the bootstrapping set.
  292. func (conR *ConsensusReactor) rebroadcastRoundStepRoutine() {
  293. for {
  294. time.Sleep(rebroadcastRoundStepDuration)
  295. rs := conR.conS.GetRoundState()
  296. nrsMsg, csMsg := makeRoundStepMessages(rs)
  297. if nrsMsg != nil {
  298. conR.sw.Broadcast(StateChannel, nrsMsg)
  299. }
  300. if csMsg != nil {
  301. conR.sw.Broadcast(StateChannel, csMsg)
  302. }
  303. }
  304. }
  305. */
  306. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  307. rs := conR.conS.GetRoundState()
  308. nrsMsg, csMsg := makeRoundStepMessages(rs)
  309. if nrsMsg != nil {
  310. peer.Send(StateChannel, nrsMsg)
  311. }
  312. if csMsg != nil {
  313. peer.Send(StateChannel, csMsg)
  314. }
  315. }
  316. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  317. log := log.New("peer", peer.Key)
  318. OUTER_LOOP:
  319. for {
  320. // Manage disconnects from self or peer.
  321. if !peer.IsRunning() || !conR.IsRunning() {
  322. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  323. return
  324. }
  325. rs := conR.conS.GetRoundState()
  326. prs := ps.GetRoundState()
  327. // Send proposal Block parts?
  328. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  329. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  330. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  331. part := rs.ProposalBlockParts.GetPart(index)
  332. msg := &BlockPartMessage{
  333. Height: rs.Height, // This tells peer that this part applies to us.
  334. Round: rs.Round, // This tells peer that this part applies to us.
  335. Part: part,
  336. }
  337. peer.Send(DataChannel, msg)
  338. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  339. continue OUTER_LOOP
  340. }
  341. }
  342. // If the peer is on a previous height, help catch up.
  343. if (0 < prs.Height) && (prs.Height < rs.Height) {
  344. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  345. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  346. // Ensure that the peer's PartSetHeader is correct
  347. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  348. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  349. log.Debug("Peer ProposalBlockPartsHeader mismatch, sleeping",
  350. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  351. time.Sleep(peerGossipSleepDuration)
  352. continue OUTER_LOOP
  353. }
  354. // Load the part
  355. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  356. if part == nil {
  357. log.Warn("Could not load part", "index", index,
  358. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  359. time.Sleep(peerGossipSleepDuration)
  360. continue OUTER_LOOP
  361. }
  362. // Send the part
  363. msg := &BlockPartMessage{
  364. Height: prs.Height, // Not our height, so it doesn't matter.
  365. Round: prs.Round, // Not our height, so it doesn't matter.
  366. Part: part,
  367. }
  368. peer.Send(DataChannel, msg)
  369. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  370. continue OUTER_LOOP
  371. } else {
  372. //log.Debug("No parts to send in catch-up, sleeping")
  373. time.Sleep(peerGossipSleepDuration)
  374. continue OUTER_LOOP
  375. }
  376. }
  377. // If height and round don't match, sleep.
  378. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  379. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  380. time.Sleep(peerGossipSleepDuration)
  381. continue OUTER_LOOP
  382. }
  383. // By here, height and round match.
  384. // Proposal block parts were already matched and sent if any were wanted.
  385. // (These can match on hash so the round doesn't matter)
  386. // Now consider sending other things, like the Proposal itself.
  387. // Send Proposal && ProposalPOL BitArray?
  388. if rs.Proposal != nil && !prs.Proposal {
  389. // Proposal
  390. {
  391. msg := &ProposalMessage{Proposal: rs.Proposal}
  392. peer.Send(DataChannel, msg)
  393. ps.SetHasProposal(rs.Proposal)
  394. }
  395. // ProposalPOL.
  396. // Peer must receive ProposalMessage first.
  397. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  398. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  399. if 0 <= rs.Proposal.POLRound {
  400. msg := &ProposalPOLMessage{
  401. Height: rs.Height,
  402. ProposalPOLRound: rs.Proposal.POLRound,
  403. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  404. }
  405. peer.Send(DataChannel, msg)
  406. }
  407. continue OUTER_LOOP
  408. }
  409. // Nothing to do. Sleep.
  410. time.Sleep(peerGossipSleepDuration)
  411. continue OUTER_LOOP
  412. }
  413. }
  414. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  415. log := log.New("peer", peer.Key)
  416. // Simple hack to throttle logs upon sleep.
  417. var sleeping = 0
  418. OUTER_LOOP:
  419. for {
  420. // Manage disconnects from self or peer.
  421. if !peer.IsRunning() || !conR.IsRunning() {
  422. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  423. return
  424. }
  425. rs := conR.conS.GetRoundState()
  426. prs := ps.GetRoundState()
  427. switch sleeping {
  428. case 1: // First sleep
  429. sleeping = 2
  430. case 2: // No more sleep
  431. sleeping = 0
  432. }
  433. log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  434. "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  435. // If height matches, then send LastCommit, Prevotes, Precommits.
  436. if rs.Height == prs.Height {
  437. // If there are lastCommits to send...
  438. if prs.Step == RoundStepNewHeight {
  439. if ps.PickSendVote(rs.LastCommit) {
  440. log.Debug("Picked rs.LastCommit to send")
  441. continue OUTER_LOOP
  442. }
  443. }
  444. // If there are prevotes to send...
  445. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  446. if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
  447. log.Debug("Picked rs.Prevotes(rs.Round) to send")
  448. continue OUTER_LOOP
  449. }
  450. }
  451. // If there are precommits to send...
  452. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  453. if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
  454. log.Debug("Picked rs.Precommits(rs.Round) to send")
  455. continue OUTER_LOOP
  456. }
  457. }
  458. // If there are prevotes to send for the last round...
  459. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
  460. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  461. log.Debug("Picked rs.Prevotes(prs.Round) to send")
  462. continue OUTER_LOOP
  463. }
  464. }
  465. // If there are precommits to send for the last round...
  466. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
  467. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  468. log.Debug("Picked rs.Precommits(prs.Round) to send")
  469. continue OUTER_LOOP
  470. }
  471. }
  472. // If there are POLPrevotes to send...
  473. if 0 <= prs.ProposalPOLRound {
  474. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  475. if ps.PickSendVote(polPrevotes) {
  476. log.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  477. continue OUTER_LOOP
  478. }
  479. }
  480. }
  481. }
  482. // Special catchup logic.
  483. // If peer is lagging by height 1, send LastCommit.
  484. if prs.Height != 0 && rs.Height == prs.Height+1 {
  485. if ps.PickSendVote(rs.LastCommit) {
  486. log.Debug("Picked rs.LastCommit to send")
  487. continue OUTER_LOOP
  488. }
  489. }
  490. // Catchup logic
  491. // If peer is lagging by more than 1, send Validation.
  492. if prs.Height != 0 && rs.Height >= prs.Height+2 {
  493. // Load the block validation for prs.Height,
  494. // which contains precommit signatures for prs.Height.
  495. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  496. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  497. if ps.PickSendVote(validation) {
  498. log.Debug("Picked Catchup validation to send")
  499. continue OUTER_LOOP
  500. }
  501. }
  502. if sleeping == 0 {
  503. // We sent nothing. Sleep...
  504. sleeping = 1
  505. log.Debug("No votes to send, sleeping", "peer", peer,
  506. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  507. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  508. } else if sleeping == 2 {
  509. // Continued sleep...
  510. sleeping = 1
  511. }
  512. time.Sleep(peerGossipSleepDuration)
  513. continue OUTER_LOOP
  514. }
  515. }
  516. //-----------------------------------------------------------------------------
  517. // Read only when returned by PeerState.GetRoundState().
  518. type PeerRoundState struct {
  519. Height int // Height peer is at
  520. Round int // Round peer is at
  521. Step RoundStepType // Step peer is at
  522. StartTime time.Time // Estimated start of round 0 at this height
  523. Proposal bool // True if peer has proposal for this round
  524. ProposalBlockPartsHeader types.PartSetHeader //
  525. ProposalBlockParts *BitArray //
  526. ProposalPOLRound int // -1 if none
  527. ProposalPOL *BitArray // nil until ProposalPOLMessage received.
  528. Prevotes *BitArray // All votes peer has for this round
  529. Precommits *BitArray // All precommits peer has for this round
  530. LastCommitRound int // Round of commit for last height.
  531. LastCommit *BitArray // All commit precommits of commit for last height.
  532. CatchupCommitRound int // Round that we believe commit round is.
  533. CatchupCommit *BitArray // All commit precommits peer has for this height
  534. }
  535. //-----------------------------------------------------------------------------
  536. var (
  537. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  538. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  539. )
  540. type PeerState struct {
  541. Peer *p2p.Peer
  542. mtx sync.Mutex
  543. PeerRoundState
  544. }
  545. func NewPeerState(peer *p2p.Peer) *PeerState {
  546. return &PeerState{Peer: peer}
  547. }
  548. // Returns an atomic snapshot of the PeerRoundState.
  549. // There's no point in mutating it since it won't change PeerState.
  550. func (ps *PeerState) GetRoundState() *PeerRoundState {
  551. ps.mtx.Lock()
  552. defer ps.mtx.Unlock()
  553. prs := ps.PeerRoundState // copy
  554. return &prs
  555. }
  556. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  557. ps.mtx.Lock()
  558. defer ps.mtx.Unlock()
  559. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  560. return
  561. }
  562. if ps.Proposal {
  563. return
  564. }
  565. ps.Proposal = true
  566. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  567. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  568. ps.ProposalPOLRound = proposal.POLRound
  569. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  570. }
  571. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  572. ps.mtx.Lock()
  573. defer ps.mtx.Unlock()
  574. if ps.Height != height || ps.Round != round {
  575. return
  576. }
  577. ps.ProposalBlockParts.SetIndex(index, true)
  578. }
  579. // Convenience function to send vote to peer.
  580. // Returns true if vote was sent.
  581. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  582. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  583. msg := &VoteMessage{index, vote}
  584. ps.Peer.Send(VoteChannel, msg)
  585. return true
  586. }
  587. return false
  588. }
  589. // votes: Must be the correct Size() for the Height().
  590. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  591. ps.mtx.Lock()
  592. defer ps.mtx.Unlock()
  593. if votes.Size() == 0 {
  594. return 0, nil, false
  595. }
  596. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  597. // Lazily set data using 'votes'.
  598. if votes.IsCommit() {
  599. ps.ensureCatchupCommitRound(height, round, size)
  600. }
  601. ps.ensureVoteBitArrays(height, size)
  602. psVotes := ps.getVoteBitArray(height, round, type_)
  603. if psVotes == nil {
  604. return 0, nil, false // Not something worth sending
  605. }
  606. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  607. ps.setHasVote(height, round, type_, index)
  608. return index, votes.GetByIndex(index), true
  609. }
  610. return 0, nil, false
  611. }
  612. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  613. if ps.Height == height {
  614. if ps.Round == round {
  615. switch type_ {
  616. case types.VoteTypePrevote:
  617. return ps.Prevotes
  618. case types.VoteTypePrecommit:
  619. return ps.Precommits
  620. default:
  621. panic(Fmt("Unexpected vote type %X", type_))
  622. }
  623. }
  624. if ps.CatchupCommitRound == round {
  625. switch type_ {
  626. case types.VoteTypePrevote:
  627. return nil
  628. case types.VoteTypePrecommit:
  629. return ps.CatchupCommit
  630. default:
  631. panic(Fmt("Unexpected vote type %X", type_))
  632. }
  633. }
  634. return nil
  635. }
  636. if ps.Height == height+1 {
  637. if ps.LastCommitRound == round {
  638. switch type_ {
  639. case types.VoteTypePrevote:
  640. return nil
  641. case types.VoteTypePrecommit:
  642. return ps.LastCommit
  643. default:
  644. panic(Fmt("Unexpected vote type %X", type_))
  645. }
  646. }
  647. return nil
  648. }
  649. return nil
  650. }
  651. // NOTE: 'round' is what we know to be the commit round for height.
  652. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  653. if ps.Height != height {
  654. return
  655. }
  656. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  657. panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  658. }
  659. if ps.CatchupCommitRound == round {
  660. return // Nothing to do!
  661. }
  662. ps.CatchupCommitRound = round
  663. if round == ps.Round {
  664. ps.CatchupCommit = ps.Precommits
  665. } else {
  666. ps.CatchupCommit = NewBitArray(numValidators)
  667. }
  668. }
  669. // NOTE: It's important to make sure that numValidators actually matches
  670. // what the node sees as the number of validators for height.
  671. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  672. ps.mtx.Lock()
  673. defer ps.mtx.Unlock()
  674. ps.ensureVoteBitArrays(height, numValidators)
  675. }
  676. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  677. if ps.Height == height {
  678. if ps.Prevotes == nil {
  679. ps.Prevotes = NewBitArray(numValidators)
  680. }
  681. if ps.Precommits == nil {
  682. ps.Precommits = NewBitArray(numValidators)
  683. }
  684. if ps.CatchupCommit == nil {
  685. ps.CatchupCommit = NewBitArray(numValidators)
  686. }
  687. if ps.ProposalPOL == nil {
  688. ps.ProposalPOL = NewBitArray(numValidators)
  689. }
  690. } else if ps.Height == height+1 {
  691. if ps.LastCommit == nil {
  692. ps.LastCommit = NewBitArray(numValidators)
  693. }
  694. }
  695. }
  696. func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  697. ps.mtx.Lock()
  698. defer ps.mtx.Unlock()
  699. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  700. }
  701. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  702. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  703. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  704. panic("Invalid vote type") // SANITY
  705. }
  706. if ps.Height == height {
  707. if ps.Round == round {
  708. switch type_ {
  709. case types.VoteTypePrevote:
  710. ps.Prevotes.SetIndex(index, true)
  711. log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  712. case types.VoteTypePrecommit:
  713. ps.Precommits.SetIndex(index, true)
  714. log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  715. }
  716. } else if ps.CatchupCommitRound == round {
  717. switch type_ {
  718. case types.VoteTypePrevote:
  719. case types.VoteTypePrecommit:
  720. ps.CatchupCommit.SetIndex(index, true)
  721. log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  722. }
  723. } else if ps.ProposalPOLRound == round {
  724. switch type_ {
  725. case types.VoteTypePrevote:
  726. ps.ProposalPOL.SetIndex(index, true)
  727. log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  728. case types.VoteTypePrecommit:
  729. }
  730. }
  731. } else if ps.Height == height+1 {
  732. if ps.LastCommitRound == round {
  733. switch type_ {
  734. case types.VoteTypePrevote:
  735. case types.VoteTypePrecommit:
  736. ps.LastCommit.SetIndex(index, true)
  737. log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  738. }
  739. }
  740. } else {
  741. // Does not apply.
  742. }
  743. }
  744. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  745. ps.mtx.Lock()
  746. defer ps.mtx.Unlock()
  747. // Ignore duplicate messages.
  748. // TODO: This is only necessary because rebroadcastRoundStepRoutine.
  749. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step {
  750. return
  751. }
  752. // Just remember these values.
  753. psHeight := ps.Height
  754. psRound := ps.Round
  755. //psStep := ps.Step
  756. psCatchupCommitRound := ps.CatchupCommitRound
  757. psCatchupCommit := ps.CatchupCommit
  758. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  759. ps.Height = msg.Height
  760. ps.Round = msg.Round
  761. ps.Step = msg.Step
  762. ps.StartTime = startTime
  763. if psHeight != msg.Height || psRound != msg.Round {
  764. ps.Proposal = false
  765. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  766. ps.ProposalBlockParts = nil
  767. ps.ProposalPOLRound = -1
  768. ps.ProposalPOL = nil
  769. // We'll update the BitArray capacity later.
  770. ps.Prevotes = nil
  771. ps.Precommits = nil
  772. }
  773. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  774. // Peer caught up to CatchupCommitRound.
  775. // Preserve psCatchupCommit!
  776. // NOTE: We prefer to use prs.Precommits if
  777. // pr.Round matches pr.CatchupCommitRound.
  778. ps.Precommits = psCatchupCommit
  779. }
  780. if psHeight != msg.Height {
  781. // Shift Precommits to LastCommit.
  782. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  783. ps.LastCommitRound = msg.LastCommitRound
  784. ps.LastCommit = ps.Precommits
  785. } else {
  786. ps.LastCommitRound = msg.LastCommitRound
  787. ps.LastCommit = nil
  788. }
  789. // We'll update the BitArray capacity later.
  790. ps.CatchupCommitRound = -1
  791. ps.CatchupCommit = nil
  792. }
  793. }
  794. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  795. ps.mtx.Lock()
  796. defer ps.mtx.Unlock()
  797. if ps.Height != msg.Height {
  798. return
  799. }
  800. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  801. ps.ProposalBlockParts = msg.BlockParts
  802. }
  803. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  804. ps.mtx.Lock()
  805. defer ps.mtx.Unlock()
  806. if ps.Height != msg.Height {
  807. return
  808. }
  809. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  810. }
  811. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  812. ps.mtx.Lock()
  813. defer ps.mtx.Unlock()
  814. if ps.Height != msg.Height {
  815. return
  816. }
  817. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  818. return
  819. }
  820. // TODO: Merge onto existing ps.ProposalPOL?
  821. // We might have sent some prevotes in the meantime.
  822. ps.ProposalPOL = msg.ProposalPOL
  823. }
  824. //-----------------------------------------------------------------------------
  825. // Messages
  826. const (
  827. msgTypeNewRoundStep = byte(0x01)
  828. msgTypeCommitStep = byte(0x02)
  829. msgTypeProposal = byte(0x11)
  830. msgTypeProposalPOL = byte(0x12)
  831. msgTypeBlockPart = byte(0x13) // both block & POL
  832. msgTypeVote = byte(0x14)
  833. msgTypeHasVote = byte(0x15)
  834. )
  835. type ConsensusMessage interface{}
  836. var _ = binary.RegisterInterface(
  837. struct{ ConsensusMessage }{},
  838. binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  839. binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  840. binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  841. binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  842. binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  843. binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  844. binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  845. )
  846. // TODO: check for unnecessary extra bytes at the end.
  847. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  848. msgType = bz[0]
  849. n := new(int64)
  850. r := bytes.NewReader(bz)
  851. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  852. return
  853. }
  854. //-------------------------------------
  855. // For every height/round/step transition
  856. type NewRoundStepMessage struct {
  857. Height int
  858. Round int
  859. Step RoundStepType
  860. SecondsSinceStartTime int
  861. LastCommitRound int
  862. }
  863. func (m *NewRoundStepMessage) String() string {
  864. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  865. m.Height, m.Round, m.Step, m.LastCommitRound)
  866. }
  867. //-------------------------------------
  868. type CommitStepMessage struct {
  869. Height int
  870. BlockPartsHeader types.PartSetHeader
  871. BlockParts *BitArray
  872. }
  873. func (m *CommitStepMessage) String() string {
  874. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  875. }
  876. //-------------------------------------
  877. type ProposalMessage struct {
  878. Proposal *Proposal
  879. }
  880. func (m *ProposalMessage) String() string {
  881. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  882. }
  883. //-------------------------------------
  884. type ProposalPOLMessage struct {
  885. Height int
  886. ProposalPOLRound int
  887. ProposalPOL *BitArray
  888. }
  889. func (m *ProposalPOLMessage) String() string {
  890. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  891. }
  892. //-------------------------------------
  893. type BlockPartMessage struct {
  894. Height int
  895. Round int
  896. Part *types.Part
  897. }
  898. func (m *BlockPartMessage) String() string {
  899. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  900. }
  901. //-------------------------------------
  902. type VoteMessage struct {
  903. ValidatorIndex int
  904. Vote *types.Vote
  905. }
  906. func (m *VoteMessage) String() string {
  907. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  908. }
  909. //-------------------------------------
  910. type HasVoteMessage struct {
  911. Height int
  912. Round int
  913. Type byte
  914. Index int
  915. }
  916. func (m *HasVoteMessage) String() string {
  917. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  918. }