You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

339 lines
8.8 KiB

  1. package consensus
  2. import (
  3. "bufio"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strconv"
  10. "strings"
  11. dbm "github.com/tendermint/tm-db"
  12. cfg "github.com/tendermint/tendermint/config"
  13. tmcon "github.com/tendermint/tendermint/consensus"
  14. "github.com/tendermint/tendermint/libs/log"
  15. tmos "github.com/tendermint/tendermint/libs/os"
  16. "github.com/tendermint/tendermint/proxy"
  17. sm "github.com/tendermint/tendermint/state"
  18. "github.com/tendermint/tendermint/store"
  19. "github.com/tendermint/tendermint/types"
  20. )
  // subscriber is the name the replay tooling passes to the event bus when it
  // subscribes and unsubscribes.
  const subscriber = "replay-file"
  25. //--------------------------------------------------------
  26. // replay messages interactively or all at once
  27. // replay the wal file
  28. func RunReplayFile(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig, console bool) {
  29. consensusState := newConsensusStateForReplay(config, csConfig)
  30. if err := consensusState.ReplayFile(csConfig.WalFile(), console); err != nil {
  31. tmos.Exit(fmt.Sprintf("Error during consensus replay: %v", err))
  32. }
  33. }
  34. // Replay msgs in file or start the console
  35. func (cs *State) ReplayFile(file string, console bool) error {
  36. if cs.IsRunning() {
  37. return errors.New("cs is already running, cannot replay")
  38. }
  39. if cs.wal != nil {
  40. return errors.New("cs wal is open, cannot replay")
  41. }
  42. cs.startForReplay()
  43. // ensure all new step events are regenerated as expected
  44. ctx := context.Background()
  45. newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
  46. if err != nil {
  47. return fmt.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
  48. }
  49. defer func() {
  50. if err := cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil {
  51. cs.Logger.Error("Error unsubscribing to event bus", "err", err)
  52. }
  53. }()
  54. // just open the file for reading, no need to use wal
  55. fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
  56. if err != nil {
  57. return err
  58. }
  59. pb := newPlayback(file, fp, cs, cs.state.Copy())
  60. defer pb.fp.Close()
  61. var nextN int // apply N msgs in a row
  62. var msg *tmcon.TimedWALMessage
  63. for {
  64. if nextN == 0 && console {
  65. nextN = pb.replayConsoleLoop()
  66. }
  67. msg, err = pb.dec.Decode()
  68. if err == io.EOF {
  69. return nil
  70. } else if err != nil {
  71. return err
  72. }
  73. if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
  74. return err
  75. }
  76. if nextN > 0 {
  77. nextN--
  78. }
  79. pb.count++
  80. }
  81. }
  82. //------------------------------------------------
  83. // playback manager
  84. type playback struct {
  85. cs *State
  86. fp *os.File
  87. dec *WALDecoder
  88. count int // how many lines/msgs into the file are we
  89. // replays can be reset to beginning
  90. fileName string // so we can close/reopen the file
  91. genesisState sm.State // so the replay session knows where to restart from
  92. }
  93. func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback {
  94. return &playback{
  95. cs: cs,
  96. fp: fp,
  97. fileName: fileName,
  98. genesisState: genState,
  99. dec: NewWALDecoder(fp),
  100. }
  101. }
  102. // go back count steps by resetting the state and running (pb.count - count) steps
  103. func (pb *playback) replayReset(count int, newStepSub types.Subscription) error {
  104. if err := pb.cs.Stop(); err != nil {
  105. return err
  106. }
  107. pb.cs.Wait()
  108. newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
  109. pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, map[int64]Misbehavior{})
  110. newCS.SetEventBus(pb.cs.eventBus)
  111. newCS.startForReplay()
  112. if err := pb.fp.Close(); err != nil {
  113. return err
  114. }
  115. fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0600)
  116. if err != nil {
  117. return err
  118. }
  119. pb.fp = fp
  120. pb.dec = NewWALDecoder(fp)
  121. count = pb.count - count
  122. fmt.Printf("Reseting from %d to %d\n", pb.count, count)
  123. pb.count = 0
  124. pb.cs = newCS
  125. var msg *tmcon.TimedWALMessage
  126. for i := 0; i < count; i++ {
  127. msg, err = pb.dec.Decode()
  128. if err == io.EOF {
  129. return nil
  130. } else if err != nil {
  131. return err
  132. }
  133. if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
  134. return err
  135. }
  136. pb.count++
  137. }
  138. return nil
  139. }
  140. func (cs *State) startForReplay() {
  141. cs.Logger.Error("Replay commands are disabled until someone updates them and writes tests")
  142. /* TODO:!
  143. // since we replay tocks we just ignore ticks
  144. go func() {
  145. for {
  146. select {
  147. case <-cs.tickChan:
  148. case <-cs.Quit:
  149. return
  150. }
  151. }
  152. }()*/
  153. }
  154. // console function for parsing input and running commands
  155. func (pb *playback) replayConsoleLoop() int {
  156. for {
  157. fmt.Printf("> ")
  158. bufReader := bufio.NewReader(os.Stdin)
  159. line, more, err := bufReader.ReadLine()
  160. if more {
  161. tmos.Exit("input is too long")
  162. } else if err != nil {
  163. tmos.Exit(err.Error())
  164. }
  165. tokens := strings.Split(string(line), " ")
  166. if len(tokens) == 0 {
  167. continue
  168. }
  169. switch tokens[0] {
  170. case "next":
  171. // "next" -> replay next message
  172. // "next N" -> replay next N messages
  173. if len(tokens) == 1 {
  174. return 0
  175. }
  176. i, err := strconv.Atoi(tokens[1])
  177. if err != nil {
  178. fmt.Println("next takes an integer argument")
  179. } else {
  180. return i
  181. }
  182. case "back":
  183. // "back" -> go back one message
  184. // "back N" -> go back N messages
  185. // NOTE: "back" is not supported in the state machine design,
  186. // so we restart and replay up to
  187. ctx := context.Background()
  188. // ensure all new step events are regenerated as expected
  189. newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
  190. if err != nil {
  191. tmos.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
  192. }
  193. defer func() {
  194. if err := pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil {
  195. pb.cs.Logger.Error("Error unsubscribing from eventBus", "err", err)
  196. }
  197. }()
  198. if len(tokens) == 1 {
  199. if err := pb.replayReset(1, newStepSub); err != nil {
  200. pb.cs.Logger.Error("Replay reset error", "err", err)
  201. }
  202. } else {
  203. i, err := strconv.Atoi(tokens[1])
  204. if err != nil {
  205. fmt.Println("back takes an integer argument")
  206. } else if i > pb.count {
  207. fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
  208. } else if err := pb.replayReset(i, newStepSub); err != nil {
  209. pb.cs.Logger.Error("Replay reset error", "err", err)
  210. }
  211. }
  212. case "rs":
  213. // "rs" -> print entire round state
  214. // "rs short" -> print height/round/step
  215. // "rs <field>" -> print another field of the round state
  216. rs := pb.cs.RoundState
  217. if len(tokens) == 1 {
  218. fmt.Println(rs)
  219. } else {
  220. switch tokens[1] {
  221. case "short":
  222. fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step)
  223. case "validators":
  224. fmt.Println(rs.Validators)
  225. case "proposal":
  226. fmt.Println(rs.Proposal)
  227. case "proposal_block":
  228. fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort())
  229. case "locked_round":
  230. fmt.Println(rs.LockedRound)
  231. case "locked_block":
  232. fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort())
  233. case "votes":
  234. fmt.Println(rs.Votes.StringIndented(" "))
  235. default:
  236. fmt.Println("Unknown option", tokens[1])
  237. }
  238. }
  239. case "n":
  240. fmt.Println(pb.count)
  241. }
  242. }
  243. }
  244. //--------------------------------------------------------------------------------
  245. // convenience for replay mode
  246. func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig) *State {
  247. dbType := dbm.BackendType(config.DBBackend)
  248. // Get BlockStore
  249. blockStoreDB, err := dbm.NewDB("blockstore", dbType, config.DBDir())
  250. if err != nil {
  251. tmos.Exit(err.Error())
  252. }
  253. blockStore := store.NewBlockStore(blockStoreDB)
  254. // Get State
  255. stateDB, err := dbm.NewDB("state", dbType, config.DBDir())
  256. if err != nil {
  257. tmos.Exit(err.Error())
  258. }
  259. stateStore := sm.NewStore(stateDB)
  260. gdoc, err := sm.MakeGenesisDocFromFile(config.GenesisFile())
  261. if err != nil {
  262. tmos.Exit(err.Error())
  263. }
  264. state, err := sm.MakeGenesisState(gdoc)
  265. if err != nil {
  266. tmos.Exit(err.Error())
  267. }
  268. // Create proxyAppConn connection (consensus, mempool, query)
  269. clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
  270. proxyApp := proxy.NewAppConns(clientCreator)
  271. err = proxyApp.Start()
  272. if err != nil {
  273. tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
  274. }
  275. eventBus := types.NewEventBus()
  276. if err := eventBus.Start(); err != nil {
  277. tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
  278. }
  279. handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
  280. handshaker.SetEventBus(eventBus)
  281. err = handshaker.Handshake(proxyApp)
  282. if err != nil {
  283. tmos.Exit(fmt.Sprintf("Error on handshake: %v", err))
  284. }
  285. mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
  286. blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
  287. consensusState := NewState(csConfig, state.Copy(), blockExec,
  288. blockStore, mempool, evpool, map[int64]Misbehavior{})
  289. consensusState.SetEventBus(eventBus)
  290. return consensusState
  291. }