You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

267 lines
7.0 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "strconv"
  8. "strings"
  9. bc "github.com/tendermint/tendermint/blockchain"
  10. cfg "github.com/tendermint/tendermint/config"
  11. "github.com/tendermint/tendermint/proxy"
  12. sm "github.com/tendermint/tendermint/state"
  13. "github.com/tendermint/tendermint/types"
  14. cmn "github.com/tendermint/tmlibs/common"
  15. dbm "github.com/tendermint/tmlibs/db"
  16. )
  17. //--------------------------------------------------------
  18. // replay messages interactively or all at once
  19. func RunReplayFile(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig, console bool) {
  20. consensusState := newConsensusStateForReplay(config, csConfig)
  21. if err := consensusState.ReplayFile(csConfig.WalFile(), console); err != nil {
  22. cmn.Exit(cmn.Fmt("Error during consensus replay: %v", err))
  23. }
  24. }
  25. // Replay msgs in file or start the console
  26. func (cs *ConsensusState) ReplayFile(file string, console bool) error {
  27. if cs.IsRunning() {
  28. return errors.New("cs is already running, cannot replay")
  29. }
  30. if cs.wal != nil {
  31. return errors.New("cs wal is open, cannot replay")
  32. }
  33. cs.startForReplay()
  34. // ensure all new step events are regenerated as expected
  35. newStepCh := subscribeToEvent(cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
  36. // just open the file for reading, no need to use wal
  37. fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
  38. if err != nil {
  39. return err
  40. }
  41. pb := newPlayback(file, fp, cs, cs.state.Copy())
  42. defer pb.fp.Close()
  43. var nextN int // apply N msgs in a row
  44. for pb.scanner.Scan() {
  45. if nextN == 0 && console {
  46. nextN = pb.replayConsoleLoop()
  47. }
  48. if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
  49. return err
  50. }
  51. if nextN > 0 {
  52. nextN -= 1
  53. }
  54. pb.count += 1
  55. }
  56. return nil
  57. }
  58. //------------------------------------------------
  59. // playback manager
  60. type playback struct {
  61. cs *ConsensusState
  62. fp *os.File
  63. scanner *bufio.Scanner
  64. count int // how many lines/msgs into the file are we
  65. // replays can be reset to beginning
  66. fileName string // so we can close/reopen the file
  67. genesisState *sm.State // so the replay session knows where to restart from
  68. }
  69. func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback {
  70. return &playback{
  71. cs: cs,
  72. fp: fp,
  73. fileName: fileName,
  74. genesisState: genState,
  75. scanner: bufio.NewScanner(fp),
  76. }
  77. }
  78. // go back count steps by resetting the state and running (pb.count - count) steps
  79. func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
  80. pb.cs.Stop()
  81. pb.cs.Wait()
  82. newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool)
  83. newCS.SetEventSwitch(pb.cs.evsw)
  84. newCS.startForReplay()
  85. pb.fp.Close()
  86. fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666)
  87. if err != nil {
  88. return err
  89. }
  90. pb.fp = fp
  91. pb.scanner = bufio.NewScanner(fp)
  92. count = pb.count - count
  93. fmt.Printf("Reseting from %d to %d\n", pb.count, count)
  94. pb.count = 0
  95. pb.cs = newCS
  96. for i := 0; pb.scanner.Scan() && i < count; i++ {
  97. if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
  98. return err
  99. }
  100. pb.count += 1
  101. }
  102. return nil
  103. }
  104. func (cs *ConsensusState) startForReplay() {
  105. cs.Logger.Error("Replay commands are disabled until someone updates them and writes tests")
  106. /* TODO:!
  107. // since we replay tocks we just ignore ticks
  108. go func() {
  109. for {
  110. select {
  111. case <-cs.tickChan:
  112. case <-cs.Quit:
  113. return
  114. }
  115. }
  116. }()*/
  117. }
  118. // console function for parsing input and running commands
  119. func (pb *playback) replayConsoleLoop() int {
  120. for {
  121. fmt.Printf("> ")
  122. bufReader := bufio.NewReader(os.Stdin)
  123. line, more, err := bufReader.ReadLine()
  124. if more {
  125. cmn.Exit("input is too long")
  126. } else if err != nil {
  127. cmn.Exit(err.Error())
  128. }
  129. tokens := strings.Split(string(line), " ")
  130. if len(tokens) == 0 {
  131. continue
  132. }
  133. switch tokens[0] {
  134. case "next":
  135. // "next" -> replay next message
  136. // "next N" -> replay next N messages
  137. if len(tokens) == 1 {
  138. return 0
  139. } else {
  140. i, err := strconv.Atoi(tokens[1])
  141. if err != nil {
  142. fmt.Println("next takes an integer argument")
  143. } else {
  144. return i
  145. }
  146. }
  147. case "back":
  148. // "back" -> go back one message
  149. // "back N" -> go back N messages
  150. // NOTE: "back" is not supported in the state machine design,
  151. // so we restart and replay up to
  152. // ensure all new step events are regenerated as expected
  153. newStepCh := subscribeToEvent(pb.cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
  154. if len(tokens) == 1 {
  155. pb.replayReset(1, newStepCh)
  156. } else {
  157. i, err := strconv.Atoi(tokens[1])
  158. if err != nil {
  159. fmt.Println("back takes an integer argument")
  160. } else if i > pb.count {
  161. fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
  162. } else {
  163. pb.replayReset(i, newStepCh)
  164. }
  165. }
  166. case "rs":
  167. // "rs" -> print entire round state
  168. // "rs short" -> print height/round/step
  169. // "rs <field>" -> print another field of the round state
  170. rs := pb.cs.RoundState
  171. if len(tokens) == 1 {
  172. fmt.Println(rs)
  173. } else {
  174. switch tokens[1] {
  175. case "short":
  176. fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step)
  177. case "validators":
  178. fmt.Println(rs.Validators)
  179. case "proposal":
  180. fmt.Println(rs.Proposal)
  181. case "proposal_block":
  182. fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort())
  183. case "locked_round":
  184. fmt.Println(rs.LockedRound)
  185. case "locked_block":
  186. fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort())
  187. case "votes":
  188. fmt.Println(rs.Votes.StringIndented(" "))
  189. default:
  190. fmt.Println("Unknown option", tokens[1])
  191. }
  192. }
  193. case "n":
  194. fmt.Println(pb.count)
  195. }
  196. }
  197. return 0
  198. }
  199. //--------------------------------------------------------------------------------
  200. // convenience for replay mode
  201. func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig) *ConsensusState {
  202. // Get BlockStore
  203. blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
  204. blockStore := bc.NewBlockStore(blockStoreDB)
  205. // Get State
  206. stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir())
  207. state, err := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
  208. if err != nil {
  209. cmn.Exit(err.Error())
  210. }
  211. // Create proxyAppConn connection (consensus, mempool, query)
  212. clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
  213. proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore))
  214. _, err = proxyApp.Start()
  215. if err != nil {
  216. cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err))
  217. }
  218. // Make event switch
  219. eventSwitch := types.NewEventSwitch()
  220. if _, err := eventSwitch.Start(); err != nil {
  221. cmn.Exit(cmn.Fmt("Failed to start event switch: %v", err))
  222. }
  223. consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(), blockStore, types.MockMempool{})
  224. consensusState.SetEventSwitch(eventSwitch)
  225. return consensusState
  226. }