You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1013 lines
29 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. . "github.com/tendermint/tendermint/consensus/types"
  14. "github.com/tendermint/tendermint/events"
  15. "github.com/tendermint/tendermint/p2p"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. const (
  20. StateChannel = byte(0x20)
  21. DataChannel = byte(0x21)
  22. VoteChannel = byte(0x22)
  23. PeerStateKey = "ConsensusReactor.peerState"
  24. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
  25. )
  26. //-----------------------------------------------------------------------------
  27. // The reactor's underlying ConsensusState may change state at any time.
  28. // We atomically copy the RoundState struct before using it.
  29. type ConsensusReactor struct {
  30. sw *p2p.Switch
  31. running uint32
  32. quit chan struct{}
  33. blockStore *bc.BlockStore
  34. conS *ConsensusState
  35. // if fast sync is running we don't really do anything
  36. sync bool
  37. evsw events.Fireable
  38. }
  39. func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
  40. conR := &ConsensusReactor{
  41. quit: make(chan struct{}),
  42. blockStore: blockStore,
  43. conS: consensusState,
  44. sync: sync,
  45. }
  46. return conR
  47. }
  48. // Implements Reactor
  49. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  50. if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
  51. log.Info("Starting ConsensusReactor")
  52. conR.sw = sw
  53. if !conR.sync {
  54. conR.conS.Start()
  55. }
  56. go conR.broadcastNewRoundStepRoutine()
  57. }
  58. }
  59. // Implements Reactor
  60. func (conR *ConsensusReactor) Stop() {
  61. if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
  62. log.Info("Stopping ConsensusReactor")
  63. conR.conS.Stop()
  64. close(conR.quit)
  65. }
  66. }
  67. func (conR *ConsensusReactor) IsRunning() bool {
  68. return atomic.LoadUint32(&conR.running) == 1
  69. }
  70. // Implements Reactor
  71. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  72. // TODO optimize
  73. return []*p2p.ChannelDescriptor{
  74. &p2p.ChannelDescriptor{
  75. Id: StateChannel,
  76. Priority: 5,
  77. SendQueueCapacity: 100,
  78. },
  79. &p2p.ChannelDescriptor{
  80. Id: DataChannel,
  81. Priority: 5,
  82. SendQueueCapacity: 2,
  83. },
  84. &p2p.ChannelDescriptor{
  85. Id: VoteChannel,
  86. Priority: 5,
  87. SendQueueCapacity: 40,
  88. },
  89. }
  90. }
  91. // Implements Reactor
  92. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  93. if !conR.IsRunning() {
  94. return
  95. }
  96. // Create peerState for peer
  97. peerState := NewPeerState(peer)
  98. peer.Data.Set(PeerStateKey, peerState)
  99. // Begin gossip routines for this peer.
  100. go conR.gossipDataRoutine(peer, peerState)
  101. go conR.gossipVotesRoutine(peer, peerState)
  102. // Send our state to peer.
  103. conR.sendNewRoundStepMessage(peer)
  104. }
  105. // Implements Reactor
  106. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  107. if !conR.IsRunning() {
  108. return
  109. }
  110. //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
  111. }
  112. // Implements Reactor
  113. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  114. if conR.sync || !conR.IsRunning() {
  115. return
  116. }
  117. // Get round state
  118. rs := conR.conS.GetRoundState()
  119. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  120. _, msg_, err := DecodeMessage(msgBytes)
  121. if err != nil {
  122. log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
  123. return
  124. }
  125. log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes)
  126. switch chId {
  127. case StateChannel:
  128. switch msg := msg_.(type) {
  129. case *NewRoundStepMessage:
  130. ps.ApplyNewRoundStepMessage(msg, rs)
  131. case *CommitStepMessage:
  132. ps.ApplyCommitStepMessage(msg)
  133. case *HasVoteMessage:
  134. ps.ApplyHasVoteMessage(msg)
  135. default:
  136. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  137. }
  138. case DataChannel:
  139. switch msg := msg_.(type) {
  140. case *ProposalMessage:
  141. ps.SetHasProposal(msg.Proposal)
  142. err = conR.conS.SetProposal(msg.Proposal)
  143. case *ProposalPOLMessage:
  144. ps.ApplyProposalPOLMessage(msg)
  145. case *BlockPartMessage:
  146. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
  147. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
  148. default:
  149. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  150. }
  151. case VoteChannel:
  152. switch msg := msg_.(type) {
  153. case *VoteMessage:
  154. vote := msg.Vote
  155. var validators *sm.ValidatorSet
  156. if rs.Height == vote.Height {
  157. validators = rs.Validators
  158. } else if rs.Height == vote.Height+1 {
  159. if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
  160. return // Wrong height, not a LastCommit straggler commit.
  161. }
  162. validators = rs.LastValidators
  163. } else {
  164. return // Wrong height. Not necessarily a bad peer.
  165. }
  166. // We have vote/validators. Height may not be rs.Height
  167. address, _ := validators.GetByIndex(msg.ValidatorIndex)
  168. added, index, err := conR.conS.AddVote(address, vote, peer.Key)
  169. if err != nil {
  170. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
  171. if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
  172. log.Warn("Found conflicting vote. Publish evidence")
  173. evidenceTx := &types.DupeoutTx{
  174. Address: address,
  175. VoteA: *errDupe.VoteA,
  176. VoteB: *errDupe.VoteB,
  177. }
  178. conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
  179. } else {
  180. // Probably an invalid signature. Bad peer.
  181. log.Warn("Error attempting to add vote", "error", err)
  182. // TODO: punish peer
  183. }
  184. }
  185. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  186. ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
  187. ps.SetHasVote(vote, index)
  188. if added {
  189. // If rs.Height == vote.Height && rs.Round < vote.Round,
  190. // the peer is sending us CatchupCommit precommits.
  191. // We could make note of this and help filter in broadcastHasVoteMessage().
  192. conR.broadcastHasVoteMessage(vote, index)
  193. }
  194. default:
  195. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  196. }
  197. default:
  198. log.Warn(Fmt("Unknown channel %X", chId))
  199. }
  200. if err != nil {
  201. log.Warn("Error in Receive()", "error", err)
  202. }
  203. }
  204. // Broadcasts HasVoteMessage to peers that care.
  205. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
  206. msg := &HasVoteMessage{
  207. Height: vote.Height,
  208. Round: vote.Round,
  209. Type: vote.Type,
  210. Index: index,
  211. }
  212. conR.sw.Broadcast(StateChannel, msg)
  213. /*
  214. // TODO: Make this broadcast more selective.
  215. for _, peer := range conR.sw.Peers().List() {
  216. ps := peer.Data.Get(PeerStateKey).(*PeerState)
  217. prs := ps.GetRoundState()
  218. if prs.Height == vote.Height {
  219. // TODO: Also filter on round?
  220. peer.TrySend(StateChannel, msg)
  221. } else {
  222. // Height doesn't match
  223. // TODO: check a field, maybe CatchupCommitRound?
  224. // TODO: But that requires changing the struct field comment.
  225. }
  226. }
  227. */
  228. }
  229. // Sets our private validator account for signing votes.
  230. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
  231. conR.conS.SetPrivValidator(priv)
  232. }
  233. // Switch from the fast sync to the consensus:
  234. // reset the state, turn off fast sync, start the consensus-state-machine
  235. func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
  236. conR.conS.updateToState(state, false)
  237. conR.sync = false
  238. conR.conS.Start()
  239. }
  240. // implements events.Eventable
  241. func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
  242. conR.evsw = evsw
  243. conR.conS.SetFireable(evsw)
  244. }
  245. //--------------------------------------
  246. func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
  247. nrsMsg = &NewRoundStepMessage{
  248. Height: rs.Height,
  249. Round: rs.Round,
  250. Step: rs.Step,
  251. SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
  252. LastCommitRound: rs.LastCommit.Round(),
  253. }
  254. if rs.Step == RoundStepCommit {
  255. csMsg = &CommitStepMessage{
  256. Height: rs.Height,
  257. BlockPartsHeader: rs.ProposalBlockParts.Header(),
  258. BlockParts: rs.ProposalBlockParts.BitArray(),
  259. }
  260. }
  261. return
  262. }
  263. // Listens for changes to the ConsensusState.Step by pulling
  264. // on conR.conS.NewStepCh().
  265. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  266. for {
  267. // Get RoundState with new Step or quit.
  268. var rs *RoundState
  269. select {
  270. case rs = <-conR.conS.NewStepCh():
  271. case <-conR.quit:
  272. return
  273. }
  274. nrsMsg, csMsg := makeRoundStepMessages(rs)
  275. if nrsMsg != nil {
  276. conR.sw.Broadcast(StateChannel, nrsMsg)
  277. }
  278. if csMsg != nil {
  279. conR.sw.Broadcast(StateChannel, csMsg)
  280. }
  281. }
  282. }
  283. func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
  284. rs := conR.conS.GetRoundState()
  285. nrsMsg, csMsg := makeRoundStepMessages(rs)
  286. if nrsMsg != nil {
  287. peer.Send(StateChannel, nrsMsg)
  288. }
  289. if csMsg != nil {
  290. peer.Send(StateChannel, csMsg)
  291. }
  292. }
  293. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  294. log := log.New("peer", peer.Key)
  295. OUTER_LOOP:
  296. for {
  297. // Manage disconnects from self or peer.
  298. if !peer.IsRunning() || !conR.IsRunning() {
  299. log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
  300. return
  301. }
  302. rs := conR.conS.GetRoundState()
  303. prs := ps.GetRoundState()
  304. // Send proposal Block parts?
  305. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
  306. //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
  307. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  308. part := rs.ProposalBlockParts.GetPart(index)
  309. msg := &BlockPartMessage{
  310. Height: rs.Height, // This tells peer that this part applies to us.
  311. Round: rs.Round, // This tells peer that this part applies to us.
  312. Part: part,
  313. }
  314. peer.Send(DataChannel, msg)
  315. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  316. continue OUTER_LOOP
  317. }
  318. }
  319. // If the peer is on a previous height, help catch up.
  320. if (0 < prs.Height) && (prs.Height < rs.Height) {
  321. //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
  322. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  323. // Ensure that the peer's PartSetHeader is correct
  324. blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
  325. if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
  326. log.Debug("Peer ProposalBlockPartsHeader mismatch, sleeping",
  327. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  328. time.Sleep(peerGossipSleepDuration)
  329. continue OUTER_LOOP
  330. }
  331. // Load the part
  332. part := conR.blockStore.LoadBlockPart(prs.Height, index)
  333. if part == nil {
  334. log.Warn("Could not load part", "index", index,
  335. "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
  336. time.Sleep(peerGossipSleepDuration)
  337. continue OUTER_LOOP
  338. }
  339. // Send the part
  340. msg := &BlockPartMessage{
  341. Height: prs.Height, // Not our height, so it doesn't matter.
  342. Round: prs.Round, // Not our height, so it doesn't matter.
  343. Part: part,
  344. }
  345. peer.Send(DataChannel, msg)
  346. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  347. continue OUTER_LOOP
  348. } else {
  349. //log.Debug("No parts to send in catch-up, sleeping")
  350. time.Sleep(peerGossipSleepDuration)
  351. continue OUTER_LOOP
  352. }
  353. }
  354. // If height and round don't match, sleep.
  355. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  356. //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
  357. time.Sleep(peerGossipSleepDuration)
  358. continue OUTER_LOOP
  359. }
  360. // By here, height and round match.
  361. // Proposal block parts were already matched and sent if any were wanted.
  362. // (These can match on hash so the round doesn't matter)
  363. // Now consider sending other things, like the Proposal itself.
  364. // Send Proposal && ProposalPOL BitArray?
  365. if rs.Proposal != nil && !prs.Proposal {
  366. // Proposal
  367. {
  368. msg := &ProposalMessage{Proposal: rs.Proposal}
  369. peer.Send(DataChannel, msg)
  370. ps.SetHasProposal(rs.Proposal)
  371. }
  372. // ProposalPOL.
  373. // Peer must receive ProposalMessage first.
  374. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  375. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  376. if 0 <= rs.Proposal.POLRound {
  377. msg := &ProposalPOLMessage{
  378. Height: rs.Height,
  379. ProposalPOLRound: rs.Proposal.POLRound,
  380. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  381. }
  382. peer.Send(DataChannel, msg)
  383. }
  384. continue OUTER_LOOP
  385. }
  386. // Nothing to do. Sleep.
  387. time.Sleep(peerGossipSleepDuration)
  388. continue OUTER_LOOP
  389. }
  390. }
  391. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  392. log := log.New("peer", peer.Key)
  393. // Simple hack to throttle logs upon sleep.
  394. var sleeping = 0
  395. OUTER_LOOP:
  396. for {
  397. // Manage disconnects from self or peer.
  398. if !peer.IsRunning() || !conR.IsRunning() {
  399. log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
  400. return
  401. }
  402. rs := conR.conS.GetRoundState()
  403. prs := ps.GetRoundState()
  404. switch sleeping {
  405. case 1: // First sleep
  406. sleeping = 2
  407. case 2: // No more sleep
  408. sleeping = 0
  409. }
  410. log.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
  411. "prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)
  412. // If height matches, then send LastCommit, Prevotes, Precommits.
  413. if rs.Height == prs.Height {
  414. // If there are lastCommits to send...
  415. if prs.Step == RoundStepNewHeight {
  416. if ps.PickSendVote(rs.LastCommit) {
  417. log.Debug("Picked rs.LastCommit to send")
  418. continue OUTER_LOOP
  419. }
  420. }
  421. // If there are prevotes to send...
  422. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  423. if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
  424. log.Debug("Picked rs.Prevotes(rs.Round) to send")
  425. continue OUTER_LOOP
  426. }
  427. }
  428. // If there are precommits to send...
  429. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  430. if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
  431. log.Debug("Picked rs.Precommits(rs.Round) to send")
  432. continue OUTER_LOOP
  433. }
  434. }
  435. // If there are prevotes to send for the last round...
  436. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
  437. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  438. log.Debug("Picked rs.Prevotes(prs.Round) to send")
  439. continue OUTER_LOOP
  440. }
  441. }
  442. // If there are precommits to send for the last round...
  443. if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
  444. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  445. log.Debug("Picked rs.Precommits(prs.Round) to send")
  446. continue OUTER_LOOP
  447. }
  448. }
  449. // If there are POLPrevotes to send...
  450. if 0 <= prs.ProposalPOLRound {
  451. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  452. if ps.PickSendVote(polPrevotes) {
  453. log.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
  454. continue OUTER_LOOP
  455. }
  456. }
  457. }
  458. }
  459. // Special catchup logic.
  460. // If peer is lagging by height 1, send LastCommit.
  461. if prs.Height != 0 && rs.Height == prs.Height+1 {
  462. if ps.PickSendVote(rs.LastCommit) {
  463. log.Debug("Picked rs.LastCommit to send")
  464. continue OUTER_LOOP
  465. }
  466. }
  467. // Catchup logic
  468. // If peer is lagging by more than 1, send Validation.
  469. if prs.Height != 0 && rs.Height <= prs.Height+2 {
  470. // Load the block validation for prs.Height,
  471. // which contains precommit signatures for prs.Height.
  472. validation := conR.blockStore.LoadBlockValidation(prs.Height)
  473. log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
  474. if ps.PickSendVote(validation) {
  475. log.Debug("Picked Catchup validation to send")
  476. continue OUTER_LOOP
  477. }
  478. }
  479. if sleeping == 0 {
  480. // We sent nothing. Sleep...
  481. sleeping = 1
  482. log.Debug("No votes to send, sleeping", "peer", peer,
  483. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  484. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  485. } else if sleeping == 2 {
  486. // Continued sleep...
  487. sleeping = 1
  488. }
  489. time.Sleep(peerGossipSleepDuration)
  490. continue OUTER_LOOP
  491. }
  492. }
  493. //-----------------------------------------------------------------------------
  494. // Read only when returned by PeerState.GetRoundState().
  495. type PeerRoundState struct {
  496. Height int // Height peer is at
  497. Round int // Round peer is at
  498. Step RoundStepType // Step peer is at
  499. StartTime time.Time // Estimated start of round 0 at this height
  500. Proposal bool // True if peer has proposal for this round
  501. ProposalBlockPartsHeader types.PartSetHeader //
  502. ProposalBlockParts *BitArray //
  503. ProposalPOLRound int // -1 if none
  504. ProposalPOL *BitArray // nil until ProposalPOLMessage received.
  505. Prevotes *BitArray // All votes peer has for this round
  506. Precommits *BitArray // All precommits peer has for this round
  507. LastCommitRound int // Round of commit for last height.
  508. LastCommit *BitArray // All commit precommits of commit for last height.
  509. CatchupCommitRound int // Round that we believe commit round is.
  510. CatchupCommit *BitArray // All commit precommits peer has for this height
  511. }
  512. //-----------------------------------------------------------------------------
  513. var (
  514. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  515. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  516. )
  517. type PeerState struct {
  518. Peer *p2p.Peer
  519. mtx sync.Mutex
  520. PeerRoundState
  521. }
  522. func NewPeerState(peer *p2p.Peer) *PeerState {
  523. return &PeerState{Peer: peer}
  524. }
  525. // Returns an atomic snapshot of the PeerRoundState.
  526. // There's no point in mutating it since it won't change PeerState.
  527. func (ps *PeerState) GetRoundState() *PeerRoundState {
  528. ps.mtx.Lock()
  529. defer ps.mtx.Unlock()
  530. prs := ps.PeerRoundState // copy
  531. return &prs
  532. }
  533. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  534. ps.mtx.Lock()
  535. defer ps.mtx.Unlock()
  536. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  537. return
  538. }
  539. if ps.Proposal {
  540. return
  541. }
  542. ps.Proposal = true
  543. ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
  544. ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
  545. ps.ProposalPOLRound = proposal.POLRound
  546. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  547. }
  548. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
  549. ps.mtx.Lock()
  550. defer ps.mtx.Unlock()
  551. if ps.Height != height || ps.Round != round {
  552. return
  553. }
  554. ps.ProposalBlockParts.SetIndex(index, true)
  555. }
  556. // Convenience function to send vote to peer.
  557. // Returns true if vote was sent.
  558. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
  559. if index, vote, ok := ps.PickVoteToSend(votes); ok {
  560. msg := &VoteMessage{index, vote}
  561. ps.Peer.Send(VoteChannel, msg)
  562. return true
  563. }
  564. return false
  565. }
  566. // votes: Must be the correct Size() for the Height().
  567. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
  568. ps.mtx.Lock()
  569. defer ps.mtx.Unlock()
  570. if votes.Size() == 0 {
  571. return 0, nil, false
  572. }
  573. height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
  574. // Lazily set data using 'votes'.
  575. if votes.IsCommit() {
  576. ps.ensureCatchupCommitRound(height, round, size)
  577. }
  578. ps.ensureVoteBitArrays(height, size)
  579. psVotes := ps.getVoteBitArray(height, round, type_)
  580. if psVotes == nil {
  581. return 0, nil, false // Not something worth sending
  582. }
  583. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  584. ps.setHasVote(height, round, type_, index)
  585. return index, votes.GetByIndex(index), true
  586. }
  587. return 0, nil, false
  588. }
  589. func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
  590. if ps.Height == height {
  591. if ps.Round == round {
  592. switch type_ {
  593. case types.VoteTypePrevote:
  594. return ps.Prevotes
  595. case types.VoteTypePrecommit:
  596. return ps.Precommits
  597. default:
  598. panic(Fmt("Unexpected vote type %X", type_))
  599. }
  600. }
  601. if ps.CatchupCommitRound == round {
  602. switch type_ {
  603. case types.VoteTypePrevote:
  604. return nil
  605. case types.VoteTypePrecommit:
  606. return ps.CatchupCommit
  607. default:
  608. panic(Fmt("Unexpected vote type %X", type_))
  609. }
  610. }
  611. return nil
  612. }
  613. if ps.Height == height+1 {
  614. if ps.LastCommitRound == round {
  615. switch type_ {
  616. case types.VoteTypePrevote:
  617. return nil
  618. case types.VoteTypePrecommit:
  619. return ps.LastCommit
  620. default:
  621. panic(Fmt("Unexpected vote type %X", type_))
  622. }
  623. }
  624. return nil
  625. }
  626. return nil
  627. }
  628. // NOTE: 'round' is what we know to be the commit round for height.
  629. func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
  630. if ps.Height != height {
  631. return
  632. }
  633. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  634. panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
  635. }
  636. if ps.CatchupCommitRound == round {
  637. return // Nothing to do!
  638. }
  639. ps.CatchupCommitRound = round
  640. if round == ps.Round {
  641. ps.CatchupCommit = ps.Precommits
  642. } else {
  643. ps.CatchupCommit = NewBitArray(numValidators)
  644. }
  645. }
  646. // NOTE: It's important to make sure that numValidators actually matches
  647. // what the node sees as the number of validators for height.
  648. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
  649. ps.mtx.Lock()
  650. defer ps.mtx.Unlock()
  651. ps.ensureVoteBitArrays(height, numValidators)
  652. }
  653. func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
  654. if ps.Height == height {
  655. if ps.Prevotes == nil {
  656. ps.Prevotes = NewBitArray(numValidators)
  657. }
  658. if ps.Precommits == nil {
  659. ps.Precommits = NewBitArray(numValidators)
  660. }
  661. if ps.CatchupCommit == nil {
  662. ps.CatchupCommit = NewBitArray(numValidators)
  663. }
  664. if ps.ProposalPOL == nil {
  665. ps.ProposalPOL = NewBitArray(numValidators)
  666. }
  667. } else if ps.Height == height+1 {
  668. if ps.LastCommit == nil {
  669. ps.LastCommit = NewBitArray(numValidators)
  670. }
  671. }
  672. }
  673. func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
  674. ps.mtx.Lock()
  675. defer ps.mtx.Unlock()
  676. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  677. }
  678. func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
  679. log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
  680. if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
  681. panic("Invalid vote type") // SANITY
  682. }
  683. if ps.Height == height {
  684. if ps.Round == round {
  685. switch type_ {
  686. case types.VoteTypePrevote:
  687. ps.Prevotes.SetIndex(index, true)
  688. log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
  689. case types.VoteTypePrecommit:
  690. ps.Precommits.SetIndex(index, true)
  691. log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
  692. }
  693. } else if ps.CatchupCommitRound == round {
  694. switch type_ {
  695. case types.VoteTypePrevote:
  696. case types.VoteTypePrecommit:
  697. ps.CatchupCommit.SetIndex(index, true)
  698. log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
  699. }
  700. } else if ps.ProposalPOLRound == round {
  701. switch type_ {
  702. case types.VoteTypePrevote:
  703. ps.ProposalPOL.SetIndex(index, true)
  704. log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
  705. case types.VoteTypePrecommit:
  706. }
  707. }
  708. } else if ps.Height == height+1 {
  709. if ps.LastCommitRound == round {
  710. switch type_ {
  711. case types.VoteTypePrevote:
  712. case types.VoteTypePrecommit:
  713. ps.LastCommit.SetIndex(index, true)
  714. log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
  715. }
  716. }
  717. } else {
  718. // Does not apply.
  719. }
  720. }
  721. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  722. ps.mtx.Lock()
  723. defer ps.mtx.Unlock()
  724. // Just remember these values.
  725. psHeight := ps.Height
  726. psRound := ps.Round
  727. //psStep := ps.Step
  728. psCatchupCommitRound := ps.CatchupCommitRound
  729. psCatchupCommit := ps.CatchupCommit
  730. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  731. ps.Height = msg.Height
  732. ps.Round = msg.Round
  733. ps.Step = msg.Step
  734. ps.StartTime = startTime
  735. if psHeight != msg.Height || psRound != msg.Round {
  736. ps.Proposal = false
  737. ps.ProposalBlockPartsHeader = types.PartSetHeader{}
  738. ps.ProposalBlockParts = nil
  739. ps.ProposalPOLRound = -1
  740. ps.ProposalPOL = nil
  741. // We'll update the BitArray capacity later.
  742. ps.Prevotes = nil
  743. ps.Precommits = nil
  744. }
  745. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  746. // Peer caught up to CatchupCommitRound.
  747. // Preserve psCatchupCommit!
  748. // NOTE: We prefer to use prs.Precommits if
  749. // pr.Round matches pr.CatchupCommitRound.
  750. ps.Precommits = psCatchupCommit
  751. }
  752. if psHeight != msg.Height {
  753. // Shift Precommits to LastCommit.
  754. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  755. ps.LastCommitRound = msg.LastCommitRound
  756. ps.LastCommit = ps.Precommits
  757. } else {
  758. ps.LastCommitRound = msg.LastCommitRound
  759. ps.LastCommit = nil
  760. }
  761. // We'll update the BitArray capacity later.
  762. ps.CatchupCommitRound = -1
  763. ps.CatchupCommit = nil
  764. }
  765. }
  766. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
  767. ps.mtx.Lock()
  768. defer ps.mtx.Unlock()
  769. if ps.Height != msg.Height {
  770. return
  771. }
  772. ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
  773. ps.ProposalBlockParts = msg.BlockParts
  774. }
  775. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  776. ps.mtx.Lock()
  777. defer ps.mtx.Unlock()
  778. if ps.Height != msg.Height {
  779. return
  780. }
  781. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  782. }
  783. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  784. ps.mtx.Lock()
  785. defer ps.mtx.Unlock()
  786. if ps.Height != msg.Height {
  787. return
  788. }
  789. if ps.ProposalPOLRound != msg.ProposalPOLRound {
  790. return
  791. }
  792. // TODO: Merge onto existing ps.ProposalPOL?
  793. // We might have sent some prevotes in the meantime.
  794. ps.ProposalPOL = msg.ProposalPOL
  795. }
  796. //-----------------------------------------------------------------------------
  797. // Messages
  798. const (
  799. msgTypeNewRoundStep = byte(0x01)
  800. msgTypeCommitStep = byte(0x02)
  801. msgTypeProposal = byte(0x11)
  802. msgTypeProposalPOL = byte(0x12)
  803. msgTypeBlockPart = byte(0x13) // both block & POL
  804. msgTypeVote = byte(0x14)
  805. msgTypeHasVote = byte(0x15)
  806. )
  807. type ConsensusMessage interface{}
  808. var _ = binary.RegisterInterface(
  809. struct{ ConsensusMessage }{},
  810. binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
  811. binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
  812. binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
  813. binary.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
  814. binary.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
  815. binary.ConcreteType{&VoteMessage{}, msgTypeVote},
  816. binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
  817. )
  818. // TODO: check for unnecessary extra bytes at the end.
  819. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
  820. msgType = bz[0]
  821. n := new(int64)
  822. r := bytes.NewReader(bz)
  823. msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
  824. return
  825. }
  826. //-------------------------------------
  827. // For every height/round/step transition
  828. type NewRoundStepMessage struct {
  829. Height int
  830. Round int
  831. Step RoundStepType
  832. SecondsSinceStartTime int
  833. LastCommitRound int
  834. }
  835. func (m *NewRoundStepMessage) String() string {
  836. return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
  837. m.Height, m.Round, m.Step, m.LastCommitRound)
  838. }
  839. //-------------------------------------
  840. type CommitStepMessage struct {
  841. Height int
  842. BlockPartsHeader types.PartSetHeader
  843. BlockParts *BitArray
  844. }
  845. func (m *CommitStepMessage) String() string {
  846. return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
  847. }
  848. //-------------------------------------
  849. type ProposalMessage struct {
  850. Proposal *Proposal
  851. }
  852. func (m *ProposalMessage) String() string {
  853. return fmt.Sprintf("[Proposal %v]", m.Proposal)
  854. }
  855. //-------------------------------------
  856. type ProposalPOLMessage struct {
  857. Height int
  858. ProposalPOLRound int
  859. ProposalPOL *BitArray
  860. }
  861. func (m *ProposalPOLMessage) String() string {
  862. return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
  863. }
  864. //-------------------------------------
  865. type BlockPartMessage struct {
  866. Height int
  867. Round int
  868. Part *types.Part
  869. }
  870. func (m *BlockPartMessage) String() string {
  871. return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
  872. }
  873. //-------------------------------------
  874. type VoteMessage struct {
  875. ValidatorIndex int
  876. Vote *types.Vote
  877. }
  878. func (m *VoteMessage) String() string {
  879. return fmt.Sprintf("[Vote VI:%v V:%v VI:%v]", m.ValidatorIndex, m.Vote, m.ValidatorIndex)
  880. }
  881. //-------------------------------------
  882. type HasVoteMessage struct {
  883. Height int
  884. Round int
  885. Type byte
  886. Index int
  887. }
  888. func (m *HasVoteMessage) String() string {
  889. return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
  890. }