You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

378 lines
9.9 KiB

  1. package consensus
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "time"
  12. . "github.com/tendermint/go-common"
  13. "github.com/tendermint/go-wire"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. // unmarshal and apply a single message to the consensus state
  18. // as if it were received in receiveRoutine
  19. // NOTE: receiveRoutine should not be running
  20. func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
  21. var err error
  22. var msg ConsensusLogMessage
  23. wire.ReadJSON(&msg, msgBytes, &err)
  24. if err != nil {
  25. fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
  26. return fmt.Errorf("Error reading json data: %v", err)
  27. }
  28. // for logging
  29. switch m := msg.Msg.(type) {
  30. case types.EventDataRoundState:
  31. log.Notice("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
  32. // these are playback checks
  33. ticker := time.After(time.Second * 2)
  34. if newStepCh != nil {
  35. select {
  36. case mi := <-newStepCh:
  37. m2 := mi.(types.EventDataRoundState)
  38. if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
  39. return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
  40. }
  41. case <-ticker:
  42. return fmt.Errorf("Failed to read off newStepCh")
  43. }
  44. }
  45. case msgInfo:
  46. peerKey := m.PeerKey
  47. if peerKey == "" {
  48. peerKey = "local"
  49. }
  50. switch msg := m.Msg.(type) {
  51. case *ProposalMessage:
  52. p := msg.Proposal
  53. log.Notice("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
  54. p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
  55. case *BlockPartMessage:
  56. log.Notice("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
  57. case *VoteMessage:
  58. v := msg.Vote
  59. log.Notice("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
  60. "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey)
  61. }
  62. cs.handleMsg(m, cs.RoundState)
  63. case timeoutInfo:
  64. log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
  65. cs.handleTimeout(m, cs.RoundState)
  66. default:
  67. return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
  68. }
  69. return nil
  70. }
  71. // replay only those messages since the last block.
  72. // timeoutRoutine should run concurrently to read off tickChan
  73. func (cs *ConsensusState) catchupReplay(csHeight int) error {
  74. if !cs.wal.Exists() {
  75. return nil
  76. }
  77. // set replayMode
  78. cs.replayMode = true
  79. defer func() { cs.replayMode = false }()
  80. // starting from end of file,
  81. // read messages until a new height is found
  82. var walHeight int
  83. nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {
  84. var err error
  85. var msg ConsensusLogMessage
  86. wire.ReadJSON(&msg, lineBytes, &err)
  87. if err != nil {
  88. panic(Fmt("Failed to read cs_msg_log json: %v", err))
  89. }
  90. m, ok := msg.Msg.(types.EventDataRoundState)
  91. walHeight = m.Height
  92. if ok && m.Step == RoundStepNewHeight.String() {
  93. return true
  94. }
  95. return false
  96. })
  97. if err != nil {
  98. return err
  99. }
  100. // ensure the height matches
  101. if walHeight != csHeight {
  102. var err error
  103. if walHeight > csHeight {
  104. err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight))
  105. } else {
  106. log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight)
  107. }
  108. return err
  109. }
  110. var beginning bool // if we had to go back to the beginning
  111. if c, _ := cs.wal.fp.Seek(0, 1); c == 0 {
  112. beginning = true
  113. }
  114. log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight)
  115. // now we can replay the latest nLines on consensus state
  116. // note we can't use scan because we've already been reading from the file
  117. // XXX: if a msg is too big we need to find out why or increase this for that case ...
  118. maxMsgSize := 1000000
  119. reader := bufio.NewReaderSize(cs.wal.fp, maxMsgSize)
  120. for i := 0; i < nLines; i++ {
  121. msgBytes, err := reader.ReadBytes('\n')
  122. if err == io.EOF {
  123. log.Warn("Replay: EOF", "bytes", string(msgBytes))
  124. break
  125. } else if err != nil {
  126. return err
  127. } else if len(msgBytes) == 0 {
  128. log.Warn("Replay: msg bytes is 0")
  129. continue
  130. } else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
  131. log.Warn("Replay: new line")
  132. continue
  133. }
  134. // the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it
  135. if !beginning && i == 1 {
  136. log.Warn("Replay: not beginning and 1")
  137. continue
  138. }
  139. // NOTE: since the priv key is set when the msgs are received
  140. // it will attempt to eg double sign but we can just ignore it
  141. // since the votes will be replayed and we'll get to the next step
  142. if err := cs.readReplayMessage(msgBytes, nil); err != nil {
  143. return err
  144. }
  145. }
  146. log.Notice("Replay: Done")
  147. return nil
  148. }
  149. //--------------------------------------------------------
  150. // replay messages interactively or all at once
  151. // Interactive playback
  152. func (cs ConsensusState) ReplayConsole(file string) error {
  153. return cs.replay(file, true)
  154. }
  155. // Full playback, with tests
  156. func (cs ConsensusState) ReplayMessages(file string) error {
  157. return cs.replay(file, false)
  158. }
  159. // replay all msgs or start the console
  160. func (cs *ConsensusState) replay(file string, console bool) error {
  161. if cs.IsRunning() {
  162. return errors.New("cs is already running, cannot replay")
  163. }
  164. if cs.wal != nil {
  165. return errors.New("cs wal is open, cannot replay")
  166. }
  167. cs.startForReplay()
  168. // ensure all new step events are regenerated as expected
  169. newStepCh := subscribeToEvent(cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
  170. // just open the file for reading, no need to use wal
  171. fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
  172. if err != nil {
  173. return err
  174. }
  175. pb := newPlayback(file, fp, cs, cs.state.Copy())
  176. defer pb.fp.Close()
  177. var nextN int // apply N msgs in a row
  178. for pb.scanner.Scan() {
  179. if nextN == 0 && console {
  180. nextN = pb.replayConsoleLoop()
  181. }
  182. if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
  183. return err
  184. }
  185. if nextN > 0 {
  186. nextN -= 1
  187. }
  188. pb.count += 1
  189. }
  190. return nil
  191. }
  192. //------------------------------------------------
  193. // playback manager
  194. type playback struct {
  195. cs *ConsensusState
  196. fp *os.File
  197. scanner *bufio.Scanner
  198. count int // how many lines/msgs into the file are we
  199. // replays can be reset to beginning
  200. fileName string // so we can close/reopen the file
  201. genesisState *sm.State // so the replay session knows where to restart from
  202. }
  203. func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback {
  204. return &playback{
  205. cs: cs,
  206. fp: fp,
  207. fileName: fileName,
  208. genesisState: genState,
  209. scanner: bufio.NewScanner(fp),
  210. }
  211. }
  212. // go back count steps by resetting the state and running (pb.count - count) steps
  213. func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
  214. pb.cs.Stop()
  215. newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool)
  216. newCS.SetEventSwitch(pb.cs.evsw)
  217. newCS.startForReplay()
  218. pb.fp.Close()
  219. fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666)
  220. if err != nil {
  221. return err
  222. }
  223. pb.fp = fp
  224. pb.scanner = bufio.NewScanner(fp)
  225. count = pb.count - count
  226. log.Notice(Fmt("Reseting from %d to %d", pb.count, count))
  227. pb.count = 0
  228. pb.cs = newCS
  229. for i := 0; pb.scanner.Scan() && i < count; i++ {
  230. if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
  231. return err
  232. }
  233. pb.count += 1
  234. }
  235. return nil
  236. }
  237. func (cs *ConsensusState) startForReplay() {
  238. // don't want to start full cs
  239. cs.BaseService.OnStart()
  240. // since we replay tocks we just ignore ticks
  241. go func() {
  242. for {
  243. select {
  244. case <-cs.tickChan:
  245. case <-cs.Quit:
  246. return
  247. }
  248. }
  249. }()
  250. }
  251. // console function for parsing input and running commands
  252. func (pb *playback) replayConsoleLoop() int {
  253. for {
  254. fmt.Printf("> ")
  255. bufReader := bufio.NewReader(os.Stdin)
  256. line, more, err := bufReader.ReadLine()
  257. if more {
  258. Exit("input is too long")
  259. } else if err != nil {
  260. Exit(err.Error())
  261. }
  262. tokens := strings.Split(string(line), " ")
  263. if len(tokens) == 0 {
  264. continue
  265. }
  266. switch tokens[0] {
  267. case "next":
  268. // "next" -> replay next message
  269. // "next N" -> replay next N messages
  270. if len(tokens) == 1 {
  271. return 0
  272. } else {
  273. i, err := strconv.Atoi(tokens[1])
  274. if err != nil {
  275. fmt.Println("next takes an integer argument")
  276. } else {
  277. return i
  278. }
  279. }
  280. case "back":
  281. // "back" -> go back one message
  282. // "back N" -> go back N messages
  283. // NOTE: "back" is not supported in the state machine design,
  284. // so we restart and replay up to
  285. // ensure all new step events are regenerated as expected
  286. newStepCh := subscribeToEvent(pb.cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
  287. if len(tokens) == 1 {
  288. pb.replayReset(1, newStepCh)
  289. } else {
  290. i, err := strconv.Atoi(tokens[1])
  291. if err != nil {
  292. fmt.Println("back takes an integer argument")
  293. } else if i > pb.count {
  294. fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
  295. } else {
  296. pb.replayReset(i, newStepCh)
  297. }
  298. }
  299. case "rs":
  300. // "rs" -> print entire round state
  301. // "rs short" -> print height/round/step
  302. // "rs <field>" -> print another field of the round state
  303. rs := pb.cs.RoundState
  304. if len(tokens) == 1 {
  305. fmt.Println(rs)
  306. } else {
  307. switch tokens[1] {
  308. case "short":
  309. fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step)
  310. case "validators":
  311. fmt.Println(rs.Validators)
  312. case "proposal":
  313. fmt.Println(rs.Proposal)
  314. case "proposal_block":
  315. fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort())
  316. case "locked_round":
  317. fmt.Println(rs.LockedRound)
  318. case "locked_block":
  319. fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort())
  320. case "votes":
  321. fmt.Println(rs.Votes.StringIndented(" "))
  322. default:
  323. fmt.Println("Unknown option", tokens[1])
  324. }
  325. }
  326. case "n":
  327. fmt.Println(pb.count)
  328. }
  329. }
  330. return 0
  331. }