You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

362 lines
9.2 KiB

9 years ago
  1. package consensus
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "time"
  12. . "github.com/tendermint/go-common"
  13. "github.com/tendermint/go-wire"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. // unmarshal and apply a single message to the consensus state
  18. func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
  19. var err error
  20. var msg ConsensusLogMessage
  21. wire.ReadJSON(&msg, msgBytes, &err)
  22. if err != nil {
  23. fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
  24. return fmt.Errorf("Error reading json data: %v", err)
  25. }
  26. // for logging
  27. switch m := msg.Msg.(type) {
  28. case types.EventDataRoundState:
  29. log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step)
  30. // these are playback checks
  31. ticker := time.After(time.Second * 2)
  32. if newStepCh != nil {
  33. select {
  34. case mi := <-newStepCh:
  35. m2 := mi.(types.EventDataRoundState)
  36. if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
  37. return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
  38. }
  39. case <-ticker:
  40. return fmt.Errorf("Failed to read off newStepCh")
  41. }
  42. }
  43. case msgInfo:
  44. peerKey := m.PeerKey
  45. if peerKey == "" {
  46. peerKey = "local"
  47. }
  48. switch msg := m.Msg.(type) {
  49. case *ProposalMessage:
  50. p := msg.Proposal
  51. log.Notice("Proposal", "height", p.Height, "round", p.Round, "header",
  52. p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
  53. case *BlockPartMessage:
  54. log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
  55. case *VoteMessage:
  56. v := msg.Vote
  57. log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type,
  58. "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey)
  59. }
  60. // internal or from peer
  61. if m.PeerKey == "" {
  62. cs.internalMsgQueue <- m
  63. } else {
  64. cs.peerMsgQueue <- m
  65. }
  66. case timeoutInfo:
  67. log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
  68. cs.tockChan <- m
  69. default:
  70. return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
  71. }
  72. return nil
  73. }
  74. // replay only those messages since the last block
  75. func (cs *ConsensusState) catchupReplay(height int) error {
  76. if cs.wal == nil {
  77. log.Warn("consensus msg log is nil")
  78. return nil
  79. }
  80. if !cs.wal.exists {
  81. // new wal, nothing to catchup on
  82. return nil
  83. }
  84. // starting from end of file,
  85. // read messages until a new height is found
  86. nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {
  87. var err error
  88. var msg ConsensusLogMessage
  89. wire.ReadJSON(&msg, lineBytes, &err)
  90. if err != nil {
  91. panic(Fmt("Failed to read cs_msg_log json: %v", err))
  92. }
  93. m, ok := msg.Msg.(types.EventDataRoundState)
  94. if ok && m.Step == RoundStepNewHeight.String() {
  95. // TODO: ensure the height matches
  96. return true
  97. }
  98. return false
  99. })
  100. if err != nil {
  101. return err
  102. }
  103. var beginning bool // if we had to go back to the beginning
  104. if c, _ := cs.wal.fp.Seek(0, 1); c == 0 {
  105. beginning = true
  106. }
  107. log.Notice("Catchup by replaying consensus messages", "n", nLines)
  108. // now we can replay the latest nLines on consensus state
  109. // note we can't use scan because we've already been reading from the file
  110. reader := bufio.NewReader(cs.wal.fp)
  111. for i := 0; i < nLines; i++ {
  112. msgBytes, err := reader.ReadBytes('\n')
  113. if err == io.EOF {
  114. break
  115. } else if err != nil {
  116. return err
  117. } else if len(msgBytes) == 0 {
  118. continue
  119. } else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
  120. continue
  121. }
  122. // the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it
  123. if !beginning && i == 1 {
  124. continue
  125. }
  126. // NOTE: since the priv key is set when the msgs are received
  127. // it will attempt to eg double sign but we can just ignore it
  128. // since the votes will be replayed and we'll get to the next step
  129. if err := cs.readReplayMessage(msgBytes, nil); err != nil {
  130. return err
  131. }
  132. }
  133. log.Info("Done catchup replay")
  134. return nil
  135. }
  136. //--------------------------------------------------------
  137. // replay messages interactively or all at once
  138. // Interactive playback
  139. func (cs ConsensusState) ReplayConsole(file string) error {
  140. return cs.replay(file, true)
  141. }
  142. // Full playback, with tests
  143. func (cs ConsensusState) ReplayMessages(file string) error {
  144. return cs.replay(file, false)
  145. }
  146. // replay all msgs or start the console
  147. func (cs *ConsensusState) replay(file string, console bool) error {
  148. if cs.IsRunning() {
  149. return errors.New("cs is already running, cannot replay")
  150. }
  151. if cs.wal != nil {
  152. return errors.New("cs wal is open, cannot replay")
  153. }
  154. cs.startForReplay()
  155. // ensure all new step events are regenerated as expected
  156. newStepCh := subscribeToEvent(cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
  157. // just open the file for reading, no need to use wal
  158. fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
  159. if err != nil {
  160. return err
  161. }
  162. pb := newPlayback(file, fp, cs, cs.state.Copy())
  163. defer pb.fp.Close()
  164. var nextN int // apply N msgs in a row
  165. for pb.scanner.Scan() {
  166. if nextN == 0 && console {
  167. nextN = pb.replayConsoleLoop()
  168. }
  169. if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
  170. return err
  171. }
  172. if nextN > 0 {
  173. nextN -= 1
  174. }
  175. pb.count += 1
  176. }
  177. return nil
  178. }
  179. //------------------------------------------------
