You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

527 lines
13 KiB

  1. package consensus
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. cstypes "github.com/tendermint/tendermint/internal/consensus/types"
  9. "github.com/tendermint/tendermint/libs/bits"
  10. "github.com/tendermint/tendermint/libs/log"
  11. tmtime "github.com/tendermint/tendermint/libs/time"
  12. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. var (
  16. ErrPeerStateHeightRegression = errors.New("peer state height regression")
  17. ErrPeerStateInvalidStartTime = errors.New("peer state invalid startTime")
  18. )
  19. // peerStateStats holds internal statistics for a peer.
  20. type peerStateStats struct {
  21. Votes int `json:"votes,string"`
  22. BlockParts int `json:"block_parts,string"`
  23. }
  24. func (pss peerStateStats) String() string {
  25. return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.Votes, pss.BlockParts)
  26. }
  27. // PeerState contains the known state of a peer, including its connection and
  28. // threadsafe access to its PeerRoundState.
  29. // NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go.
  30. // Be mindful of what you Expose.
  31. type PeerState struct {
  32. peerID types.NodeID
  33. logger log.Logger
  34. // NOTE: Modify below using setters, never directly.
  35. mtx sync.RWMutex
  36. running bool
  37. PRS cstypes.PeerRoundState `json:"round_state"`
  38. Stats *peerStateStats `json:"stats"`
  39. }
  40. // NewPeerState returns a new PeerState for the given node ID.
  41. func NewPeerState(logger log.Logger, peerID types.NodeID) *PeerState {
  42. return &PeerState{
  43. peerID: peerID,
  44. logger: logger,
  45. PRS: cstypes.PeerRoundState{
  46. Round: -1,
  47. ProposalPOLRound: -1,
  48. LastCommitRound: -1,
  49. CatchupCommitRound: -1,
  50. },
  51. Stats: &peerStateStats{},
  52. }
  53. }
  54. // SetRunning sets the running state of the peer.
  55. func (ps *PeerState) SetRunning(v bool) {
  56. ps.mtx.Lock()
  57. defer ps.mtx.Unlock()
  58. ps.running = v
  59. }
  60. // IsRunning returns true if a PeerState is considered running where multiple
  61. // broadcasting goroutines exist for the peer.
  62. func (ps *PeerState) IsRunning() bool {
  63. ps.mtx.RLock()
  64. defer ps.mtx.RUnlock()
  65. return ps.running
  66. }
  67. // GetRoundState returns a shallow copy of the PeerRoundState. There's no point
  68. // in mutating it since it won't change PeerState.
  69. func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
  70. ps.mtx.Lock()
  71. defer ps.mtx.Unlock()
  72. prs := ps.PRS.Copy()
  73. return &prs
  74. }
  75. // ToJSON returns a json of PeerState.
  76. func (ps *PeerState) ToJSON() ([]byte, error) {
  77. ps.mtx.Lock()
  78. defer ps.mtx.Unlock()
  79. return json.Marshal(ps)
  80. }
  81. // GetHeight returns an atomic snapshot of the PeerRoundState's height used by
  82. // the mempool to ensure peers are caught up before broadcasting new txs.
  83. func (ps *PeerState) GetHeight() int64 {
  84. ps.mtx.Lock()
  85. defer ps.mtx.Unlock()
  86. return ps.PRS.Height
  87. }
  88. // SetHasProposal sets the given proposal as known for the peer.
  89. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
  90. ps.mtx.Lock()
  91. defer ps.mtx.Unlock()
  92. if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
  93. return
  94. }
  95. if ps.PRS.Proposal {
  96. return
  97. }
  98. ps.PRS.Proposal = true
  99. // ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
  100. if ps.PRS.ProposalBlockParts != nil {
  101. return
  102. }
  103. ps.PRS.ProposalBlockPartSetHeader = proposal.BlockID.PartSetHeader
  104. ps.PRS.ProposalBlockParts = bits.NewBitArray(int(proposal.BlockID.PartSetHeader.Total))
  105. ps.PRS.ProposalPOLRound = proposal.POLRound
  106. ps.PRS.ProposalPOL = nil // Nil until ProposalPOLMessage received.
  107. }
  108. // InitProposalBlockParts initializes the peer's proposal block parts header
  109. // and bit array.
  110. func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) {
  111. ps.mtx.Lock()
  112. defer ps.mtx.Unlock()
  113. if ps.PRS.ProposalBlockParts != nil {
  114. return
  115. }
  116. ps.PRS.ProposalBlockPartSetHeader = partSetHeader
  117. ps.PRS.ProposalBlockParts = bits.NewBitArray(int(partSetHeader.Total))
  118. }
  119. // SetHasProposalBlockPart sets the given block part index as known for the peer.
  120. func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) {
  121. ps.mtx.Lock()
  122. defer ps.mtx.Unlock()
  123. if ps.PRS.Height != height || ps.PRS.Round != round {
  124. return
  125. }
  126. ps.PRS.ProposalBlockParts.SetIndex(index, true)
  127. }
  128. // PickVoteToSend picks a vote to send to the peer. It will return true if a
  129. // vote was picked.
  130. //
  131. // NOTE: `votes` must be the correct Size() for the Height().
  132. func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (*types.Vote, bool) {
  133. ps.mtx.Lock()
  134. defer ps.mtx.Unlock()
  135. if votes.Size() == 0 {
  136. return nil, false
  137. }
  138. var (
  139. height = votes.GetHeight()
  140. round = votes.GetRound()
  141. votesType = tmproto.SignedMsgType(votes.Type())
  142. size = votes.Size()
  143. )
  144. // lazily set data using 'votes'
  145. if votes.IsCommit() {
  146. ps.ensureCatchupCommitRound(height, round, size)
  147. }
  148. ps.ensureVoteBitArrays(height, size)
  149. psVotes := ps.getVoteBitArray(height, round, votesType)
  150. if psVotes == nil {
  151. return nil, false // not something worth sending
  152. }
  153. if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
  154. vote := votes.GetByIndex(int32(index))
  155. if vote != nil {
  156. return vote, true
  157. }
  158. }
  159. return nil, false
  160. }
  161. func (ps *PeerState) getVoteBitArray(height int64, round int32, votesType tmproto.SignedMsgType) *bits.BitArray {
  162. if !types.IsVoteTypeValid(votesType) {
  163. return nil
  164. }
  165. if ps.PRS.Height == height {
  166. if ps.PRS.Round == round {
  167. switch votesType {
  168. case tmproto.PrevoteType:
  169. return ps.PRS.Prevotes
  170. case tmproto.PrecommitType:
  171. return ps.PRS.Precommits
  172. }
  173. }
  174. if ps.PRS.CatchupCommitRound == round {
  175. switch votesType {
  176. case tmproto.PrevoteType:
  177. return nil
  178. case tmproto.PrecommitType:
  179. return ps.PRS.CatchupCommit
  180. }
  181. }
  182. if ps.PRS.ProposalPOLRound == round {
  183. switch votesType {
  184. case tmproto.PrevoteType:
  185. return ps.PRS.ProposalPOL
  186. case tmproto.PrecommitType:
  187. return nil
  188. }
  189. }
  190. return nil
  191. }
  192. if ps.PRS.Height == height+1 {
  193. if ps.PRS.LastCommitRound == round {
  194. switch votesType {
  195. case tmproto.PrevoteType:
  196. return nil
  197. case tmproto.PrecommitType:
  198. return ps.PRS.LastCommit
  199. }
  200. }
  201. return nil
  202. }
  203. return nil
  204. }
  205. // 'round': A round for which we have a +2/3 commit.
  206. func (ps *PeerState) ensureCatchupCommitRound(height int64, round int32, numValidators int) {
  207. if ps.PRS.Height != height {
  208. return
  209. }
  210. /*
  211. NOTE: This is wrong, 'round' could change.
  212. e.g. if orig round is not the same as block LastCommit round.
  213. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
  214. panic(fmt.Sprintf(
  215. "Conflicting CatchupCommitRound. Height: %v,
  216. Orig: %v,
  217. New: %v",
  218. height,
  219. ps.CatchupCommitRound,
  220. round))
  221. }
  222. */
  223. if ps.PRS.CatchupCommitRound == round {
  224. return // Nothing to do!
  225. }
  226. ps.PRS.CatchupCommitRound = round
  227. if round == ps.PRS.Round {
  228. ps.PRS.CatchupCommit = ps.PRS.Precommits
  229. } else {
  230. ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
  231. }
  232. }
  233. // EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
  234. // what votes this peer has received.
  235. // NOTE: It's important to make sure that numValidators actually matches
  236. // what the node sees as the number of validators for height.
  237. func (ps *PeerState) EnsureVoteBitArrays(height int64, numValidators int) {
  238. ps.mtx.Lock()
  239. defer ps.mtx.Unlock()
  240. ps.ensureVoteBitArrays(height, numValidators)
  241. }
  242. func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
  243. if ps.PRS.Height == height {
  244. if ps.PRS.Prevotes == nil {
  245. ps.PRS.Prevotes = bits.NewBitArray(numValidators)
  246. }
  247. if ps.PRS.Precommits == nil {
  248. ps.PRS.Precommits = bits.NewBitArray(numValidators)
  249. }
  250. if ps.PRS.CatchupCommit == nil {
  251. ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
  252. }
  253. if ps.PRS.ProposalPOL == nil {
  254. ps.PRS.ProposalPOL = bits.NewBitArray(numValidators)
  255. }
  256. } else if ps.PRS.Height == height+1 {
  257. if ps.PRS.LastCommit == nil {
  258. ps.PRS.LastCommit = bits.NewBitArray(numValidators)
  259. }
  260. }
  261. }
  262. // RecordVote increments internal votes related statistics for this peer.
  263. // It returns the total number of added votes.
  264. func (ps *PeerState) RecordVote() int {
  265. ps.mtx.Lock()
  266. defer ps.mtx.Unlock()
  267. ps.Stats.Votes++
  268. return ps.Stats.Votes
  269. }
  270. // VotesSent returns the number of blocks for which peer has been sending us
  271. // votes.
  272. func (ps *PeerState) VotesSent() int {
  273. ps.mtx.Lock()
  274. defer ps.mtx.Unlock()
  275. return ps.Stats.Votes
  276. }
  277. // RecordBlockPart increments internal block part related statistics for this peer.
  278. // It returns the total number of added block parts.
  279. func (ps *PeerState) RecordBlockPart() int {
  280. ps.mtx.Lock()
  281. defer ps.mtx.Unlock()
  282. ps.Stats.BlockParts++
  283. return ps.Stats.BlockParts
  284. }
  285. // BlockPartsSent returns the number of useful block parts the peer has sent us.
  286. func (ps *PeerState) BlockPartsSent() int {
  287. ps.mtx.Lock()
  288. defer ps.mtx.Unlock()
  289. return ps.Stats.BlockParts
  290. }
  291. // SetHasVote sets the given vote as known by the peer
  292. func (ps *PeerState) SetHasVote(vote *types.Vote) {
  293. if vote == nil {
  294. return
  295. }
  296. ps.mtx.Lock()
  297. defer ps.mtx.Unlock()
  298. ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
  299. }
  300. func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) {
  301. logger := ps.logger.With(
  302. "peerH/R", fmt.Sprintf("%d/%d", ps.PRS.Height, ps.PRS.Round),
  303. "H/R", fmt.Sprintf("%d/%d", height, round),
  304. )
  305. logger.Debug("setHasVote", "type", voteType, "index", index)
  306. // NOTE: some may be nil BitArrays -> no side effects
  307. psVotes := ps.getVoteBitArray(height, round, voteType)
  308. if psVotes != nil {
  309. psVotes.SetIndex(int(index), true)
  310. }
  311. }
  312. // ApplyNewRoundStepMessage updates the peer state for the new round.
  313. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
  314. ps.mtx.Lock()
  315. defer ps.mtx.Unlock()
  316. // ignore duplicates or decreases
  317. if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 {
  318. return
  319. }
  320. var (
  321. psHeight = ps.PRS.Height
  322. psRound = ps.PRS.Round
  323. psCatchupCommitRound = ps.PRS.CatchupCommitRound
  324. psCatchupCommit = ps.PRS.CatchupCommit
  325. startTime = tmtime.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  326. )
  327. ps.PRS.Height = msg.Height
  328. ps.PRS.Round = msg.Round
  329. ps.PRS.Step = msg.Step
  330. ps.PRS.StartTime = startTime
  331. if psHeight != msg.Height || psRound != msg.Round {
  332. ps.PRS.Proposal = false
  333. ps.PRS.ProposalBlockPartSetHeader = types.PartSetHeader{}
  334. ps.PRS.ProposalBlockParts = nil
  335. ps.PRS.ProposalPOLRound = -1
  336. ps.PRS.ProposalPOL = nil
  337. // we'll update the BitArray capacity later
  338. ps.PRS.Prevotes = nil
  339. ps.PRS.Precommits = nil
  340. }
  341. if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
  342. // Peer caught up to CatchupCommitRound.
  343. // Preserve psCatchupCommit!
  344. // NOTE: We prefer to use prs.Precommits if
  345. // pr.Round matches pr.CatchupCommitRound.
  346. ps.PRS.Precommits = psCatchupCommit
  347. }
  348. if psHeight != msg.Height {
  349. // shift Precommits to LastCommit
  350. if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
  351. ps.PRS.LastCommitRound = msg.LastCommitRound
  352. ps.PRS.LastCommit = ps.PRS.Precommits
  353. } else {
  354. ps.PRS.LastCommitRound = msg.LastCommitRound
  355. ps.PRS.LastCommit = nil
  356. }
  357. // we'll update the BitArray capacity later
  358. ps.PRS.CatchupCommitRound = -1
  359. ps.PRS.CatchupCommit = nil
  360. }
  361. }
  362. // ApplyNewValidBlockMessage updates the peer state for the new valid block.
  363. func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
  364. ps.mtx.Lock()
  365. defer ps.mtx.Unlock()
  366. if ps.PRS.Height != msg.Height {
  367. return
  368. }
  369. if ps.PRS.Round != msg.Round && !msg.IsCommit {
  370. return
  371. }
  372. ps.PRS.ProposalBlockPartSetHeader = msg.BlockPartSetHeader
  373. ps.PRS.ProposalBlockParts = msg.BlockParts
  374. }
  375. // ApplyProposalPOLMessage updates the peer state for the new proposal POL.
  376. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
  377. ps.mtx.Lock()
  378. defer ps.mtx.Unlock()
  379. if ps.PRS.Height != msg.Height {
  380. return
  381. }
  382. if ps.PRS.ProposalPOLRound != msg.ProposalPOLRound {
  383. return
  384. }
  385. // TODO: Merge onto existing ps.PRS.ProposalPOL?
  386. // We might have sent some prevotes in the meantime.
  387. ps.PRS.ProposalPOL = msg.ProposalPOL
  388. }
  389. // ApplyHasVoteMessage updates the peer state for the new vote.
  390. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  391. ps.mtx.Lock()
  392. defer ps.mtx.Unlock()
  393. if ps.PRS.Height != msg.Height {
  394. return
  395. }
  396. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  397. }
  398. // ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
  399. // it claims to have for the corresponding BlockID.
  400. // `ourVotes` is a BitArray of votes we have for msg.BlockID
  401. // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
  402. // we conservatively overwrite ps's votes w/ msg.Votes.
  403. func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
  404. ps.mtx.Lock()
  405. defer ps.mtx.Unlock()
  406. votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
  407. if votes != nil {
  408. if ourVotes == nil {
  409. votes.Update(msg.Votes)
  410. } else {
  411. otherVotes := votes.Sub(ourVotes)
  412. hasVotes := otherVotes.Or(msg.Votes)
  413. votes.Update(hasVotes)
  414. }
  415. }
  416. }
  417. // String returns a string representation of the PeerState
  418. func (ps *PeerState) String() string {
  419. return ps.StringIndented("")
  420. }
  421. // StringIndented returns a string representation of the PeerState
  422. func (ps *PeerState) StringIndented(indent string) string {
  423. ps.mtx.Lock()
  424. defer ps.mtx.Unlock()
  425. return fmt.Sprintf(`PeerState{
  426. %s Key %v
  427. %s RoundState %v
  428. %s Stats %v
  429. %s}`,
  430. indent, ps.peerID,
  431. indent, ps.PRS.StringIndented(indent+" "),
  432. indent, ps.Stats,
  433. indent,
  434. )
  435. }