You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1419 lines
42 KiB

  1. package consensus
  2. import (
  3. "errors"
  4. "fmt"
  5. "reflect"
  6. "sync"
  7. "time"
  8. ""
  9. tmcon ""
  10. cstypes ""
  11. ""
  12. tmevents ""
  13. tmjson ""
  14. ""
  15. tmsync ""
  16. ""
  17. tmcons ""
  18. tmproto ""
  19. sm ""
  20. ""
  21. tmtime ""
  22. )
  23. const (
  24. StateChannel = byte(0x20)
  25. DataChannel = byte(0x21)
  26. VoteChannel = byte(0x22)
  27. VoteSetBitsChannel = byte(0x23)
  28. maxMsgSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
  29. blocksToContributeToBecomeGoodPeer = 10000
  30. votesToContributeToBecomeGoodPeer = 10000
  31. )
  32. //-----------------------------------------------------------------------------
  33. // Reactor defines a reactor for the consensus service.
  34. type Reactor struct {
  35. p2p.BaseReactor // BaseService + p2p.Switch
  36. conS *State
  37. mtx tmsync.RWMutex
  38. waitSync bool
  39. eventBus *types.EventBus
  40. Metrics *tmcon.Metrics
  41. }
  42. type ReactorOption func(*Reactor)
  43. // NewReactor returns a new Reactor with the given
  44. // consensusState.
  45. func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
  46. conR := &Reactor{
  47. conS: consensusState,
  48. waitSync: waitSync,
  49. Metrics: tmcon.NopMetrics(),
  50. }
  51. conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
  52. for _, option := range options {
  53. option(conR)
  54. }
  55. return conR
  56. }
  57. // OnStart implements BaseService by subscribing to events, which later will be
  58. // broadcasted to other peers and starting state if we're not in fast sync.
  59. func (conR *Reactor) OnStart() error {
  60. conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync())
  61. // start routine that computes peer statistics for evaluating peer quality
  62. go conR.peerStatsRoutine()
  63. conR.subscribeToBroadcastEvents()
  64. if !conR.WaitSync() {
  65. conR.conS.SetSwitch(conR.Switch)
  66. err := conR.conS.Start()
  67. if err != nil {
  68. return err
  69. }
  70. }
  71. return nil
  72. }
  73. // OnStop implements BaseService by unsubscribing from events and stopping
  74. // state.
  75. func (conR *Reactor) OnStop() {
  76. conR.unsubscribeFromBroadcastEvents()
  77. if err := conR.conS.Stop(); err != nil {
  78. conR.Logger.Error("Error stopping consensus state", "err", err)
  79. }
  80. if !conR.WaitSync() {
  81. conR.conS.Wait()
  82. }
  83. }
  84. // SwitchToConsensus switches from fast_sync mode to consensus mode.
  85. // It resets the state, turns off fast_sync, and starts the consensus state-machine
  86. func (conR *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
  87. conR.Logger.Info("SwitchToConsensus")
  88. // We have no votes, so reconstruct LastCommit from SeenCommit.
  89. if state.LastBlockHeight > 0 {
  90. conR.conS.reconstructLastCommit(state)
  91. }
  92. // NOTE: The line below causes broadcastNewRoundStepRoutine() to broadcast a
  93. // NewRoundStepMessage.
  94. conR.conS.updateToState(state)
  95. conR.mtx.Lock()
  96. conR.waitSync = false
  97. conR.mtx.Unlock()
  98. conR.Metrics.FastSyncing.Set(0)
  99. conR.Metrics.StateSyncing.Set(0)
  100. if skipWAL {
  101. conR.conS.doWALCatchup = false
  102. }
  103. conR.conS.SetSwitch(conR.Switch)
  104. err := conR.conS.Start()
  105. if err != nil {
  106. panic(fmt.Sprintf(`Failed to start consensus state: %v
  107. conS:
  108. %+v
  109. conR:
  110. %+v`, err, conR.conS, conR))
  111. }
  112. }
  113. // GetChannels implements Reactor
  114. func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
  115. // TODO optimize
  116. return []*p2p.ChannelDescriptor{
  117. {
  118. ID: StateChannel,
  119. Priority: 6,
  120. SendQueueCapacity: 100,
  121. RecvMessageCapacity: maxMsgSize,
  122. },
  123. {
  124. ID: DataChannel, // maybe split between gossiping current block and catchup stuff
  125. // once we gossip the whole block there's nothing left to send until next height or round
  126. Priority: 10,
  127. SendQueueCapacity: 100,
  128. RecvBufferCapacity: 50 * 4096,
  129. RecvMessageCapacity: maxMsgSize,
  130. },
  131. {
  132. ID: VoteChannel,
  133. Priority: 7,
  134. SendQueueCapacity: 100,
  135. RecvBufferCapacity: 100 * 100,
  136. RecvMessageCapacity: maxMsgSize,
  137. },
  138. {
  139. ID: VoteSetBitsChannel,
  140. Priority: 1,
  141. SendQueueCapacity: 2,
  142. RecvBufferCapacity: 1024,
  143. RecvMessageCapacity: maxMsgSize,
  144. },
  145. }
  146. }
  147. // InitPeer implements Reactor by creating a state for the peer.
  148. func (conR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
  149. peerState := NewPeerState(peer).SetLogger(conR.Logger)
  150. peer.Set(types.PeerStateKey, peerState)
  151. return peer
  152. }
  153. // AddPeer implements Reactor by spawning multiple gossiping goroutines for the
  154. // peer.
  155. func (conR *Reactor) AddPeer(peer p2p.Peer) {
  156. if !conR.IsRunning() {
  157. return
  158. }
  159. peerState, ok := peer.Get(types.PeerStateKey).(*PeerState)
  160. if !ok {
  161. panic(fmt.Sprintf("peer %v has no state", peer))
  162. }
  163. // Begin routines for this peer.
  164. go conR.gossipDataRoutine(peer, peerState)
  165. go conR.gossipVotesRoutine(peer, peerState)
  166. go conR.queryMaj23Routine(peer, peerState)
  167. // Send our state to peer.
  168. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  169. if !conR.WaitSync() {
  170. conR.sendNewRoundStepMessage(peer)
  171. }
  172. }
  173. // RemovePeer is a noop.
  174. func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  175. if !conR.IsRunning() {
  176. return
  177. }
  178. // TODO
  179. // ps, ok := peer.Get(PeerStateKey).(*PeerState)
  180. // if !ok {
  181. // panic(fmt.Sprintf("Peer %v has no state", peer))
  182. // }
  183. // ps.Disconnect()
  184. }
  185. // Receive implements Reactor
  186. // NOTE: We process these messages even when we're fast_syncing.
  187. // Messages affect either a peer state or the consensus state.
  188. // Peer state updates can happen in parallel, but processing of
  189. // proposals, block parts, and votes are ordered by the receiveRoutine
  190. // NOTE: blocks on consensus state for proposals, block parts, and votes
  191. func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  192. if !conR.IsRunning() {
  193. conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
  194. return
  195. }
  196. msg, err := decodeMsg(msgBytes)
  197. if err != nil {
  198. conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
  199. conR.Switch.StopPeerForError(src, err)
  200. return
  201. }
  202. if err = msg.ValidateBasic(); err != nil {
  203. conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
  204. conR.Switch.StopPeerForError(src, err)
  205. return
  206. }
  207. conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  208. // Get peer states
  209. ps, ok := src.Get(types.PeerStateKey).(*PeerState)
  210. if !ok {
  211. panic(fmt.Sprintf("Peer %v has no state", src))
  212. }
  213. switch chID {
  214. case StateChannel:
  215. switch msg := msg.(type) {
  216. case *tmcon.NewRoundStepMessage:
  217. conR.conS.mtx.Lock()
  218. initialHeight := conR.conS.state.InitialHeight
  219. conR.conS.mtx.Unlock()
  220. if err = msg.ValidateHeight(initialHeight); err != nil {
  221. conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
  222. conR.Switch.StopPeerForError(src, err)
  223. return
  224. }
  225. ps.ApplyNewRoundStepMessage(msg)
  226. case *tmcon.NewValidBlockMessage:
  227. ps.ApplyNewValidBlockMessage(msg)
  228. case *tmcon.HasVoteMessage:
  229. ps.ApplyHasVoteMessage(msg)
  230. case *tmcon.VoteSetMaj23Message:
  231. cs := conR.conS
  232. cs.mtx.Lock()
  233. height, votes := cs.Height, cs.Votes
  234. cs.mtx.Unlock()
  235. if height != msg.Height {
  236. return
  237. }
  238. // Peer claims to have a maj23 for some BlockID at H,R,S,
  239. err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
  240. if err != nil {
  241. conR.Switch.StopPeerForError(src, err)
  242. return
  243. }
  244. // Respond with a VoteSetBitsMessage showing which votes we have.
  245. // (and consequently shows which we don't have)
  246. var ourVotes *bits.BitArray
  247. switch msg.Type {
  248. case tmproto.PrevoteType:
  249. ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
  250. case tmproto.PrecommitType:
  251. ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
  252. default:
  253. panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
  254. }
  255. src.TrySend(VoteSetBitsChannel, tmcon.MustEncode(&tmcon.VoteSetBitsMessage{
  256. Height: msg.Height,
  257. Round: msg.Round,
  258. Type: msg.Type,
  259. BlockID: msg.BlockID,
  260. Votes: ourVotes,
  261. }))
  262. default:
  263. conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  264. }
  265. case DataChannel:
  266. if conR.WaitSync() {
  267. conR.Logger.Info("Ignoring message received during sync", "msg", msg)
  268. return
  269. }
  270. switch msg := msg.(type) {
  271. case *tmcon.ProposalMessage:
  272. ps.SetHasProposal(msg.Proposal)
  273. conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
  274. case *tmcon.ProposalPOLMessage:
  275. ps.ApplyProposalPOLMessage(msg)
  276. case *tmcon.BlockPartMessage:
  277. ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
  278. conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
  279. conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
  280. default:
  281. conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  282. }
  283. case VoteChannel:
  284. if conR.WaitSync() {
  285. conR.Logger.Info("Ignoring message received during sync", "msg", msg)
  286. return
  287. }
  288. switch msg := msg.(type) {
  289. case *tmcon.VoteMessage:
  290. cs := conR.conS
  291. cs.mtx.RLock()
  292. height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
  293. cs.mtx.RUnlock()
  294. ps.EnsureVoteBitArrays(height, valSize)
  295. ps.EnsureVoteBitArrays(height-1, lastCommitSize)
  296. ps.SetHasVote(msg.Vote)
  297. cs.peerMsgQueue <- msgInfo{msg, src.ID()}
  298. default:
  299. // don't punish (leave room for soft upgrades)
  300. conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  301. }
  302. case VoteSetBitsChannel:
  303. if conR.WaitSync() {
  304. conR.Logger.Info("Ignoring message received during sync", "msg", msg)
  305. return
  306. }
  307. switch msg := msg.(type) {
  308. case *tmcon.VoteSetBitsMessage:
  309. cs := conR.conS
  310. cs.mtx.Lock()
  311. height, votes := cs.Height, cs.Votes
  312. cs.mtx.Unlock()
  313. if height == msg.Height {
  314. var ourVotes *bits.BitArray
  315. switch msg.Type {
  316. case tmproto.PrevoteType:
  317. ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
  318. case tmproto.PrecommitType:
  319. ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
  320. default:
  321. panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
  322. }
  323. ps.ApplyVoteSetBitsMessage(msg, ourVotes)
  324. } else {
  325. ps.ApplyVoteSetBitsMessage(msg, nil)
  326. }
  327. default:
  328. // don't punish (leave room for soft upgrades)
  329. conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  330. }
  331. default:
  332. conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
  333. }
  334. }
  335. // SetEventBus sets event bus.
  336. func (conR *Reactor) SetEventBus(b *types.EventBus) {
  337. conR.eventBus = b
  338. conR.conS.SetEventBus(b)
  339. }
  340. // WaitSync returns whether the consensus reactor is waiting for state/fast sync.
  341. func (conR *Reactor) WaitSync() bool {
  342. conR.mtx.RLock()
  343. defer conR.mtx.RUnlock()
  344. return conR.waitSync
  345. }
  346. //--------------------------------------
  347. // subscribeToBroadcastEvents subscribes for new round steps and votes
  348. // using internal pubsub defined on state to broadcast
  349. // them to peers upon receiving.
  350. func (conR *Reactor) subscribeToBroadcastEvents() {
  351. const subscriber = "consensus-reactor"
  352. if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
  353. func(data tmevents.EventData) {
  354. conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
  355. }); err != nil {
  356. conR.Logger.Error("Error adding listener for events", "err", err)
  357. }
  358. if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventValidBlock,
  359. func(data tmevents.EventData) {
  360. conR.broadcastNewValidBlockMessage(data.(*cstypes.RoundState))
  361. }); err != nil {
  362. conR.Logger.Error("Error adding listener for events", "err", err)
  363. }
  364. if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
  365. func(data tmevents.EventData) {
  366. conR.broadcastHasVoteMessage(data.(*types.Vote))
  367. }); err != nil {
  368. conR.Logger.Error("Error adding listener for events", "err", err)
  369. }
  370. }
  371. func (conR *Reactor) unsubscribeFromBroadcastEvents() {
  372. const subscriber = "consensus-reactor"
  373. conR.conS.evsw.RemoveListener(subscriber)
  374. }
  375. func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
  376. nrsMsg := makeRoundStepMessage(rs)
  377. conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(nrsMsg))
  378. }
  379. func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
  380. csMsg := &tmcon.NewValidBlockMessage{
  381. Height: rs.Height,
  382. Round: rs.Round,
  383. BlockPartSetHeader: rs.ProposalBlockParts.Header(),
  384. BlockParts: rs.ProposalBlockParts.BitArray(),
  385. IsCommit: rs.Step == cstypes.RoundStepCommit,
  386. }
  387. conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(csMsg))
  388. }
  389. // Broadcasts HasVoteMessage to peers that care.
  390. func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
  391. msg := &tmcon.HasVoteMessage{
  392. Height: vote.Height,
  393. Round: vote.Round,
  394. Type: vote.Type,
  395. Index: vote.ValidatorIndex,
  396. }
  397. conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(msg))
  398. /*
  399. // TODO: Make this broadcast more selective.
  400. for _, peer := range conR.Switch.Peers().List() {
  401. ps, ok := peer.Get(PeerStateKey).(*PeerState)
  402. if !ok {
  403. panic(fmt.Sprintf("Peer %v has no state", peer))
  404. }
  405. prs := ps.GetRoundState()
  406. if prs.Height == vote.Height {
  407. // TODO: Also filter on round?
  408. peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
  409. } else {
  410. // Height doesn't match
  411. // TODO: check a field, maybe CatchupCommitRound?
  412. // TODO: But that requires changing the struct field comment.
  413. }
  414. }
  415. */
  416. }
  417. func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcon.NewRoundStepMessage) {
  418. nrsMsg = &tmcon.NewRoundStepMessage{
  419. Height: rs.Height,
  420. Round: rs.Round,
  421. Step: rs.Step,
  422. SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
  423. LastCommitRound: rs.LastCommit.GetRound(),
  424. }
  425. return
  426. }
  427. func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
  428. rs := conR.conS.GetRoundState()
  429. nrsMsg := makeRoundStepMessage(rs)
  430. peer.Send(StateChannel, tmcon.MustEncode(nrsMsg))
  431. }
  432. func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
  433. logger := conR.Logger.With("peer", peer)
  434. OUTER_LOOP:
  435. for {
  436. // Manage disconnects from self or peer.
  437. if !peer.IsRunning() || !conR.IsRunning() {
  438. logger.Info("Stopping gossipDataRoutine for peer")
  439. return
  440. }
  441. rs := conR.conS.GetRoundState()
  442. prs := ps.GetRoundState()
  443. // Send proposal Block parts?
  444. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
  445. if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
  446. part := rs.ProposalBlockParts.GetPart(index)
  447. msg := &tmcon.BlockPartMessage{
  448. Height: rs.Height, // This tells peer that this part applies to us.
  449. Round: rs.Round, // This tells peer that this part applies to us.
  450. Part: part,
  451. }
  452. logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
  453. if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
  454. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  455. }
  456. continue OUTER_LOOP
  457. }
  458. }
  459. // If the peer is on a previous height that we have, help catch up.
  460. if (0 < prs.Height) && (prs.Height < rs.Height) && (prs.Height >= conR.conS.blockStore.Base()) {
  461. heightLogger := logger.With("height", prs.Height)
  462. // if we never received the commit message from the peer, the block parts wont be initialized
  463. if prs.ProposalBlockParts == nil {
  464. blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
  465. if blockMeta == nil {
  466. heightLogger.Error("Failed to load block meta",
  467. "blockstoreBase", conR.conS.blockStore.Base(), "blockstoreHeight", conR.conS.blockStore.Height())
  468. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  469. } else {
  470. ps.InitProposalBlockParts(blockMeta.BlockID.PartSetHeader)
  471. }
  472. // continue the loop since prs is a copy and not effected by this initialization
  473. continue OUTER_LOOP
  474. }
  475. conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
  476. continue OUTER_LOOP
  477. }
  478. // If height and round don't match, sleep.
  479. if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
  480. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  481. continue OUTER_LOOP
  482. }
  483. // By here, height and round match.
  484. // Proposal block parts were already matched and sent if any were wanted.
  485. // (These can match on hash so the round doesn't matter)
  486. // Now consider sending other things, like the Proposal itself.
  487. // Send Proposal && ProposalPOL BitArray?
  488. if rs.Proposal != nil && !prs.Proposal {
  489. // Proposal: share the proposal metadata with peer.
  490. {
  491. msg := &tmcon.ProposalMessage{Proposal: rs.Proposal}
  492. logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
  493. if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
  494. // NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
  495. ps.SetHasProposal(rs.Proposal)
  496. }
  497. }
  498. // ProposalPOL: lets peer know which POL votes we have so far.
  499. // Peer must receive ProposalMessage first.
  500. // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
  501. // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
  502. if 0 <= rs.Proposal.POLRound {
  503. msg := &tmcon.ProposalPOLMessage{
  504. Height: rs.Height,
  505. ProposalPOLRound: rs.Proposal.POLRound,
  506. ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
  507. }
  508. logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
  509. peer.Send(DataChannel, tmcon.MustEncode(msg))
  510. }
  511. continue OUTER_LOOP
  512. }
  513. // Nothing to do. Sleep.
  514. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  515. continue OUTER_LOOP
  516. }
  517. }
  518. func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState,
  519. prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) {
  520. if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
  521. // Ensure that the peer's PartSetHeader is correct
  522. blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
  523. if blockMeta == nil {
  524. logger.Error("Failed to load block meta", "ourHeight", rs.Height,
  525. "blockstoreBase", conR.conS.blockStore.Base(), "blockstoreHeight", conR.conS.blockStore.Height())
  526. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  527. return
  528. } else if !blockMeta.BlockID.PartSetHeader.Equals(prs.ProposalBlockPartSetHeader) {
  529. logger.Info("Peer ProposalBlockPartSetHeader mismatch, sleeping",
  530. "blockPartSetHeader", blockMeta.BlockID.PartSetHeader, "peerBlockPartSetHeader", prs.ProposalBlockPartSetHeader)
  531. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  532. return
  533. }
  534. // Load the part
  535. part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
  536. if part == nil {
  537. logger.Error("Could not load part", "index", index,
  538. "blockPartSetHeader", blockMeta.BlockID.PartSetHeader, "peerBlockPartSetHeader", prs.ProposalBlockPartSetHeader)
  539. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  540. return
  541. }
  542. // Send the part
  543. msg := &tmcon.BlockPartMessage{
  544. Height: prs.Height, // Not our height, so it doesn't matter.
  545. Round: prs.Round, // Not our height, so it doesn't matter.
  546. Part: part,
  547. }
  548. logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
  549. if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
  550. ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
  551. } else {
  552. logger.Debug("Sending block part for catchup failed")
  553. }
  554. return
  555. }
  556. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  557. }
  558. func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
  559. logger := conR.Logger.With("peer", peer)
  560. // Simple hack to throttle logs upon sleep.
  561. var sleeping = 0
  562. OUTER_LOOP:
  563. for {
  564. // Manage disconnects from self or peer.
  565. if !peer.IsRunning() || !conR.IsRunning() {
  566. logger.Info("Stopping gossipVotesRoutine for peer")
  567. return
  568. }
  569. rs := conR.conS.GetRoundState()
  570. prs := ps.GetRoundState()
  571. switch sleeping {
  572. case 1: // First sleep
  573. sleeping = 2
  574. case 2: // No more sleep
  575. sleeping = 0
  576. }
  577. // If height matches, then send LastCommit, Prevotes, Precommits.
  578. if rs.Height == prs.Height {
  579. heightLogger := logger.With("height", prs.Height)
  580. if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
  581. continue OUTER_LOOP
  582. }
  583. }
  584. // Special catchup logic.
  585. // If peer is lagging by height 1, send LastCommit.
  586. if prs.Height != 0 && rs.Height == prs.Height+1 {
  587. if ps.PickSendVote(rs.LastCommit) {
  588. logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
  589. continue OUTER_LOOP
  590. }
  591. }
  592. // Catchup logic
  593. // If peer is lagging by more than 1, send Commit.
  594. if prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= conR.conS.blockStore.Base() {
  595. // Load the block commit for prs.Height,
  596. // which contains precommit signatures for prs.Height.
  597. if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
  598. if ps.PickSendVote(commit) {
  599. logger.Debug("Picked Catchup commit to send", "height", prs.Height)
  600. continue OUTER_LOOP
  601. }
  602. }
  603. }
  604. if sleeping == 0 {
  605. // We sent nothing. Sleep...
  606. sleeping = 1
  607. logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
  608. "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
  609. "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
  610. } else if sleeping == 2 {
  611. // Continued sleep...
  612. sleeping = 1
  613. }
  614. time.Sleep(conR.conS.config.PeerGossipSleepDuration)
  615. continue OUTER_LOOP
  616. }
  617. }
  618. func (conR *Reactor) gossipVotesForHeight(
  619. logger log.Logger,
  620. rs *cstypes.RoundState,
  621. prs *cstypes.PeerRoundState,
  622. ps *PeerState,
  623. ) bool {
  624. // If there are lastCommits to send...
  625. if prs.Step == cstypes.RoundStepNewHeight {
  626. if ps.PickSendVote(rs.LastCommit) {
  627. logger.Debug("Picked rs.LastCommit to send")
  628. return true
  629. }
  630. }
  631. // If there are POL prevotes to send...
  632. if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
  633. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  634. if ps.PickSendVote(polPrevotes) {
  635. logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
  636. "round", prs.ProposalPOLRound)
  637. return true
  638. }
  639. }
  640. }
  641. // If there are prevotes to send...
  642. if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
  643. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  644. logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
  645. return true
  646. }
  647. }
  648. // If there are precommits to send...
  649. if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
  650. if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
  651. logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
  652. return true
  653. }
  654. }
  655. // If there are prevotes to send...Needed because of validBlock mechanism
  656. if prs.Round != -1 && prs.Round <= rs.Round {
  657. if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
  658. logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
  659. return true
  660. }
  661. }
  662. // If there are POLPrevotes to send...
  663. if prs.ProposalPOLRound != -1 {
  664. if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
  665. if ps.PickSendVote(polPrevotes) {
  666. logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
  667. "round", prs.ProposalPOLRound)
  668. return true
  669. }
  670. }
  671. }
  672. return false
  673. }
  674. // NOTE: `queryMaj23Routine` has a simple crude design since it only comes
  675. // into play for liveness when there's a signature DDoS attack happening.
  676. func (conR *Reactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) {
  677. logger := conR.Logger.With("peer", peer)
  678. OUTER_LOOP:
  679. for {
  680. // Manage disconnects from self or peer.
  681. if !peer.IsRunning() || !conR.IsRunning() {
  682. logger.Info("Stopping queryMaj23Routine for peer")
  683. return
  684. }
  685. // Maybe send Height/Round/Prevotes
  686. {
  687. rs := conR.conS.GetRoundState()
  688. prs := ps.GetRoundState()
  689. if rs.Height == prs.Height {
  690. if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
  691. peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
  692. Height: prs.Height,
  693. Round: prs.Round,
  694. Type: tmproto.PrevoteType,
  695. BlockID: maj23,
  696. }))
  697. time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
  698. }
  699. }
  700. }
  701. // Maybe send Height/Round/Precommits
  702. {
  703. rs := conR.conS.GetRoundState()
  704. prs := ps.GetRoundState()
  705. if rs.Height == prs.Height {
  706. if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
  707. peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
  708. Height: prs.Height,
  709. Round: prs.Round,
  710. Type: tmproto.PrecommitType,
  711. BlockID: maj23,
  712. }))
  713. time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
  714. }
  715. }
  716. }
  717. // Maybe send Height/Round/ProposalPOL
  718. {
  719. rs := conR.conS.GetRoundState()
  720. prs := ps.GetRoundState()
  721. if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
  722. if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
  723. peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
  724. Height: prs.Height,
  725. Round: prs.ProposalPOLRound,
  726. Type: tmproto.PrevoteType,
  727. BlockID: maj23,
  728. }))
  729. time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
  730. }
  731. }
  732. }
  733. // Little point sending LastCommitRound/LastCommit,
  734. // These are fleeting and non-blocking.
  735. // Maybe send Height/CatchupCommitRound/CatchupCommit.
  736. {
  737. prs := ps.GetRoundState()
  738. if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
  739. prs.Height >= conR.conS.blockStore.Base() {
  740. if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
  741. peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
  742. Height: prs.Height,
  743. Round: commit.Round,
  744. Type: tmproto.PrecommitType,
  745. BlockID: commit.BlockID,
  746. }))
  747. time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
  748. }
  749. }
  750. }
  751. time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
  752. continue OUTER_LOOP
  753. }
  754. }
  755. func (conR *Reactor) peerStatsRoutine() {
  756. for {
  757. if !conR.IsRunning() {
  758. conR.Logger.Info("Stopping peerStatsRoutine")
  759. return
  760. }
  761. select {
  762. case msg := <-conR.conS.statsMsgQueue:
  763. // Get peer
  764. peer := conR.Switch.Peers().Get(msg.PeerID)
  765. if peer == nil {
  766. conR.Logger.Debug("Attempt to update stats for non-existent peer",
  767. "peer", msg.PeerID)
  768. continue
  769. }
  770. // Get peer state
  771. ps, ok := peer.Get(types.PeerStateKey).(*PeerState)
  772. if !ok {
  773. panic(fmt.Sprintf("Peer %v has no state", peer))
  774. }
  775. switch msg.Msg.(type) {
  776. case *tmcon.VoteMessage:
  777. if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
  778. conR.Switch.MarkPeerAsGood(peer)
  779. }
  780. case *tmcon.BlockPartMessage:
  781. if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
  782. conR.Switch.MarkPeerAsGood(peer)
  783. }
  784. }
  785. case <-conR.conS.Quit():
  786. return
  787. case <-conR.Quit():
  788. return
  789. }
  790. }
  791. }
  792. // String returns a string representation of the Reactor.
  793. // NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
  794. // TODO: improve!
  795. func (conR *Reactor) String() string {
  796. // better not to access shared variables
  797. return "ConsensusReactor" // conR.StringIndented("")
  798. }
  799. // StringIndented returns an indented string representation of the Reactor
  800. func (conR *Reactor) StringIndented(indent string) string {
  801. s := "ConsensusReactor{\n"
  802. s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
  803. for _, peer := range conR.Switch.Peers().List() {
  804. ps, ok := peer.Get(types.PeerStateKey).(*PeerState)
  805. if !ok {
  806. panic(fmt.Sprintf("Peer %v has no state", peer))
  807. }
  808. s += indent + " " + ps.StringIndented(indent+" ") + "\n"
  809. }
  810. s += indent + "}"
  811. return s
  812. }
  813. // ReactorMetrics sets the metrics
  814. func ReactorMetrics(metrics *tmcon.Metrics) ReactorOption {
  815. return func(conR *Reactor) { conR.Metrics = metrics }
  816. }
  817. //-----------------------------------------------------------------------------
  818. var (
  819. ErrPeerStateHeightRegression = errors.New("error peer state height regression")
  820. ErrPeerStateInvalidStartTime = errors.New("error peer state invalid startTime")
  821. )
