You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

531 lines
14 KiB

  1. package consensus
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. cstypes "github.com/tendermint/tendermint/internal/consensus/types"
  7. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  8. "github.com/tendermint/tendermint/libs/bits"
  9. tmjson "github.com/tendermint/tendermint/libs/json"
  10. "github.com/tendermint/tendermint/libs/log"
  11. tmtime "github.com/tendermint/tendermint/libs/time"
  12. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. var (
  16. ErrPeerStateHeightRegression = errors.New("peer state height regression")
  17. ErrPeerStateInvalidStartTime = errors.New("peer state invalid startTime")
  18. )
  19. // peerStateStats holds internal statistics for a peer.
  20. type peerStateStats struct {
  21. Votes int `json:"votes"`
  22. BlockParts int `json:"block_parts"`
  23. }
  24. func (pss peerStateStats) String() string {
  25. return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.Votes, pss.BlockParts)
  26. }
  27. // PeerState contains the known state of a peer, including its connection and
  28. // threadsafe access to its PeerRoundState.
  29. // NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go.
  30. // Be mindful of what you Expose.
  31. type PeerState struct {
  32. peerID types.NodeID
  33. logger log.Logger
  34. // NOTE: Modify below using setters, never directly.
  35. mtx tmsync.RWMutex
  36. running bool
  37. PRS cstypes.PeerRoundState `json:"round_state"`
  38. Stats *peerStateStats `json:"stats"`
  39. closer *tmsync.Closer
  40. }
  41. // NewPeerState returns a new PeerState for the given node ID.
  42. func NewPeerState(logger log.Logger, peerID types.NodeID) *PeerState {
  43. return &PeerState{
  44. peerID: peerID,
  45. logger: logger,
  46. closer: tmsync.NewCloser(),
  47. PRS: cstypes.PeerRoundState{
  48. Round: -1,
  49. ProposalPOLRound: -1,
  50. LastCommitRound: -1,
  51. CatchupCommitRound: -1,
  52. },
  53. Stats: &peerStateStats{},
  54. }
  55. }
  56. // SetRunning sets the running state of the peer.
  57. func (ps *PeerState) SetRunning(v bool) {
  58. ps.mtx.Lock()
  59. defer ps.mtx.Unlock()
  60. ps.running = v
  61. }
  62. // IsRunning returns true if a PeerState is considered running where multiple
  63. // broadcasting goroutines exist for the peer.
  64. func (ps *PeerState) IsRunning() bool {
  65. ps.mtx.RLock()
  66. defer ps.mtx.RUnlock()
  67. return ps.running
  68. }
  69. // GetRoundState returns a shallow copy of the PeerRoundState. There's no point
  70. // in mutating it since it won't change PeerState.
  71. func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
  72. ps.mtx.Lock()
  73. defer ps.mtx.Unlock()
  74. prs := ps.PRS.Copy()
  75. return &prs
  76. }
  77. // ToJSON returns a json of PeerState.
  78. func (ps *PeerState) ToJSON() ([]byte, error) {
  79. ps.mtx.Lock()
  80. defer ps.mtx.Unlock()
  81. return tmjson.Marshal(ps)
  82. }
  83. // GetHeight returns an atomic snapshot of the PeerRoundState's height used by
  84. // the mempool to ensure peers are caught up before broadcasting new txs.
  85. func (ps *PeerState) GetHeight() int64 {
  86. ps.mtx.Lock()
  87. defer ps.mtx.Unlock()
  88. return ps.PRS.Height
  89. }
  90. // SetHasProposal sets the given proposal as known for the peer.
  91. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  92. ps.mtx.Lock()
  93. defer ps.mtx.Unlock()
  94. if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
  95. return
  96. }
  97. if ps.PRS.Proposal {
  98. return
  99. }
  100. ps.PRS.Proposal = true
  101. // ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
  102. if ps.PRS.ProposalBlockParts != nil {
  103. return
  104. }
  105. ps.PRS.ProposalBlockPartSetHeader = proposal.BlockID.PartSetHeader
  106. ps.PRS.ProposalBlockParts = bits.NewBitArray(int(proposal.BlockID.PartSetHeader.Total))
  107. ps.PRS.ProposalPOLRound = proposal.POLRound
  108. ps.PRS.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  109. }
  110. // InitProposalBlockParts initializes the peer's proposal block parts header
  111. // and bit array.
  112. func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) {
  113. ps.mtx.Lock()
  114. defer ps.mtx.Unlock()
  115. if ps.PRS.ProposalBlockParts != nil {
  116. return
  117. }
  118. ps.PRS.ProposalBlockPartSetHeader = partSetHeader
  119. ps.PRS.ProposalBlockParts = bits.NewBitArray(int(partSetHeader.Total))
  120. }
  121. // SetHasProposalBlockPart sets the given block part index as known for the peer.
  122. func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) {
  123. ps.mtx.Lock()
  124. defer ps.mtx.Unlock()
  125. if ps.PRS.Height != height || ps.PRS.Round != round {
  126. return
  127. }
  128. ps.PRS.ProposalBlockParts.SetIndex(index, true)
  129. }
  130. // PickVoteToSend picks a vote to send to the peer. It will return true if a
  131. // vote was picked.
  132. //
  133. // NOTE: `votes` must be the correct Size() for the Height().
  134. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (*types.Vote, bool) {
  135. ps.mtx.Lock()
  136. defer ps.mtx.Unlock()
  137. if votes.Size() == 0 {
  138. return nil, false
  139. }
  140. var (
  141. height = votes.GetHeight()
  142. round = votes.GetRound()
  143. votesType = tmproto.SignedMsgType(votes.Type())
  144. size = votes.Size()
  145. )
  146. // lazily set data using 'votes'
  147. if votes.IsCommit() {
  148. ps.ensureCatchupCommitRound(height, round, size)
  149. }
  150. ps.ensureVoteBitArrays(height, size)
  151. psVotes := ps.getVoteBitArray(height, round, votesType)
  152. if psVotes == nil {
  153. return nil, false // not something worth sending
  154. }
  155. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  156. vote := votes.GetByIndex(int32(index))
  157. if vote != nil {
  158. return vote, true
  159. }
  160. }
  161. return nil, false
  162. }
  163. func (ps *PeerState) getVoteBitArray(height int64, round int32, votesType tmproto.SignedMsgType) *bits.BitArray {
  164. if !types.IsVoteTypeValid(votesType) {
  165. return nil
  166. }
  167. if ps.PRS.Height == height {
  168. if ps.PRS.Round == round {
  169. switch votesType {
  170. case tmproto.PrevoteType:
  171. return ps.PRS.Prevotes
  172. case tmproto.PrecommitType:
  173. return ps.PRS.Precommits
  174. }
  175. }
  176. if ps.PRS.CatchupCommitRound == round {
  177. switch votesType {
  178. case tmproto.PrevoteType:
  179. return nil
  180. case tmproto.PrecommitType:
  181. return ps.PRS.CatchupCommit
  182. }
  183. }
  184. if ps.PRS.ProposalPOLRound == round {
  185. switch votesType {
  186. case tmproto.PrevoteType:
  187. return ps.PRS.ProposalPOL
  188. case tmproto.PrecommitType:
  189. return nil
  190. }
  191. }
  192. return nil
  193. }
  194. if ps.PRS.Height == height+1 {
  195. if ps.PRS.LastCommitRound == round {
  196. switch votesType {
  197. case tmproto.PrevoteType:
  198. return nil
  199. case tmproto.PrecommitType:
  200. return ps.PRS.LastCommit
  201. }
  202. }
  203. return nil
  204. }
  205. return nil
  206. }
  207. // 'round': A round for which we have a +2/3 commit.
  208. func (ps *PeerState) ensureCatchupCommitRound(height int64, round int32, numValidators int) {
  209. if ps.PRS.Height != height {
  210. return
  211. }
  212. /*
  213. NOTE: This is wrong, 'round' could change.
  214. e.g. if orig round is not the same as block LastCommit round.
  215. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  216. panic(fmt.Sprintf(
  217. "Conflicting CatchupCommitRound. Height: %v,
  218. Orig: %v,
  219. New: %v",
  220. height,
  221. ps.CatchupCommitRound,
  222. round))
  223. }
  224. */
  225. if ps.PRS.CatchupCommitRound == round {
  226. return // Nothing to do!
  227. }
  228. ps.PRS.CatchupCommitRound = round
  229. if round == ps.PRS.Round {
  230. ps.PRS.CatchupCommit = ps.PRS.Precommits
  231. } else {
  232. ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
  233. }
  234. }
  235. // EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
  236. // what votes this peer has received.
  237. // NOTE: It's important to make sure that numValidators actually matches
  238. // what the node sees as the number of validators for height.
  239. func (ps *PeerState) EnsureVoteBitArrays(height int64, numValidators int) {
  240. ps.mtx.Lock()
  241. defer ps.mtx.Unlock()
  242. ps.ensureVoteBitArrays(height, numValidators)
  243. }
  244. func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
  245. if ps.PRS.Height == height {
  246. if ps.PRS.Prevotes == nil {
  247. ps.PRS.Prevotes = bits.NewBitArray(numValidators)
  248. }
  249. if ps.PRS.Precommits == nil {
  250. ps.PRS.Precommits = bits.NewBitArray(numValidators)
  251. }
  252. if ps.PRS.CatchupCommit == nil {
  253. ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
  254. }
  255. if ps.PRS.ProposalPOL == nil {
  256. ps.PRS.ProposalPOL = bits.NewBitArray(numValidators)
  257. }
  258. } else if ps.PRS.Height == height+1 {
  259. if ps.PRS.LastCommit == nil {
  260. ps.PRS.LastCommit = bits.NewBitArray(numValidators)
  261. }
  262. }
  263. }
  264. // RecordVote increments internal votes related statistics for this peer.
  265. // It returns the total number of added votes.
  266. func (ps *PeerState) RecordVote() int {
  267. ps.mtx.Lock()
  268. defer ps.mtx.Unlock()
  269. ps.Stats.Votes++
  270. return ps.Stats.Votes
  271. }
  272. // VotesSent returns the number of blocks for which peer has been sending us
  273. // votes.
  274. func (ps *PeerState) VotesSent() int {
  275. ps.mtx.Lock()
  276. defer ps.mtx.Unlock()
  277. return ps.Stats.Votes
  278. }
  279. // RecordBlockPart increments internal block part related statistics for this peer.
  280. // It returns the total number of added block parts.
  281. func (ps *PeerState) RecordBlockPart() int {
  282. ps.mtx.Lock()
  283. defer ps.mtx.Unlock()
  284. ps.Stats.BlockParts++
  285. return ps.Stats.BlockParts
  286. }
  287. // BlockPartsSent returns the number of useful block parts the peer has sent us.
  288. func (ps *PeerState) BlockPartsSent() int {
  289. ps.mtx.Lock()
  290. defer ps.mtx.Unlock()
  291. return ps.Stats.BlockParts
  292. }
  293. // SetHasVote sets the given vote as known by the peer
  294. func (ps *PeerState) SetHasVote(vote *types.Vote) {
  295. if vote == nil {
  296. return
  297. }
  298. ps.mtx.Lock()
  299. defer ps.mtx.Unlock()
  300. ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
  301. }
  302. func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) {
  303. logger := ps.logger.With(
  304. "peerH/R", fmt.Sprintf("%d/%d", ps.PRS.Height, ps.PRS.Round),
  305. "H/R", fmt.Sprintf("%d/%d", height, round),
  306. )
  307. logger.Debug("setHasVote", "type", voteType, "index", index)
  308. // NOTE: some may be nil BitArrays -> no side effects
  309. psVotes := ps.getVoteBitArray(height, round, voteType)
  310. if psVotes != nil {
  311. psVotes.SetIndex(int(index), true)
  312. }
  313. }
  314. // ApplyNewRoundStepMessage updates the peer state for the new round.
  315. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  316. ps.mtx.Lock()
  317. defer ps.mtx.Unlock()
  318. // ignore duplicates or decreases
  319. if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 {
  320. return
  321. }
  322. var (
  323. psHeight = ps.PRS.Height
  324. psRound = ps.PRS.Round
  325. psCatchupCommitRound = ps.PRS.CatchupCommitRound
  326. psCatchupCommit = ps.PRS.CatchupCommit
  327. startTime = tmtime.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  328. )
  329. ps.PRS.Height = msg.Height
  330. ps.PRS.Round = msg.Round
  331. ps.PRS.Step = msg.Step
  332. ps.PRS.StartTime = startTime
  333. if psHeight != msg.Height || psRound != msg.Round {
  334. ps.PRS.Proposal = false
  335. ps.PRS.ProposalBlockPartSetHeader = types.PartSetHeader{}
  336. ps.PRS.ProposalBlockParts = nil
  337. ps.PRS.ProposalPOLRound = -1
  338. ps.PRS.ProposalPOL = nil
  339. // we'll update the BitArray capacity later
  340. ps.PRS.Prevotes = nil
  341. ps.PRS.Precommits = nil
  342. }
  343. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  344. // Peer caught up to CatchupCommitRound.
  345. // Preserve psCatchupCommit!
  346. // NOTE: We prefer to use prs.Precommits if
  347. // pr.Round matches pr.CatchupCommitRound.
  348. ps.PRS.Precommits = psCatchupCommit
  349. }
  350. if psHeight != msg.Height {
  351. // shift Precommits to LastCommit
  352. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  353. ps.PRS.LastCommitRound = msg.LastCommitRound
  354. ps.PRS.LastCommit = ps.PRS.Precommits
  355. } else {
  356. ps.PRS.LastCommitRound = msg.LastCommitRound
  357. ps.PRS.LastCommit = nil
  358. }
  359. // we'll update the BitArray capacity later
  360. ps.PRS.CatchupCommitRound = -1
  361. ps.PRS.CatchupCommit = nil
  362. }
  363. }
  364. // ApplyNewValidBlockMessage updates the peer state for the new valid block.
  365. func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
  366. ps.mtx.Lock()
  367. defer ps.mtx.Unlock()
  368. if ps.PRS.Height != msg.Height {
  369. return
  370. }
  371. if ps.PRS.Round != msg.Round && !msg.IsCommit {
  372. return
  373. }
  374. ps.PRS.ProposalBlockPartSetHeader = msg.BlockPartSetHeader
  375. ps.PRS.ProposalBlockParts = msg.BlockParts
  376. }
  377. // ApplyProposalPOLMessage updates the peer state for the new proposal POL.
  378. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  379. ps.mtx.Lock()
  380. defer ps.mtx.Unlock()
  381. if ps.PRS.Height != msg.Height {
  382. return
  383. }
  384. if ps.PRS.ProposalPOLRound != msg.ProposalPOLRound {
  385. return
  386. }
  387. // TODO: Merge onto existing ps.PRS.ProposalPOL?
  388. // We might have sent some prevotes in the meantime.
  389. ps.PRS.ProposalPOL = msg.ProposalPOL
  390. }
  391. // ApplyHasVoteMessage updates the peer state for the new vote.
  392. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  393. ps.mtx.Lock()
  394. defer ps.mtx.Unlock()
  395. if ps.PRS.Height != msg.Height {
  396. return
  397. }
  398. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  399. }
  400. // ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
  401. // it claims to have for the corresponding BlockID.
  402. // `ourVotes` is a BitArray of votes we have for msg.BlockID
  403. // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
  404. // we conservatively overwrite ps's votes w/ msg.Votes.
  405. func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
  406. ps.mtx.Lock()
  407. defer ps.mtx.Unlock()
  408. votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
  409. if votes != nil {
  410. if ourVotes == nil {
  411. votes.Update(msg.Votes)
  412. } else {
  413. otherVotes := votes.Sub(ourVotes)
  414. hasVotes := otherVotes.Or(msg.Votes)
  415. votes.Update(hasVotes)
  416. }
  417. }
  418. }
  419. // String returns a string representation of the PeerState
  420. func (ps *PeerState) String() string {
  421. return ps.StringIndented("")
  422. }
  423. // StringIndented returns a string representation of the PeerState
  424. func (ps *PeerState) StringIndented(indent string) string {
  425. ps.mtx.Lock()
  426. defer ps.mtx.Unlock()
  427. return fmt.Sprintf(`PeerState{
  428. %s Key %v
  429. %s RoundState %v
  430. %s Stats %v
  431. %s}`,
  432. indent, ps.peerID,
  433. indent, ps.PRS.StringIndented(indent+" "),
  434. indent, ps.Stats,
  435. indent,
  436. )
  437. }