You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

779 lines
20 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. . "github.com/tendermint/tendermint/binary"
  11. . "github.com/tendermint/tendermint/blocks"
  12. . "github.com/tendermint/tendermint/common"
  13. "github.com/tendermint/tendermint/mempool"
  14. "github.com/tendermint/tendermint/p2p"
  15. "github.com/tendermint/tendermint/state"
  16. )
  17. const (
  18. StateCh = byte(0x20)
  19. DataCh = byte(0x21)
  20. VoteCh = byte(0x22)
  21. peerStateKey = "ConsensusReactor.peerState"
  22. peerGossipSleepDuration = 50 * time.Millisecond // Time to sleep if there's nothing to send.
  23. )
  24. //-----------------------------------------------------------------------------
  25. type ConsensusReactor struct {
  26. sw *p2p.Switch
  27. started uint32
  28. stopped uint32
  29. quit chan struct{}
  30. conS *ConsensusState
  31. }
  32. func NewConsensusReactor(blockStore *BlockStore, mempool *mempool.Mempool, state *state.State) *ConsensusReactor {
  33. conS := NewConsensusState(state, blockStore, mempool)
  34. conR := &ConsensusReactor{
  35. quit: make(chan struct{}),
  36. conS: conS,
  37. }
  38. return conR
  39. }
  40. // Implements Reactor
  41. func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
  42. if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
  43. log.Info("Starting ConsensusReactor")
  44. conR.sw = sw
  45. conR.conS.Start()
  46. go conR.broadcastNewRoundStepRoutine()
  47. }
  48. }
  49. // Implements Reactor
  50. func (conR *ConsensusReactor) Stop() {
  51. if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
  52. log.Info("Stopping ConsensusReactor")
  53. conR.conS.Stop()
  54. close(conR.quit)
  55. }
  56. }
  57. func (conR *ConsensusReactor) IsStopped() bool {
  58. return atomic.LoadUint32(&conR.stopped) == 1
  59. }
  60. // Implements Reactor
  61. func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
  62. // TODO optimize
  63. return []*p2p.ChannelDescriptor{
  64. &p2p.ChannelDescriptor{
  65. Id: StateCh,
  66. Priority: 5,
  67. },
  68. &p2p.ChannelDescriptor{
  69. Id: DataCh,
  70. Priority: 5,
  71. },
  72. &p2p.ChannelDescriptor{
  73. Id: VoteCh,
  74. Priority: 5,
  75. },
  76. }
  77. }
  78. // Implements Reactor
  79. func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
  80. // Create peerState for peer
  81. peerState := NewPeerState(peer)
  82. peer.Data.Set(peerStateKey, peerState)
  83. // Begin gossip routines for this peer.
  84. go conR.gossipDataRoutine(peer, peerState)
  85. go conR.gossipVotesRoutine(peer, peerState)
  86. }
  87. // Implements Reactor
  88. func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  89. //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
  90. }
  91. // Implements Reactor
  92. func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
  93. // Get round state
  94. rs := conR.conS.GetRoundState()
  95. ps := peer.Data.Get(peerStateKey).(*PeerState)
  96. _, msg_ := decodeMessage(msgBytes)
  97. var err error = nil
  98. log.Debug("[%X][%v] Receive: %v", chId, peer, msg_)
  99. switch chId {
  100. case StateCh:
  101. switch msg_.(type) {
  102. case *NewRoundStepMessage:
  103. msg := msg_.(*NewRoundStepMessage)
  104. ps.ApplyNewRoundStepMessage(msg, rs)
  105. case *CommitMessage:
  106. msg := msg_.(*CommitMessage)
  107. ps.ApplyCommitMessage(msg)
  108. case *HasVoteMessage:
  109. msg := msg_.(*HasVoteMessage)
  110. ps.ApplyHasVoteMessage(msg)
  111. default:
  112. // Ignore unknown message
  113. }
  114. case DataCh:
  115. switch msg_.(type) {
  116. case *Proposal:
  117. proposal := msg_.(*Proposal)
  118. ps.SetHasProposal(proposal)
  119. err = conR.conS.SetProposal(proposal)
  120. case *PartMessage:
  121. msg := msg_.(*PartMessage)
  122. if msg.Type == partTypeProposalBlock {
  123. ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
  124. _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
  125. } else if msg.Type == partTypeProposalPOL {
  126. ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
  127. _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
  128. } else {
  129. // Ignore unknown part type
  130. }
  131. default:
  132. // Ignore unknown message
  133. }
  134. case VoteCh:
  135. switch msg_.(type) {
  136. case *Vote:
  137. vote := msg_.(*Vote)
  138. added, index, err := conR.conS.AddVote(vote)
  139. if err != nil {
  140. log.Warning("Error attempting to add vote: %v", err)
  141. }
  142. // Initialize Prevotes/Precommits/Commits if needed
  143. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  144. ps.SetHasVote(vote, index)
  145. if added {
  146. msg := &HasVoteMessage{
  147. Height: vote.Height,
  148. Round: vote.Round,
  149. Type: vote.Type,
  150. Index: index,
  151. }
  152. conR.sw.Broadcast(StateCh, msg)
  153. }
  154. default:
  155. // Ignore unknown message
  156. }
  157. default:
  158. // Ignore unknown channel
  159. }
  160. if err != nil {
  161. log.Warning("Error in Receive(): %v", err)
  162. }
  163. }
  164. // Sets our private validator account for signing votes.
  165. func (conR *ConsensusReactor) SetPrivValidator(priv *PrivValidator) {
  166. conR.conS.SetPrivValidator(priv)
  167. }
  168. //--------------------------------------
  169. // Listens for changes to the ConsensusState.Step by pulling
  170. // on conR.conS.NewStepCh().
  171. func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
  172. for {
  173. // Get RoundState with new Step or quit.
  174. var rs *RoundState
  175. select {
  176. case rs = <-conR.conS.NewStepCh():
  177. case <-conR.quit:
  178. return
  179. }
  180. // Get seconds since beginning of height.
  181. // Due to the condition documented, this is safe.
  182. timeElapsed := rs.StartTime.Sub(time.Now())
  183. // Broadcast NewRoundStepMessage
  184. {
  185. msg := &NewRoundStepMessage{
  186. Height: rs.Height,
  187. Round: rs.Round,
  188. Step: rs.Step,
  189. SecondsSinceStartTime: uint32(timeElapsed.Seconds()),
  190. }
  191. conR.sw.Broadcast(StateCh, msg)
  192. }
  193. // If the step is commit, then also broadcast a CommitMessage.
  194. if rs.Step == RoundStepCommit {
  195. msg := &CommitMessage{
  196. Height: rs.Height,
  197. BlockParts: rs.ProposalBlockParts.Header(),
  198. BlockBitArray: rs.ProposalBlockParts.BitArray(),
  199. }
  200. conR.sw.Broadcast(StateCh, msg)
  201. }
  202. }
  203. }
  204. func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
  205. OUTER_LOOP:
  206. for {
  207. // Manage disconnects from self or peer.
  208. if peer.IsStopped() || conR.IsStopped() {
  209. log.Info("Stopping gossipDataRoutine for %v.", peer)
  210. return
  211. }
  212. rs := conR.conS.GetRoundState()
  213. prs := ps.GetRoundState()
  214. // Send proposal Block parts?
  215. // NOTE: if we or peer is at RoundStepCommit*, the round
  216. // won't necessarily match, but that's OK.
  217. if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
  218. if index, ok := rs.ProposalBlockParts.BitArray().Sub(
  219. prs.ProposalBlockBitArray).PickRandom(); ok {
  220. msg := &PartMessage{
  221. Height: rs.Height,
  222. Round: rs.Round,
  223. Type: partTypeProposalBlock,
  224. Part: rs.ProposalBlockParts.GetPart(uint16(index)),
  225. }
  226. peer.Send(DataCh, msg)
  227. ps.SetHasProposalBlockPart(rs.Height, rs.Round, uint16(index))
  228. continue OUTER_LOOP
  229. }
  230. }
  231. // If height and round doesn't match, sleep.
  232. if rs.Height != prs.Height || rs.Round != prs.Round {
  233. time.Sleep(peerGossipSleepDuration)
  234. continue OUTER_LOOP
  235. }
  236. // Send proposal?
  237. if rs.Proposal != nil && !prs.Proposal {
  238. msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
  239. peer.Send(DataCh, msg)
  240. ps.SetHasProposal(rs.Proposal)
  241. continue OUTER_LOOP
  242. }
  243. // Send proposal POL parts?
  244. if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
  245. if index, ok := rs.ProposalPOLParts.BitArray().Sub(
  246. prs.ProposalPOLBitArray).PickRandom(); ok {
  247. msg := &PartMessage{
  248. Height: rs.Height,
  249. Round: rs.Round,
  250. Type: partTypeProposalPOL,
  251. Part: rs.ProposalPOLParts.GetPart(uint16(index)),
  252. }
  253. peer.Send(DataCh, msg)
  254. ps.SetHasProposalPOLPart(rs.Height, rs.Round, uint16(index))
  255. continue OUTER_LOOP
  256. }
  257. }
  258. // Nothing to do. Sleep.
  259. time.Sleep(peerGossipSleepDuration)
  260. continue OUTER_LOOP
  261. }
  262. }
  263. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
  264. OUTER_LOOP:
  265. for {
  266. // Manage disconnects from self or peer.
  267. if peer.IsStopped() || conR.IsStopped() {
  268. log.Info("Stopping gossipVotesRoutine for %v.", peer)
  269. return
  270. }
  271. rs := conR.conS.GetRoundState()
  272. prs := ps.GetRoundState()
  273. trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
  274. // TODO: give priority to our vote.
  275. index, ok := voteSet.BitArray().Sub(peerVoteSet).PickRandom()
  276. if ok {
  277. vote := voteSet.GetByIndex(index)
  278. // NOTE: vote may be a commit.
  279. msg := p2p.TypedMessage{msgTypeVote, vote}
  280. peer.Send(VoteCh, msg)
  281. ps.SetHasVote(vote, index)
  282. return true
  283. }
  284. return false
  285. }
  286. // If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
  287. if rs.Height == prs.Height {
  288. // If there are lastcommits to send...
  289. if prs.Round == 0 && prs.Step == RoundStepNewHeight {
  290. if prs.LastCommits.Size() == rs.LastCommits.Size() {
  291. if trySendVote(rs.LastCommits, prs.LastCommits) {
  292. continue OUTER_LOOP
  293. }
  294. }
  295. }
  296. // Initialize Prevotes/Precommits/Commits if needed
  297. ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
  298. // If there are prevotes to send...
  299. if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
  300. if trySendVote(rs.Prevotes, prs.Prevotes) {
  301. continue OUTER_LOOP
  302. }
  303. }
  304. // If there are precommits to send...
  305. if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
  306. if trySendVote(rs.Precommits, prs.Precommits) {
  307. continue OUTER_LOOP
  308. }
  309. }
  310. // If there are any commits to send...
  311. if trySendVote(rs.Commits, prs.Commits) {
  312. continue OUTER_LOOP
  313. }
  314. }
  315. // If peer is lagging by height 1, match our LastCommits to peer's Commits.
  316. if rs.Height == prs.Height+1 {
  317. // Initialize Commits if needed
  318. ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommits.Size())
  319. // If there are lastcommits to send...
  320. if trySendVote(rs.LastCommits, prs.Commits) {
  321. continue OUTER_LOOP
  322. }
  323. }
  324. // If peer is lagging by more than 1, load and send Validation and send Commits.
  325. if rs.Height >= prs.Height+2 {
  326. // Load the block header and validation for prs.Height+1,
  327. // which contains commit signatures for prs.Height.
  328. header, validation := conR.conS.LoadHeaderValidation(prs.Height + 1)
  329. size := uint(len(validation.Commits))
  330. // Initialize Commits if needed
  331. ps.EnsureVoteBitArrays(prs.Height, size)
  332. index, ok := validation.BitArray().Sub(prs.Commits).PickRandom()
  333. if ok {
  334. rsig := validation.Commits[index]
  335. // Reconstruct vote.
  336. vote := &Vote{
  337. Height: prs.Height,
  338. Round: rsig.Round,
  339. Type: VoteTypeCommit,
  340. BlockHash: header.LastBlockHash,
  341. BlockParts: header.LastBlockParts,
  342. Signature: rsig.Signature,
  343. }
  344. msg := p2p.TypedMessage{msgTypeVote, vote}
  345. peer.Send(VoteCh, msg)
  346. ps.SetHasVote(vote, index)
  347. continue OUTER_LOOP
  348. }
  349. }
  350. // We sent nothing. Sleep...
  351. time.Sleep(peerGossipSleepDuration)
  352. continue OUTER_LOOP
  353. }
  354. }
  355. //-----------------------------------------------------------------------------
  356. // Read only when returned by PeerState.GetRoundState().
  357. type PeerRoundState struct {
  358. Height uint32 // Height peer is at
  359. Round uint16 // Round peer is at
  360. Step RoundStep // Step peer is at
  361. StartTime time.Time // Estimated start of round 0 at this height
  362. Proposal bool // True if peer has proposal for this round
  363. ProposalBlockParts PartSetHeader //
  364. ProposalBlockBitArray BitArray // True bit -> has part
  365. ProposalPOLParts PartSetHeader //
  366. ProposalPOLBitArray BitArray // True bit -> has part
  367. Prevotes BitArray // All votes peer has for this round
  368. Precommits BitArray // All precommits peer has for this round
  369. Commits BitArray // All commits peer has for this height
  370. LastCommits BitArray // All commits peer has for last height
  371. }
  372. //-----------------------------------------------------------------------------
  373. var (
  374. ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
  375. ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
  376. )
  377. type PeerState struct {
  378. mtx sync.Mutex
  379. PeerRoundState
  380. }
  381. func NewPeerState(peer *p2p.Peer) *PeerState {
  382. return &PeerState{}
  383. }
  384. // Returns an atomic snapshot of the PeerRoundState.
  385. // There's no point in mutating it since it won't change PeerState.
  386. func (ps *PeerState) GetRoundState() *PeerRoundState {
  387. ps.mtx.Lock()
  388. defer ps.mtx.Unlock()
  389. prs := ps.PeerRoundState // copy
  390. return &prs
  391. }
  392. func (ps *PeerState) SetHasProposal(proposal *Proposal) {
  393. ps.mtx.Lock()
  394. defer ps.mtx.Unlock()
  395. if ps.Height != proposal.Height || ps.Round != proposal.Round {
  396. return
  397. }
  398. if ps.Proposal {
  399. return
  400. }
  401. ps.Proposal = true
  402. ps.ProposalBlockParts = proposal.BlockParts
  403. ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
  404. ps.ProposalPOLParts = proposal.POLParts
  405. ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
  406. }
  407. func (ps *PeerState) SetHasProposalBlockPart(height uint32, round uint16, index uint16) {
  408. ps.mtx.Lock()
  409. defer ps.mtx.Unlock()
  410. if ps.Height != height || ps.Round != round {
  411. return
  412. }
  413. ps.ProposalBlockBitArray.SetIndex(uint(index), true)
  414. }
  415. func (ps *PeerState) SetHasProposalPOLPart(height uint32, round uint16, index uint16) {
  416. ps.mtx.Lock()
  417. defer ps.mtx.Unlock()
  418. if ps.Height != height || ps.Round != round {
  419. return
  420. }
  421. ps.ProposalPOLBitArray.SetIndex(uint(index), true)
  422. }
  423. func (ps *PeerState) EnsureVoteBitArrays(height uint32, numValidators uint) {
  424. ps.mtx.Lock()
  425. defer ps.mtx.Unlock()
  426. if ps.Height != height {
  427. return
  428. }
  429. if ps.Prevotes.IsZero() {
  430. ps.Prevotes = NewBitArray(numValidators)
  431. }
  432. if ps.Precommits.IsZero() {
  433. ps.Precommits = NewBitArray(numValidators)
  434. }
  435. if ps.Commits.IsZero() {
  436. ps.Commits = NewBitArray(numValidators)
  437. }
  438. }
  439. func (ps *PeerState) SetHasVote(vote *Vote, index uint) {
  440. ps.mtx.Lock()
  441. defer ps.mtx.Unlock()
  442. ps.setHasVote(vote.Height, vote.Round, vote.Type, index)
  443. }
  444. func (ps *PeerState) setHasVote(height uint32, round uint16, type_ byte, index uint) {
  445. if ps.Height == height+1 && type_ == VoteTypeCommit {
  446. // Special case for LastCommits.
  447. ps.LastCommits.SetIndex(index, true)
  448. return
  449. } else if ps.Height != height {
  450. // Does not apply.
  451. return
  452. }
  453. switch type_ {
  454. case VoteTypePrevote:
  455. ps.Prevotes.SetIndex(index, true)
  456. case VoteTypePrecommit:
  457. ps.Precommits.SetIndex(index, true)
  458. case VoteTypeCommit:
  459. if round < ps.Round {
  460. ps.Prevotes.SetIndex(index, true)
  461. ps.Precommits.SetIndex(index, true)
  462. }
  463. ps.Commits.SetIndex(index, true)
  464. default:
  465. panic("Invalid vote type")
  466. }
  467. }
  468. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
  469. ps.mtx.Lock()
  470. defer ps.mtx.Unlock()
  471. // Just remember these values.
  472. psHeight := ps.Height
  473. psRound := ps.Round
  474. //psStep := ps.Step
  475. startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
  476. ps.Height = msg.Height
  477. ps.Round = msg.Round
  478. ps.Step = msg.Step
  479. ps.StartTime = startTime
  480. if psHeight != msg.Height || psRound != msg.Round {
  481. ps.Proposal = false
  482. ps.ProposalBlockParts = PartSetHeader{}
  483. ps.ProposalBlockBitArray = BitArray{}
  484. ps.ProposalPOLParts = PartSetHeader{}
  485. ps.ProposalPOLBitArray = BitArray{}
  486. // We'll update the BitArray capacity later.
  487. ps.Prevotes = BitArray{}
  488. ps.Precommits = BitArray{}
  489. }
  490. if psHeight != msg.Height {
  491. // Shift Commits to LastCommits
  492. if psHeight+1 == msg.Height {
  493. ps.LastCommits = ps.Commits
  494. } else {
  495. ps.LastCommits = BitArray{}
  496. }
  497. // We'll update the BitArray capacity later.
  498. ps.Commits = BitArray{}
  499. }
  500. }
  501. func (ps *PeerState) ApplyCommitMessage(msg *CommitMessage) {
  502. ps.mtx.Lock()
  503. defer ps.mtx.Unlock()
  504. if ps.Height != msg.Height {
  505. return
  506. }
  507. ps.ProposalBlockParts = msg.BlockParts
  508. ps.ProposalBlockBitArray = msg.BlockBitArray
  509. }
  510. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
  511. ps.mtx.Lock()
  512. defer ps.mtx.Unlock()
  513. // Special case for LastCommits
  514. if ps.Height == msg.Height+1 && msg.Type == VoteTypeCommit {
  515. ps.LastCommits.SetIndex(msg.Index, true)
  516. return
  517. } else if ps.Height != msg.Height {
  518. return
  519. }
  520. ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
  521. }
  522. //-----------------------------------------------------------------------------
  523. // Messages
  524. const (
  525. msgTypeUnknown = byte(0x00)
  526. // Messages for communicating state changes
  527. msgTypeNewRoundStep = byte(0x01)
  528. msgTypeCommit = byte(0x02)
  529. // Messages of data
  530. msgTypeProposal = byte(0x11)
  531. msgTypePart = byte(0x12) // both block & POL
  532. msgTypeVote = byte(0x13)
  533. msgTypeHasVote = byte(0x14)
  534. )
  535. // TODO: check for unnecessary extra bytes at the end.
  536. func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
  537. n, err := new(int64), new(error)
  538. // log.Debug("decoding msg bytes: %X", bz)
  539. msgType = bz[0]
  540. r := bytes.NewReader(bz[1:])
  541. switch msgType {
  542. // Messages for communicating state changes
  543. case msgTypeNewRoundStep:
  544. msg = readNewRoundStepMessage(r, n, err)
  545. case msgTypeCommit:
  546. msg = readCommitMessage(r, n, err)
  547. // Messages of data
  548. case msgTypeProposal:
  549. msg = ReadProposal(r, n, err)
  550. case msgTypePart:
  551. msg = readPartMessage(r, n, err)
  552. case msgTypeVote:
  553. msg = ReadVote(r, n, err)
  554. case msgTypeHasVote:
  555. msg = readHasVoteMessage(r, n, err)
  556. default:
  557. msg = nil
  558. }
  559. return
  560. }
  561. //-------------------------------------
  562. type NewRoundStepMessage struct {
  563. Height uint32
  564. Round uint16
  565. Step RoundStep
  566. SecondsSinceStartTime uint32
  567. }
  568. func readNewRoundStepMessage(r io.Reader, n *int64, err *error) *NewRoundStepMessage {
  569. return &NewRoundStepMessage{
  570. Height: ReadUInt32(r, n, err),
  571. Round: ReadUInt16(r, n, err),
  572. Step: RoundStep(ReadUInt8(r, n, err)),
  573. SecondsSinceStartTime: ReadUInt32(r, n, err),
  574. }
  575. }
  576. func (m *NewRoundStepMessage) WriteTo(w io.Writer) (n int64, err error) {
  577. WriteByte(w, msgTypeNewRoundStep, &n, &err)
  578. WriteUInt32(w, m.Height, &n, &err)
  579. WriteUInt16(w, m.Round, &n, &err)
  580. WriteUInt8(w, uint8(m.Step), &n, &err)
  581. WriteUInt32(w, m.SecondsSinceStartTime, &n, &err)
  582. return
  583. }
  584. func (m *NewRoundStepMessage) String() string {
  585. return fmt.Sprintf("[NewRoundStep %v/%v/%X]", m.Height, m.Round, m.Step)
  586. }
  587. //-------------------------------------
  588. type CommitMessage struct {
  589. Height uint32
  590. BlockParts PartSetHeader
  591. BlockBitArray BitArray
  592. }
  593. func readCommitMessage(r io.Reader, n *int64, err *error) *CommitMessage {
  594. return &CommitMessage{
  595. Height: ReadUInt32(r, n, err),
  596. BlockParts: ReadPartSetHeader(r, n, err),
  597. BlockBitArray: ReadBitArray(r, n, err),
  598. }
  599. }
  600. func (m *CommitMessage) WriteTo(w io.Writer) (n int64, err error) {
  601. WriteByte(w, msgTypeCommit, &n, &err)
  602. WriteUInt32(w, m.Height, &n, &err)
  603. WriteBinary(w, m.BlockParts, &n, &err)
  604. WriteBinary(w, m.BlockBitArray, &n, &err)
  605. return
  606. }
  607. func (m *CommitMessage) String() string {
  608. return fmt.Sprintf("[Commit %v/%v/%v]", m.Height, m.BlockParts, m.BlockBitArray)
  609. }
  610. //-------------------------------------
  611. const (
  612. partTypeProposalBlock = byte(0x01)
  613. partTypeProposalPOL = byte(0x02)
  614. )
  615. type PartMessage struct {
  616. Height uint32
  617. Round uint16
  618. Type byte
  619. Part *Part
  620. }
  621. func readPartMessage(r io.Reader, n *int64, err *error) *PartMessage {
  622. return &PartMessage{
  623. Height: ReadUInt32(r, n, err),
  624. Round: ReadUInt16(r, n, err),
  625. Type: ReadByte(r, n, err),
  626. Part: ReadPart(r, n, err),
  627. }
  628. }
  629. func (m *PartMessage) WriteTo(w io.Writer) (n int64, err error) {
  630. WriteByte(w, msgTypePart, &n, &err)
  631. WriteUInt32(w, m.Height, &n, &err)
  632. WriteUInt16(w, m.Round, &n, &err)
  633. WriteByte(w, m.Type, &n, &err)
  634. WriteBinary(w, m.Part, &n, &err)
  635. return
  636. }
  637. func (m *PartMessage) String() string {
  638. return fmt.Sprintf("[PartMessage H:%v R:%v T:%X]", m.Height, m.Round, m.Type)
  639. }
  640. //-------------------------------------
  641. type HasVoteMessage struct {
  642. Height uint32
  643. Round uint16
  644. Type byte
  645. Index uint
  646. }
  647. func readHasVoteMessage(r io.Reader, n *int64, err *error) *HasVoteMessage {
  648. return &HasVoteMessage{
  649. Height: ReadUInt32(r, n, err),
  650. Round: ReadUInt16(r, n, err),
  651. Type: ReadByte(r, n, err),
  652. Index: ReadUVarInt(r, n, err),
  653. }
  654. }
  655. func (m *HasVoteMessage) WriteTo(w io.Writer) (n int64, err error) {
  656. WriteByte(w, msgTypeHasVote, &n, &err)
  657. WriteUInt32(w, m.Height, &n, &err)
  658. WriteUInt16(w, m.Round, &n, &err)
  659. WriteByte(w, m.Type, &n, &err)
  660. WriteUVarInt(w, m.Index, &n, &err)
  661. return
  662. }
  663. func (m *HasVoteMessage) String() string {
  664. return fmt.Sprintf("[HasVoteMessage H:%v R:%v T:%X]", m.Height, m.Round, m.Type)
  665. }