From 56076d4d0e72112a72d6f0a6531db2b39abf3dc3 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 6 Jan 2016 18:42:12 -0500 Subject: [PATCH] some cleanup; log time with consensus msgs --- consensus/replay.go | 111 +++++++++++++++++++++----------------------- consensus/state.go | 8 ++++ 2 files changed, 62 insertions(+), 57 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 5bb7d27b1..5e7e4d08c 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -21,7 +21,8 @@ import ( // types and functions for savings consensus messages type ConsensusLogMessage struct { - Msg ConsensusLogMessageInterface `json:"msg"` + Time time.Time `json:"time"` + Msg ConsensusLogMessageInterface `json:"msg"` } type ConsensusLogMessageInterface interface{} @@ -38,7 +39,7 @@ func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { if cs.msgLogFP != nil { var n int var err error - wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) + wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, cs.msgLogFP, &n, &err) wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line if err != nil { log.Error("Error writing to consensus message log file", "err", err, "msg", msg) @@ -46,16 +47,8 @@ func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { } } -// Open file to log all consensus messages and timeouts for deterministic accountability -func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) - return err -} - //-------------------------------------------------------- -// replay messages +// replay the messages // Interactive playback func (cs ConsensusState) ReplayConsole(file string) error { @@ -67,6 +60,52 @@ func (cs ConsensusState) ReplayMessages(file string) error { return cs.replay(file, false) } +// replay all msgs or start the console +func (cs *ConsensusState) replay(file string, console bool) error { + if cs.IsRunning() { + return errors.New("cs is already running, cannot replay") + } + + cs.startForReplay() + + // set the FP to nil so we don't overwrite + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + cs.msgLogFP = nil + } + + // ensure all new step events are regenerated as expected + newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + fp, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return err + } + + pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) + defer pb.fp.Close() + + var nextN int // apply N msgs in a row + for pb.scanner.Scan() { + if nextN == 0 && console { + nextN = pb.replayConsoleLoop() + } + + if err := pb.readReplayMessage(); err != nil { + return err + } + + if nextN > 0 { + nextN -= 1 + } + pb.count += 1 + } + return nil +} + +//------------------------------------------------ +// playback manager + type playback struct { cs *ConsensusState file string @@ -88,7 +127,7 @@ func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface } } -// reset the state and run (pb.count - count) steps +// go back count steps by resetting the state and running (pb.count - count) steps func (pb *playback) replayReset(count int) error { pb.cs.Stop() @@ -96,7 +135,7 @@ func (pb *playback) replayReset(count int) error { newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppCtx, pb.cs.blockStore, pb.cs.mempool) newCs.SetEventSwitch(pb.cs.evsw) - // we ensure all new step events are regenerated as expected + // ensure all new step events are regenerated as expected pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) newCs.startForReplay() @@ -136,49 +175,7 @@ func (cs *ConsensusState) startForReplay() { }() } -// replay all msgs or start the console -func (cs *ConsensusState) replay(file string, console bool) error { - if cs.IsRunning() { - return errors.New("cs is already running, cannot replay") - } - - cs.startForReplay() - - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - cs.msgLogFP = nil - } - - // we ensure all new step events are regenerated as expected - newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) - - fp, err := os.OpenFile(file, os.O_RDONLY, 0666) - if err != nil { - return err - } - - pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) - - defer pb.fp.Close() - - var nextN int // apply N msgs in a row - for pb.scanner.Scan() { - if nextN == 0 && console { - nextN = pb.replayConsoleLoop() - } - - if err := pb.readReplayMessage(); err != nil { - return err - } - - if nextN > 0 { - nextN -= 1 - } - pb.count += 1 - } - return nil -} - +// console function for parsing input and running commands func (pb *playback) replayConsoleLoop() int { for { fmt.Printf("> ") @@ -216,7 +213,7 @@ func (pb *playback) replayConsoleLoop() int { // "back N" -> go back N messages // NOTE: "back" is not supported in the state machine design, - // so we restart to do this (expensive ...) + // so we restart and replay up to if len(tokens) == 1 { pb.replayReset(1) diff --git a/consensus/state.go b/consensus/state.go index 232f9e959..d0d47a829 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -277,6 +277,14 @@ func (cs *ConsensusState) OnStop() { } } +// Open file to log all consensus messages and timeouts for deterministic accountability +func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + return err +} + //------------------------------------------------------------ // Public interface for passing messages into the consensus state, // possibly causing a state transition