// PeerState contains the known state of a peer, including its connection and
// threadsafe access to its PeerRoundState.
// NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go.
// Be mindful of what you Expose.
type PeerState struct {
	peer   p2p.Peer   // the peer this state describes; used to send it messages
	logger log.Logger // a no-op logger until SetLogger is called

	mtx   sync.Mutex             // NOTE: Modify below using setters, never directly.
	PRS   cstypes.PeerRoundState `json:"round_state"` // Exposed.
	Stats *peerStateStats        `json:"stats"`       // Exposed.
}
  833. // peerStateStats holds internal statistics for a peer.
  834. type peerStateStats struct {
  835. Votes int `json:"votes"`
  836. BlockParts int `json:"block_parts"`
  837. }
  838. func (pss peerStateStats) String() string {
  839. return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}",
  840. pss.Votes, pss.BlockParts)
  841. }
  842. // NewPeerState returns a new PeerState for the given Peer
  843. func NewPeerState(peer p2p.Peer) *PeerState {
  844. return &PeerState{
  845. peer: peer,
  846. logger: log.NewNopLogger(),
  847. PRS: cstypes.PeerRoundState{
  848. Round: -1,
  849. ProposalPOLRound: -1,
  850. LastCommitRound: -1,
  851. CatchupCommitRound: -1,
  852. },
  853. Stats: &peerStateStats{},
  854. }
  855. }
