You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

338 lines
8.7 KiB

  1. package consensus
  2. import (
  3. "bufio"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strconv"
  10. "strings"
  11. dbm "github.com/tendermint/tm-db"
  12. cfg "github.com/tendermint/tendermint/config"
  13. "github.com/tendermint/tendermint/libs/log"
  14. tmos "github.com/tendermint/tendermint/libs/os"
  15. "github.com/tendermint/tendermint/proxy"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/store"
  18. "github.com/tendermint/tendermint/types"
  19. )
  20. const (
  21. // event bus subscriber
  22. subscriber = "replay-file"
  23. )
  24. //--------------------------------------------------------
  25. // replay messages interactively or all at once
  26. // replay the wal file
  27. func RunReplayFile(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig, console bool) {
  28. consensusState := newConsensusStateForReplay(config, csConfig)
  29. if err := consensusState.ReplayFile(csConfig.WalFile(), console); err != nil {
  30. tmos.Exit(fmt.Sprintf("Error during consensus replay: %v", err))
  31. }
  32. }
  33. // Replay msgs in file or start the console
  34. func (cs *State) ReplayFile(file string, console bool) error {
  35. if cs.IsRunning() {
  36. return errors.New("cs is already running, cannot replay")
  37. }
  38. if cs.wal != nil {
  39. return errors.New("cs wal is open, cannot replay")
  40. }
  41. cs.startForReplay()
  42. // ensure all new step events are regenerated as expected
  43. ctx := context.Background()
  44. newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
  45. if err != nil {
  46. return fmt.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
  47. }
  48. defer func() {
  49. if err := cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil {
  50. cs.Logger.Error("Error unsubscribing to event bus", "err", err)
  51. }
  52. }()
  53. // just open the file for reading, no need to use wal
  54. fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
  55. if err != nil {
  56. return err
  57. }
  58. pb := newPlayback(file, fp, cs, cs.state.Copy())
  59. defer pb.fp.Close()
  60. var nextN int // apply N msgs in a row
  61. var msg *TimedWALMessage
  62. for {
  63. if nextN == 0 && console {
  64. nextN = pb.replayConsoleLoop()
  65. }
  66. msg, err = pb.dec.Decode()
  67. if err == io.EOF {
  68. return nil
  69. } else if err != nil {
  70. return err
  71. }
  72. if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
  73. return err
  74. }
  75. if nextN > 0 {
  76. nextN--
  77. }
  78. pb.count++
  79. }
  80. }
  81. //------------------------------------------------
  82. // playback manager
  83. type playback struct {
  84. cs *State
  85. fp *os.File
  86. dec *WALDecoder
  87. count int // how many lines/msgs into the file are we
  88. // replays can be reset to beginning
  89. fileName string // so we can close/reopen the file
  90. genesisState sm.State // so the replay session knows where to restart from
  91. }
  92. func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback {
  93. return &playback{
  94. cs: cs,
  95. fp: fp,
  96. fileName: fileName,
  97. genesisState: genState,
  98. dec: NewWALDecoder(fp),
  99. }
  100. }
  101. // go back count steps by resetting the state and running (pb.count - count) steps
  102. func (pb *playback) replayReset(count int, newStepSub types.Subscription) error {
  103. if err := pb.cs.Stop(); err != nil {
  104. return err
  105. }
  106. pb.cs.Wait()
  107. newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
  108. pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, map[int64]Misbehavior{})
  109. newCS.SetEventBus(pb.cs.eventBus)
  110. newCS.startForReplay()
  111. if err := pb.fp.Close(); err != nil {
  112. return err
  113. }
  114. fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0600)
  115. if err != nil {
  116. return err
  117. }
  118. pb.fp = fp
  119. pb.dec = NewWALDecoder(fp)
  120. count = pb.count - count
  121. fmt.Printf("Reseting from %d to %d\n", pb.count, count)
  122. pb.count = 0
  123. pb.cs = newCS
  124. var msg *TimedWALMessage
  125. for i := 0; i < count; i++ {
  126. msg, err = pb.dec.Decode()
  127. if err == io.EOF {
  128. return nil
  129. } else if err != nil {
  130. return err
  131. }
  132. if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
  133. return err
  134. }
  135. pb.count++
  136. }
  137. return nil
  138. }
  139. func (cs *State) startForReplay() {
  140. cs.Logger.Error("Replay commands are disabled until someone updates them and writes tests")
  141. /* TODO:!
  142. // since we replay tocks we just ignore ticks
  143. go func() {
  144. for {
  145. select {
  146. case <-cs.tickChan:
  147. case <-cs.Quit:
  148. return
  149. }
  150. }
  151. }()*/
  152. }
  153. // console function for parsing input and running commands
  154. func (pb *playback) replayConsoleLoop() int {
  155. for {
  156. fmt.Printf("> ")
  157. bufReader := bufio.NewReader(os.Stdin)
  158. line, more, err := bufReader.ReadLine()
  159. if more {
  160. tmos.Exit("input is too long")
  161. } else if err != nil {
  162. tmos.Exit(err.Error())
  163. }
  164. tokens := strings.Split(string(line), " ")
  165. if len(tokens) == 0 {
  166. continue
  167. }
  168. switch tokens[0] {
  169. case "next":
  170. // "next" -> replay next message
  171. // "next N" -> replay next N messages
  172. if len(tokens) == 1 {
  173. return 0
  174. }
  175. i, err := strconv.Atoi(tokens[1])
  176. if err != nil {
  177. fmt.Println("next takes an integer argument")
  178. } else {
  179. return i
  180. }
  181. case "back":
  182. // "back" -> go back one message
  183. // "back N" -> go back N messages
  184. // NOTE: "back" is not supported in the state machine design,
  185. // so we restart and replay up to
  186. ctx := context.Background()
  187. // ensure all new step events are regenerated as expected
  188. newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
  189. if err != nil {
  190. tmos.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
  191. }
  192. defer func() {
  193. if err := pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil {
  194. pb.cs.Logger.Error("Error unsubscribing from eventBus", "err", err)
  195. }
  196. }()
  197. if len(tokens) == 1 {
  198. if err := pb.replayReset(1, newStepSub); err != nil {
  199. pb.cs.Logger.Error("Replay reset error", "err", err)
  200. }
  201. } else {
  202. i, err := strconv.Atoi(tokens[1])
  203. if err != nil {
  204. fmt.Println("back takes an integer argument")
  205. } else if i > pb.count {
  206. fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
  207. } else if err := pb.replayReset(i, newStepSub); err != nil {
  208. pb.cs.Logger.Error("Replay reset error", "err", err)
  209. }
  210. }
  211. case "rs":
  212. // "rs" -> print entire round state
  213. // "rs short" -> print height/round/step
  214. // "rs <field>" -> print another field of the round state
  215. rs := pb.cs.RoundState
  216. if len(tokens) == 1 {
  217. fmt.Println(rs)
  218. } else {
  219. switch tokens[1] {
  220. case "short":
  221. fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step)
  222. case "validators":
  223. fmt.Println(rs.Validators)
  224. case "proposal":
  225. fmt.Println(rs.Proposal)
  226. case "proposal_block":
  227. fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort())
  228. case "locked_round":
  229. fmt.Println(rs.LockedRound)
  230. case "locked_block":
  231. fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort())
  232. case "votes":
  233. fmt.Println(rs.Votes.StringIndented(" "))
  234. default:
  235. fmt.Println("Unknown option", tokens[1])
  236. }
  237. }
  238. case "n":
  239. fmt.Println(pb.count)
  240. }
  241. }
  242. }
  243. //--------------------------------------------------------------------------------
  244. // convenience for replay mode
  245. func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig) *State {
  246. dbType := dbm.BackendType(config.DBBackend)
  247. // Get BlockStore
  248. blockStoreDB, err := dbm.NewDB("blockstore", dbType, config.DBDir())
  249. if err != nil {
  250. tmos.Exit(err.Error())
  251. }
  252. blockStore := store.NewBlockStore(blockStoreDB)
  253. // Get State
  254. stateDB, err := dbm.NewDB("state", dbType, config.DBDir())
  255. if err != nil {
  256. tmos.Exit(err.Error())
  257. }
  258. stateStore := sm.NewStore(stateDB)
  259. gdoc, err := sm.MakeGenesisDocFromFile(config.GenesisFile())
  260. if err != nil {
  261. tmos.Exit(err.Error())
  262. }
  263. state, err := sm.MakeGenesisState(gdoc)
  264. if err != nil {
  265. tmos.Exit(err.Error())
  266. }
  267. // Create proxyAppConn connection (consensus, mempool, query)
  268. clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
  269. proxyApp := proxy.NewAppConns(clientCreator)
  270. err = proxyApp.Start()
  271. if err != nil {
  272. tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
  273. }
  274. eventBus := types.NewEventBus()
  275. if err := eventBus.Start(); err != nil {
  276. tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
  277. }
  278. handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
  279. handshaker.SetEventBus(eventBus)
  280. err = handshaker.Handshake(proxyApp)
  281. if err != nil {
  282. tmos.Exit(fmt.Sprintf("Error on handshake: %v", err))
  283. }
  284. mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
  285. blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
  286. consensusState := NewState(csConfig, state.Copy(), blockExec,
  287. blockStore, mempool, evpool, map[int64]Misbehavior{})
  288. consensusState.SetEventBus(eventBus)
  289. return consensusState
  290. }