From 2b13386d7b9fa8d19eaeaaafabad673ded3f6335 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 23 Dec 2015 01:27:40 -0500 Subject: [PATCH] consensus: replay console --- cmd/tendermint/main.go | 6 +- consensus/replay.go | 287 +++++++++++++++++++++++++++++++++++++++++ consensus/state.go | 88 ------------- node/node.go | 32 ++++- 4 files changed, 317 insertions(+), 96 deletions(-) create mode 100644 consensus/replay.go diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index b9c4538b1..e3847e0fb 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -35,7 +35,11 @@ Commands: case "node": node.RunNode() case "replay": - node.RunReplay() + if len(args) > 1 && args[1] == "console" { + node.RunReplayConsole() + } else { + node.RunReplay() + } case "init": init_files() case "show_validator": diff --git a/consensus/replay.go b/consensus/replay.go new file mode 100644 index 000000000..3abe0bf48 --- /dev/null +++ b/consensus/replay.go @@ -0,0 +1,287 @@ +package consensus + +import ( + "bufio" + "errors" + "fmt" + "os" + "reflect" + "strconv" + "strings" + "time" + + . "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/go-common" + "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/go-wire" + + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------- +// types and functions for savings consensus messages + +type ConsensusLogMessage struct { + Msg ConsensusLogMessageInterface `json:"msg"` +} + +type ConsensusLogMessageInterface interface{} + +var _ = wire.RegisterInterface( + struct{ ConsensusLogMessageInterface }{}, + wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, + wire.ConcreteType{msgInfo{}, 0x02}, + wire.ConcreteType{timeoutInfo{}, 0x03}, +) + +// called in newStep and for each pass in receiveRoutine +func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { + if cs.msgLogFP != nil { + var n int + var err error + wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) + wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line + if err != nil { + log.Error("Error writing to consensus message log file", "err", err, "msg", msg) + } + } +} + +// Open file to log all consensus messages and timeouts for deterministic accountability +func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + return err +} + +//-------------------------------------------------------- +// replay messages + +// Interactive playback +func (cs ConsensusState) ReplayConsole(file string) error { + return cs.replay(file, true) +} + +// Full playback, with tests +func (cs ConsensusState) ReplayMessages(file string) error { + return cs.replay(file, false) +} + +type playback struct { + cs *ConsensusState + file string + fp *os.File + scanner *bufio.Scanner + newStepCh chan interface{} + genesisState *sm.State + count int +} + +func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface{}, genState *sm.State) *playback { + return &playback{ + cs: cs, + file: file, + newStepCh: ch, + genesisState: genState, + fp: fp, + scanner: bufio.NewScanner(fp), + } +} + +func (pb *playback) replayReset(count int) error { + + pb.cs.Stop() + + newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppCtx, pb.cs.blockStore, pb.cs.mempool) + newCs.SetEventSwitch(pb.cs.evsw) + + // we ensure all new step events are regenerated as expected + pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + newCs.BaseService.OnStart() + newCs.startRoutines(0) + + pb.fp.Close() + fp, err := os.OpenFile(pb.file, os.O_RDONLY, 0666) + if err != nil { + return err + } + pb.fp = fp + pb.scanner = bufio.NewScanner(fp) + count = pb.count - count + log.Notice(Fmt("Reseting from %d to %d", pb.count, count)) + pb.count = 0 + pb.cs = newCs + for i := 0; pb.scanner.Scan() && i < count; i++ { + if err := pb.readReplayMessage(); err != nil { + return err + } + pb.count += 1 + } + return nil +} + +func (cs *ConsensusState) replay(file string, console bool) error { + if cs.IsRunning() { + return errors.New("cs is already running, cannot replay") + } + + cs.BaseService.OnStart() + cs.startRoutines(0) + + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + cs.msgLogFP = nil + } + + // we ensure all new step events are regenerated as expected + newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + fp, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return err + } + + pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) + + defer pb.fp.Close() + + var nextN int // apply N msgs in a row + for pb.scanner.Scan() { + if nextN == 0 && console { + nextN = pb.replayConsoleLoop() + } + + if err := pb.readReplayMessage(); err != nil { + return err + } + + if nextN > 0 { + nextN -= 1 + } + pb.count += 1 + } + return nil +} + +func (pb *playback) replayConsoleLoop() int { + for { + fmt.Printf("> ") + bufReader := bufio.NewReader(os.Stdin) + line, more, err := bufReader.ReadLine() + if more { + Exit("input is too long") + } else if err != nil { + Exit(err.Error()) + } + + tokens := strings.Split(string(line), " ") + if len(tokens) == 0 { + continue + } + + switch tokens[0] { + case "next": + // "next" -> replay next message + // "next N" -> replay next N messages + + if len(tokens) == 1 { + return 0 + } else { + i, err := strconv.Atoi(tokens[1]) + if err != nil { + fmt.Println("next takes an integer argument") + } else { + return i + } + } + + case "back": + // "back" -> go back one message + // "back N" -> go back N messages + + // NOTE: "back" is not supported in the state machine design, + // so we restart to do this (expensive ...) + + if len(tokens) == 1 { + pb.replayReset(1) + } else { + i, err := strconv.Atoi(tokens[1]) + if err != nil { + fmt.Println("back takes an integer argument") + } else if i > pb.count { + fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) + } else { + pb.replayReset(i) + } + } + + case "rs": + // "rs" -> print entire round state + // "rs short" -> print height/round/step + // "rs " -> print another field of the round state + + rs := pb.cs.RoundState + if len(tokens) == 1 { + fmt.Println(rs) + } else { + switch tokens[1] { + case "short": + fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step) + case "validators": + fmt.Println(rs.Validators) + case "proposal": + fmt.Println(rs.Proposal) + case "proposal_block": + fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort()) + case "locked_round": + fmt.Println(rs.LockedRound) + case "locked_block": + fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort()) + case "votes": + fmt.Println(rs.Votes.StringIndented(" ")) + + default: + fmt.Println("Unknown option", tokens[1]) + } + } + } + } + return 0 +} + +func (pb *playback) readReplayMessage() error { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, pb.scanner.Bytes(), &err) + if err != nil { + return fmt.Errorf("Error reading json data: %v", err) + } + log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg) + switch m := msg.Msg.(type) { + case *types.EventDataRoundState: + // these are playback checks + ticker := time.After(time.Second * 2) + select { + case mi := <-pb.newStepCh: + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case <-ticker: + return fmt.Errorf("Failed to read off newStepCh") + } + case msgInfo: + // internal or from peer + if m.PeerKey == "" { + pb.cs.internalMsgQueue <- m + } else { + pb.cs.peerMsgQueue <- m + } + case timeoutInfo: + pb.cs.tockChan <- m + default: + return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + } + return nil +} diff --git a/consensus/state.go b/consensus/state.go index 9d9e65d87..5217a2311 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1,7 +1,6 @@ package consensus import ( - "bufio" "bytes" "errors" "fmt" @@ -277,68 +276,6 @@ func (cs *ConsensusState) OnStop() { } } -func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) - return err -} - -// Playback with tests -func (cs ConsensusState) ReplayMessagesFromFile(file string) error { - if cs.IsRunning() { - return errors.New("cs is already running, cannot replay") - } - - cs.BaseService.OnStart() - cs.startRoutines(0) - - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - cs.msgLogFP = nil - } - - // we ensure all new step events are regenerated as expected - newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) - - fp, err := os.OpenFile(file, os.O_RDONLY, 0666) - if err != nil { - return err - } - defer fp.Close() - scanner := bufio.NewScanner(fp) - for scanner.Scan() { - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, scanner.Bytes(), &err) - if err != nil { - return fmt.Errorf("Error reading json data: %v", err) - } - log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg) - switch m := msg.Msg.(type) { - case *types.EventDataRoundState: - // these are playback checks - mi := <-newStepCh - m2 := mi.(*types.EventDataRoundState) - if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { - return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) - } - case msgInfo: - // internal or from peer - if m.PeerKey == "" { - cs.internalMsgQueue <- m - } else { - cs.peerMsgQueue <- m - } - case timeoutInfo: - cs.tockChan <- m - default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) - } - } - return nil -} - //------------------------------------------------------------ // Public interface for passing messages into the consensus state, // possibly causing a state transition @@ -1523,31 +1460,6 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe } -func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { - if cs.msgLogFP != nil { - var n int - var err error - wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) - wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line - if err != nil { - log.Error("Error writing to consensus message log file", "err", err, "msg", msg) - } - } -} - -type ConsensusLogMessage struct { - Msg ConsensusLogMessageInterface `json:"msg"` -} - -type ConsensusLogMessageInterface interface{} - -var _ = wire.RegisterInterface( - struct{ ConsensusLogMessageInterface }{}, - wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, - wire.ConcreteType{msgInfo{}, 0x02}, - wire.ConcreteType{timeoutInfo{}, 0x03}, -) - //--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { diff --git a/node/node.go b/node/node.go index a5f2fbda7..1208a4b53 100644 --- a/node/node.go +++ b/node/node.go @@ -311,12 +311,7 @@ func RunNode() { }) } -func RunReplay() { - msgLogFile := config.GetString("cs_msg_log") - if msgLogFile == "" { - Exit("cs_msg_log file name not set in tendermint config") - } - +func newConsensusState() *consensus.ConsensusState { // Get BlockStore blockStoreDB := dbm.GetDB("blockstore") blockStore := bc.NewBlockStore(blockStoreDB) @@ -345,8 +340,31 @@ func RunReplay() { consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool) consensusState.SetEventSwitch(eventSwitch) + return consensusState +} + +func RunReplayConsole() { + msgLogFile := config.GetString("cs_msg_log") + if msgLogFile == "" { + Exit("cs_msg_log file name not set in tendermint config") + } + + consensusState := newConsensusState() + + if err := consensusState.ReplayConsole(msgLogFile); err != nil { + Exit(Fmt("Error during consensus replay: %v", err)) + } +} + +func RunReplay() { + msgLogFile := config.GetString("cs_msg_log") + if msgLogFile == "" { + Exit("cs_msg_log file name not set in tendermint config") + } + + consensusState := newConsensusState() - if err := consensusState.ReplayMessagesFromFile(msgLogFile); err != nil { + if err := consensusState.ReplayMessages(msgLogFile); err != nil { Exit(Fmt("Error during consensus replay: %v", err)) } log.Notice("Replay run successfully")