// SetLogger allows to set a logger on the peer state. Returns the peer state
// itself, so the call can be chained onto NewPeerState.
//
// NOTE: the logger field is written without taking ps.mtx.
func (ps *PeerState) SetLogger(logger log.Logger) *PeerState {
	ps.logger = logger
	return ps
}
  862. // GetRoundState returns an shallow copy of the PeerRoundState.
  863. // There's no point in mutating it since it won't change PeerState.
  864. func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
  865. ps.mtx.Lock()
  866. defer ps.mtx.Unlock()
  867. prs := ps.PRS // copy
  868. return &prs
  869. }
  870. // ToJSON returns a json of PeerState.
  871. func (ps *PeerState) ToJSON() ([]byte, error) {
  872. ps.mtx.Lock()
  873. defer ps.mtx.Unlock()
  874. return tmjson.Marshal(ps)
  875. }
  876. // GetHeight returns an atomic snapshot of the PeerRoundState's height
  877. // used by the mempool to ensure peers are caught up before broadcasting new txs
  878. func (ps *PeerState) GetHeight() int64 {
  879. ps.mtx.Lock()
  880. defer ps.mtx.Unlock()
  881. return ps.PRS.Height
  882. }
  883. // SetHasProposal sets the given proposal as known for the peer.
  884. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  885. ps.mtx.Lock()
  886. defer ps.mtx.Unlock()
  887. if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
  888. return
  889. }
  890. if ps.PRS.Proposal {
  891. return
  892. }
  893. ps.PRS.Proposal = true
  894. // ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
  895. if ps.PRS.ProposalBlockParts != nil {
  896. return
  897. }
  898. ps.PRS.ProposalBlockPartSetHeader = proposal.BlockID.PartSetHeader
  899. ps.PRS.ProposalBlockParts = bits.NewBitArray(int(proposal.BlockID.PartSetHeader.Total))
  900. ps.PRS.ProposalPOLRound = proposal.POLRound
  901. ps.PRS.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  902. }
  903. // InitProposalBlockParts initializes the peer's proposal block parts header and bit array.
  904. func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) {
  905. ps.mtx.Lock()
  906. defer ps.mtx.Unlock()
  907. if ps.PRS.ProposalBlockParts != nil {
  908. return
  909. }
  910. ps.PRS.ProposalBlockPartSetHeader = partSetHeader
  911. ps.PRS.ProposalBlockParts = bits.NewBitArray(int(partSetHeader.Total))
  912. }
  913. // SetHasProposalBlockPart sets the given block part index as known for the peer.
  914. func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) {
  915. ps.mtx.Lock()
  916. defer ps.mtx.Unlock()
  917. if ps.PRS.Height != height || ps.PRS.Round != round {
  918. return
  919. }
  920. ps.PRS.ProposalBlockParts.SetIndex(index, true)
  921. }
  922. // PickSendVote picks a vote and sends it to the peer.
  923. // Returns true if vote was sent.
  924. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
  925. if vote, ok := ps.PickVoteToSend(votes); ok {
  926. msg := &tmcon.VoteMessage{Vote: vote}
  927. ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
  928. if ps.peer.Send(VoteChannel, tmcon.MustEncode(msg)) {
  929. ps.SetHasVote(vote)
  930. return true
  931. }
  932. return false
  933. }
  934. return false
  935. }
  936. // PickVoteToSend picks a vote to send to the peer.
  937. // Returns true if a vote was picked.
  938. // NOTE: `votes` must be the correct Size() for the Height().
  939. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
  940. ps.mtx.Lock()
  941. defer ps.mtx.Unlock()
  942. if votes.Size() == 0 {
  943. return nil, false
  944. }
  945. height, round, votesType, size :=
  946. votes.GetHeight(), votes.GetRound(), tmproto.SignedMsgType(votes.Type()), votes.Size()
  947. // Lazily set data using 'votes'.
  948. if votes.IsCommit() {
  949. ps.ensureCatchupCommitRound(height, round, size)
  950. }
  951. ps.ensureVoteBitArrays(height, size)
  952. psVotes := ps.getVoteBitArray(height, round, votesType)
  953. if psVotes == nil {
  954. return nil, false // Not something worth sending
  955. }
  956. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  957. return votes.GetByIndex(int32(index)), true
  958. }
  959. return nil, false
  960. }
  961. func (ps *PeerState) getVoteBitArray(height int64, round int32, votesType tmproto.SignedMsgType) *bits.BitArray {
  962. if !types.IsVoteTypeValid(votesType) {
  963. return nil
  964. }
  965. if ps.PRS.Height == height {
  966. if ps.PRS.Round == round {
  967. switch votesType {
  968. case tmproto.PrevoteType:
  969. return ps.PRS.Prevotes
  970. case tmproto.PrecommitType:
  971. return ps.PRS.Precommits
  972. }
  973. }
  974. if ps.PRS.CatchupCommitRound == round {
  975. switch votesType {
  976. case tmproto.PrevoteType:
  977. return nil
  978. case tmproto.PrecommitType:
  979. return ps.PRS.CatchupCommit
  980. }
  981. }
  982. if ps.PRS.ProposalPOLRound == round {
  983. switch votesType {
  984. case tmproto.PrevoteType:
  985. return ps.PRS.ProposalPOL
  986. case tmproto.PrecommitType:
  987. return nil
  988. }
  989. }
  990. return nil
  991. }
  992. if ps.PRS.Height == height+1 {
  993. if ps.PRS.LastCommitRound == round {
  994. switch votesType {
  995. case tmproto.PrevoteType:
  996. return nil
  997. case tmproto.PrecommitType:
  998. return ps.PRS.LastCommit
  999. }
  1000. }
  1001. return nil
  1002. }
  1003. return nil
  1004. }
  1005. // 'round': A round for which we have a +2/3 commit.
  1006. func (ps *PeerState) ensureCatchupCommitRound(height int64, round int32, numValidators int) {
  1007. if ps.PRS.Height != height {
  1008. return
  1009. }
  1010. /*
  1011. NOTE: This is wrong, 'round' could change.
  1012. e.g. if orig round is not the same as block LastCommit round.
  1013. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  1014. panic(fmt.Sprintf(
  1015. "Conflicting CatchupCommitRound. Height: %v,
  1016. Orig: %v,
  1017. New: %v",
  1018. height,
  1019. ps.CatchupCommitRound,
  1020. round))
  1021. }
  1022. */
  1023. if ps.PRS.CatchupCommitRound == round {
  1024. return // Nothing to do!
  1025. }
  1026. ps.PRS.CatchupCommitRound = round
  1027. if round == ps.PRS.Round {
  1028. ps.PRS.CatchupCommit = ps.PRS.Precommits
  1029. } else {
  1030. ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
  1031. }
  1032. }
// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// It takes ps.mtx and delegates to ensureVoteBitArrays.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int64, numValidators int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.ensureVoteBitArrays(height, numValidators)
}
  1042. func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
  1043. if ps.PRS.Height == height {
  1044. if ps.PRS.Prevotes == nil {
  1045. ps.PRS.Prevotes = bits.NewBitArray(numValidators)
  1046. }
  1047. if ps.PRS.Precommits == nil {
  1048. ps.PRS.Precommits = bits.NewBitArray(numValidators)
  1049. }
  1050. if ps.PRS.CatchupCommit == nil {
  1051. ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
  1052. }
  1053. if ps.PRS.ProposalPOL == nil {
  1054. ps.PRS.ProposalPOL = bits.NewBitArray(numValidators)
  1055. }
  1056. } else if ps.PRS.Height == height+1 {
  1057. if ps.PRS.LastCommit == nil {
  1058. ps.PRS.LastCommit = bits.NewBitArray(numValidators)
  1059. }
  1060. }
  1061. }
  1062. // RecordVote increments internal votes related statistics for this peer.
  1063. // It returns the total number of added votes.
  1064. func (ps *PeerState) RecordVote() int {
  1065. ps.mtx.Lock()
  1066. defer ps.mtx.Unlock()
  1067. ps.Stats.Votes++
  1068. return ps.Stats.Votes
  1069. }
  1070. // VotesSent returns the number of blocks for which peer has been sending us
  1071. // votes.
  1072. func (ps *PeerState) VotesSent() int {
  1073. ps.mtx.Lock()
  1074. defer ps.mtx.Unlock()
  1075. return ps.Stats.Votes
  1076. }
  1077. // RecordBlockPart increments internal block part related statistics for this peer.
  1078. // It returns the total number of added block parts.
  1079. func (ps *PeerState) RecordBlockPart() int {
  1080. ps.mtx.Lock()
  1081. defer ps.mtx.Unlock()
  1082. ps.Stats.BlockParts++
  1083. return ps.Stats.BlockParts
  1084. }
  1085. // BlockPartsSent returns the number of useful block parts the peer has sent us.
  1086. func (ps *PeerState) BlockPartsSent() int {
  1087. ps.mtx.Lock()
  1088. defer ps.mtx.Unlock()
  1089. return ps.Stats.BlockParts
  1090. }
