You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

156 lines
4.3 KiB

  1. package consensus
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "time"
  10. auto "github.com/tendermint/go-autofile"
  11. . "github.com/tendermint/go-common"
  12. "github.com/tendermint/go-wire"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. // Unmarshal and apply a single message to the consensus state
  16. // as if it were received in receiveRoutine
  17. // Lines that start with "#" are ignored.
  18. // NOTE: receiveRoutine should not be running
  19. func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
  20. // Skip over empty and meta lines
  21. if len(msgBytes) == 0 || msgBytes[0] == '#' {
  22. return nil
  23. }
  24. var err error
  25. var msg TimedWALMessage
  26. wire.ReadJSON(&msg, msgBytes, &err)
  27. if err != nil {
  28. fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
  29. return fmt.Errorf("Error reading json data: %v", err)
  30. }
  31. // for logging
  32. switch m := msg.Msg.(type) {
  33. case types.EventDataRoundState:
  34. log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
  35. // these are playback checks
  36. ticker := time.After(time.Second * 2)
  37. if newStepCh != nil {
  38. select {
  39. case mi := <-newStepCh:
  40. m2 := mi.(types.EventDataRoundState)
  41. if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
  42. return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
  43. }
  44. case <-ticker:
  45. return fmt.Errorf("Failed to read off newStepCh")
  46. }
  47. }
  48. case msgInfo:
  49. peerKey := m.PeerKey
  50. if peerKey == "" {
  51. peerKey = "local"
  52. }
  53. switch msg := m.Msg.(type) {
  54. case *ProposalMessage:
  55. p := msg.Proposal
  56. log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
  57. p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
  58. case *BlockPartMessage:
  59. log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
  60. case *VoteMessage:
  61. v := msg.Vote
  62. log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
  63. "blockID", v.BlockID, "peer", peerKey)
  64. }
  65. cs.handleMsg(m, cs.RoundState)
  66. case timeoutInfo:
  67. log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
  68. cs.handleTimeout(m, cs.RoundState)
  69. default:
  70. return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
  71. }
  72. return nil
  73. }
  74. // replay only those messages since the last block.
  75. // timeoutRoutine should run concurrently to read off tickChan
  76. func (cs *ConsensusState) catchupReplay(csHeight int) error {
  77. // set replayMode
  78. cs.replayMode = true
  79. defer func() { cs.replayMode = false }()
  80. // Ensure that height+1 doesn't exist
  81. gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1))
  82. if found {
  83. return errors.New(Fmt("WAL should not contain height %d.", csHeight+1))
  84. }
  85. if gr != nil {
  86. gr.Close()
  87. }
  88. // Search for height marker
  89. gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight))
  90. if err == io.EOF {
  91. log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight)
  92. return nil
  93. } else if err != nil {
  94. return err
  95. }
  96. if !found {
  97. return errors.New(Fmt("WAL does not contain height %d.", csHeight))
  98. }
  99. defer gr.Close()
  100. log.Notice("Catchup by replaying consensus messages", "height", csHeight)
  101. for {
  102. line, err := gr.ReadLine()
  103. if err != nil {
  104. if err == io.EOF {
  105. break
  106. } else {
  107. return err
  108. }
  109. }
  110. // NOTE: since the priv key is set when the msgs are received
  111. // it will attempt to eg double sign but we can just ignore it
  112. // since the votes will be replayed and we'll get to the next step
  113. if err := cs.readReplayMessage([]byte(line), nil); err != nil {
  114. return err
  115. }
  116. }
  117. log.Notice("Replay: Done")
  118. return nil
  119. }
  120. //--------------------------------------------------------------------------------
  121. // Parses marker lines of the form:
  122. // #HEIGHT: 12345
  123. func makeHeightSearchFunc(height int) auto.SearchFunc {
  124. return func(line string) (int, error) {
  125. line = strings.TrimRight(line, "\n")
  126. parts := strings.Split(line, " ")
  127. if len(parts) != 2 {
  128. return -1, errors.New("Line did not have 2 parts")
  129. }
  130. i, err := strconv.Atoi(parts[1])
  131. if err != nil {
  132. return -1, errors.New("Failed to parse INFO: " + err.Error())
  133. }
  134. if height < i {
  135. return 1, nil
  136. } else if height == i {
  137. return 0, nil
  138. } else {
  139. return -1, nil
  140. }
  141. }
  142. }