// playback manager: tracks the state of one replay session over a message
// log file (the consensus state being driven, the open file and its line
// scanner, and the position reached so far).
type playback struct {
	cs      *ConsensusState // consensus state the messages are applied to; replaced on reset
	fp      *os.File        // open handle on the log file; replaced on reset
	scanner *bufio.Scanner  // line scanner over fp
	count   int             // how many lines/msgs into the file are we

	// replays can be reset to beginning
	fileName     string    // so we can close/reopen the file
	genesisState *sm.State // so the replay session knows where to restart from
}
  190. func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback {
  191. return &playback{
  192. cs: cs,
  193. fp: fp,
  194. fileName: fileName,
  195. genesisState: genState,
  196. scanner: bufio.NewScanner(fp),
  197. }
  198. }
  199. // go back count steps by resetting the state and running (pb.count - count) steps
  200. func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
  201. pb.cs.Stop()
  202. newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool)
  203. newCs.SetEventSwitch(pb.cs.evsw)
  204. newCs.startForReplay()
  205. pb.fp.Close()
  206. fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666)
  207. if err != nil {
  208. return err
  209. }
  210. pb.fp = fp
  211. pb.scanner = bufio.NewScanner(fp)
  212. count = pb.count - count
  213. log.Notice(Fmt("Reseting from %d to %d", pb.count, count))
  214. pb.count = 0
  215. pb.cs = newCs
  216. for i := 0; pb.scanner.Scan() && i < count; i++ {
  217. if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
  218. return err
  219. }
  220. pb.count += 1
  221. }
  222. return nil
  223. }
  224. func (cs *ConsensusState) startForReplay() {
  225. cs.BaseService.OnStart()
  226. go cs.receiveRoutine(0)
  227. // since we replay tocks we just ignore ticks
  228. go func() {
  229. for {
  230. select {
  231. case <-cs.tickChan:
  232. case <-cs.Quit:
  233. return
  234. }
  235. }
  236. }()
  237. }
  238. // console function for parsing input and running commands
  239. func (pb *playback) replayConsoleLoop() int {
  240. for {
  241. fmt.Printf("> ")
  242. bufReader := bufio.NewReader(os.Stdin)
  243. line, more, err := bufReader.ReadLine()
  244. if more {
  245. Exit("input is too long")
  246. } else if err != nil {
  247. Exit(err.Error())
  248. }
  249. tokens := strings.Split(string(line), " ")
  250. if len(tokens) == 0 {
  251. continue
  252. }
  253. switch tokens[0] {
  254. case "next":
  255. // "next" -> replay next message
  256. // "next N" -> replay next N messages
  257. if len(tokens) == 1 {
  258. return 0
  259. } else {
  260. i, err := strconv.Atoi(tokens[1])
  261. if err != nil {
  262. fmt.Println("next takes an integer argument")
  263. } else {
  264. return i
  265. }
  266. }
  267. case "back":
  268. // "back" -> go back one message
  269. // "back N" -> go back N messages
  270. // NOTE: "back" is not supported in the state machine design,
  271. // so we restart and replay up to
  272. // ensure all new step events are regenerated as expected
  273. newStepCh := subscribeToEvent(pb.cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
  274. if len(tokens) == 1 {
  275. pb.replayReset(1, newStepCh)
  276. } else {
  277. i, err := strconv.Atoi(tokens[1])
  278. if err != nil {
  279. fmt.Println("back takes an integer argument")
  280. } else if i > pb.count {
  281. fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
  282. } else {
  283. pb.replayReset(i, newStepCh)
  284. }
  285. }
  286. case "rs":
  287. // "rs" -> print entire round state
  288. // "rs short" -> print height/round/step
  289. // "rs <field>" -> print another field of the round state
  290. rs := pb.cs.RoundState
  291. if len(tokens) == 1 {
  292. fmt.Println(rs)
  293. } else {
  294. switch tokens[1] {
  295. case "short":
  296. fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step)
  297. case "validators":
  298. fmt.Println(rs.Validators)
  299. case "proposal":
  300. fmt.Println(rs.Proposal)
  301. case "proposal_block":
  302. fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort())
  303. case "locked_round":
  304. fmt.Println(rs.LockedRound)
  305. case "locked_block":
  306. fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort())
  307. case "votes":
  308. fmt.Println(rs.Votes.StringIndented(" "))
  309. default:
  310. fmt.Println("Unknown option", tokens[1])
  311. }
  312. }
  313. case "n":
  314. fmt.Println(pb.count)
  315. }
  316. }
  317. return 0
  318. }