// SetHasVote sets the given vote as known by the peer.
// It takes ps.mtx and delegates to setHasVote using the vote's height, round,
// type and validator index. vote must be non-nil: its fields are read
// unconditionally.
func (ps *PeerState) SetHasVote(vote *types.Vote) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}
  1097. func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) {
  1098. logger := ps.logger.With(
  1099. "peerH/R",
  1100. fmt.Sprintf("%d/%d", ps.PRS.Height, ps.PRS.Round),
  1101. "H/R",
  1102. fmt.Sprintf("%d/%d", height, round))
  1103. logger.Debug("setHasVote", "type", voteType, "index", index)
  1104. // NOTE: some may be nil BitArrays -> no side effects.
  1105. psVotes := ps.getVoteBitArray(height, round, voteType)
  1106. if psVotes != nil {
  1107. psVotes.SetIndex(int(index), true)
  1108. }
  1109. }
// ApplyNewRoundStepMessage updates the peer state for the new round.
//
// Messages that do not advance the peer's (height, round, step), as ordered by
// CompareHRS, are ignored. Otherwise height, round, step and start time are
// overwritten from the message, and the proposal and vote tracking is reset or
// shifted depending on whether the round and/or the height changed (see the
// inline comments).
func (ps *PeerState) ApplyNewRoundStepMessage(msg *tmcon.NewRoundStepMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	// Ignore duplicates or decreases
	if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 {
		return
	}

	// Just remember these values: the fields are overwritten below, but the
	// old values drive the reset/shift decisions that follow.
	psHeight := ps.PRS.Height
	psRound := ps.PRS.Round
	psCatchupCommitRound := ps.PRS.CatchupCommitRound
	psCatchupCommit := ps.PRS.CatchupCommit

	// Derive the start time from how many seconds ago the peer says it started.
	startTime := tmtime.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
	ps.PRS.Height = msg.Height
	ps.PRS.Round = msg.Round
	ps.PRS.Step = msg.Step
	ps.PRS.StartTime = startTime

	// New height or round: forget everything tied to the previous round.
	if psHeight != msg.Height || psRound != msg.Round {
		ps.PRS.Proposal = false
		ps.PRS.ProposalBlockPartSetHeader = types.PartSetHeader{}
		ps.PRS.ProposalBlockParts = nil
		ps.PRS.ProposalPOLRound = -1
		ps.PRS.ProposalPOL = nil
		// We'll update the BitArray capacity later.
		ps.PRS.Prevotes = nil
		ps.PRS.Precommits = nil
	}
	if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
		// Peer caught up to CatchupCommitRound.
		// Preserve psCatchupCommit!
		// NOTE: We prefer to use prs.Precommits if
		// pr.Round matches pr.CatchupCommitRound.
		ps.PRS.Precommits = psCatchupCommit
	}
	if psHeight != msg.Height {
		// Shift Precommits to LastCommit.
		// This only applies when the peer moved up by exactly one height and
		// its previous round is the new LastCommitRound; otherwise LastCommit
		// is cleared.
		if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
			ps.PRS.LastCommitRound = msg.LastCommitRound
			ps.PRS.LastCommit = ps.PRS.Precommits
		} else {
			ps.PRS.LastCommitRound = msg.LastCommitRound
			ps.PRS.LastCommit = nil
		}
		// We'll update the BitArray capacity later.
		ps.PRS.CatchupCommitRound = -1
		ps.PRS.CatchupCommit = nil
	}
}
  1159. // ApplyNewValidBlockMessage updates the peer state for the new valid block.
  1160. func (ps *PeerState) ApplyNewValidBlockMessage(msg *tmcon.NewValidBlockMessage) {
  1161. ps.mtx.Lock()
  1162. defer ps.mtx.Unlock()
  1163. if ps.PRS.Height != msg.Height {
  1164. return
  1165. }
  1166. if ps.PRS.Round != msg.Round && !msg.IsCommit {
  1167. return
  1168. }
  1169. ps.PRS.ProposalBlockPartSetHeader = msg.BlockPartSetHeader
  1170. ps.PRS.ProposalBlockParts = msg.BlockParts
  1171. }
  1172. // ApplyProposalPOLMessage updates the peer state for the new proposal POL.
  1173. func (ps *PeerState) ApplyProposalPOLMessage(msg *tmcon.ProposalPOLMessage) {
  1174. ps.mtx.Lock()
  1175. defer ps.mtx.Unlock()
  1176. if ps.PRS.Height != msg.Height {
  1177. return
  1178. }
  1179. if ps.PRS.ProposalPOLRound != msg.ProposalPOLRound {
  1180. return
  1181. }
  1182. // TODO: Merge onto existing ps.PRS.ProposalPOL?
  1183. // We might have sent some prevotes in the meantime.
  1184. ps.PRS.ProposalPOL = msg.ProposalPOL
  1185. }
  1186. // ApplyHasVoteMessage updates the peer state for the new vote.
  1187. func (ps *PeerState) ApplyHasVoteMessage(msg *tmcon.HasVoteMessage) {
  1188. ps.mtx.Lock()
  1189. defer ps.mtx.Unlock()
  1190. if ps.PRS.Height != msg.Height {
  1191. return
  1192. }
  1193. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  1194. }
  1195. // ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
  1196. // it claims to have for the corresponding BlockID.
  1197. // `ourVotes` is a BitArray of votes we have for msg.BlockID
  1198. // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
  1199. // we conservatively overwrite ps's votes w/ msg.Votes.
  1200. func (ps *PeerState) ApplyVoteSetBitsMessage(msg *tmcon.VoteSetBitsMessage, ourVotes *bits.BitArray) {
  1201. ps.mtx.Lock()
  1202. defer ps.mtx.Unlock()
  1203. votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
  1204. if votes != nil {
  1205. if ourVotes == nil {
  1206. votes.Update(msg.Votes)
  1207. } else {
  1208. otherVotes := votes.Sub(ourVotes)
  1209. hasVotes := otherVotes.Or(msg.Votes)
  1210. votes.Update(hasVotes)
  1211. }
  1212. }
  1213. }
// String returns a string representation of the PeerState.
// It is StringIndented with an empty indent, and therefore takes ps.mtx.
func (ps *PeerState) String() string {
	return ps.StringIndented("")
}
  1218. // StringIndented returns a string representation of the PeerState
  1219. func (ps *PeerState) StringIndented(indent string) string {
  1220. ps.mtx.Lock()
  1221. defer ps.mtx.Unlock()
  1222. return fmt.Sprintf(`PeerState{
  1223. %s Key %v
  1224. %s RoundState %v
  1225. %s Stats %v
  1226. %s}`,
  1227. indent, ps.peer.ID(),
  1228. indent, ps.PRS.StringIndented(indent+" "),
  1229. indent, ps.Stats,
  1230. indent)
  1231. }
  1232. //-----------------------------------------------------------------------------
  1233. // func init() {
  1234. // tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage")
  1235. // tmjson.RegisterType(&NewValidBlockMessage{}, "tendermint/NewValidBlockMessage")
  1236. // tmjson.RegisterType(&ProposalMessage{}, "tendermint/Proposal")
  1237. // tmjson.RegisterType(&ProposalPOLMessage{}, "tendermint/ProposalPOL")
  1238. // tmjson.RegisterType(&BlockPartMessage{}, "tendermint/BlockPart")
  1239. // tmjson.RegisterType(&VoteMessage{}, "tendermint/Vote")
  1240. // tmjson.RegisterType(&HasVoteMessage{}, "tendermint/HasVote")
  1241. // tmjson.RegisterType(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23")
  1242. // tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
  1243. // }
  1244. func decodeMsg(bz []byte) (msg tmcon.Message, err error) {
  1245. pb := &tmcons.Message{}
  1246. if err = proto.Unmarshal(bz, pb); err != nil {
  1247. return msg, err
  1248. }
  1249. return tmcon.MsgFromProto(pb)
  1250. }