Browse Source

some cleanup; log time with consensus msgs

pull/172/head
Ethan Buchman 9 years ago
parent
commit
56076d4d0e
2 changed files with 62 additions and 57 deletions
  1. +54
    -57
      consensus/replay.go
  2. +8
    -0
      consensus/state.go

+ 54
- 57
consensus/replay.go View File

@ -21,7 +21,8 @@ import (
// types and functions for savings consensus messages
type ConsensusLogMessage struct {
Msg ConsensusLogMessageInterface `json:"msg"`
Time time.Time `json:"time"`
Msg ConsensusLogMessageInterface `json:"msg"`
}
type ConsensusLogMessageInterface interface{}
@ -38,7 +39,7 @@ func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) {
if cs.msgLogFP != nil {
var n int
var err error
wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err)
wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, cs.msgLogFP, &n, &err)
wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line
if err != nil {
log.Error("Error writing to consensus message log file", "err", err, "msg", msg)
@ -46,16 +47,8 @@ func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) {
}
}
// Open file to log all consensus messages and timeouts for deterministic accountability
func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
return err
}
//--------------------------------------------------------
// replay messages
// replay the messages
// Interactive playback
func (cs ConsensusState) ReplayConsole(file string) error {
@ -67,6 +60,52 @@ func (cs ConsensusState) ReplayMessages(file string) error {
return cs.replay(file, false)
}
// replay all msgs or start the console
func (cs *ConsensusState) replay(file string, console bool) error {
if cs.IsRunning() {
return errors.New("cs is already running, cannot replay")
}
cs.startForReplay()
// set the FP to nil so we don't overwrite
if cs.msgLogFP != nil {
cs.msgLogFP.Close()
cs.msgLogFP = nil
}
// ensure all new step events are regenerated as expected
newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1)
fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
if err != nil {
return err
}
pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy())
defer pb.fp.Close()
var nextN int // apply N msgs in a row
for pb.scanner.Scan() {
if nextN == 0 && console {
nextN = pb.replayConsoleLoop()
}
if err := pb.readReplayMessage(); err != nil {
return err
}
if nextN > 0 {
nextN -= 1
}
pb.count += 1
}
return nil
}
//------------------------------------------------
// playback manager
type playback struct {
cs *ConsensusState
file string
@ -88,7 +127,7 @@ func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface
}
}
// reset the state and run (pb.count - count) steps
// go back count steps by resetting the state and running (pb.count - count) steps
func (pb *playback) replayReset(count int) error {
pb.cs.Stop()
@ -96,7 +135,7 @@ func (pb *playback) replayReset(count int) error {
newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppCtx, pb.cs.blockStore, pb.cs.mempool)
newCs.SetEventSwitch(pb.cs.evsw)
// we ensure all new step events are regenerated as expected
// ensure all new step events are regenerated as expected
pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1)
newCs.startForReplay()
@ -136,49 +175,7 @@ func (cs *ConsensusState) startForReplay() {
}()
}
// replay all msgs or start the console
func (cs *ConsensusState) replay(file string, console bool) error {
if cs.IsRunning() {
return errors.New("cs is already running, cannot replay")
}
cs.startForReplay()
if cs.msgLogFP != nil {
cs.msgLogFP.Close()
cs.msgLogFP = nil
}
// we ensure all new step events are regenerated as expected
newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1)
fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
if err != nil {
return err
}
pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy())
defer pb.fp.Close()
var nextN int // apply N msgs in a row
for pb.scanner.Scan() {
if nextN == 0 && console {
nextN = pb.replayConsoleLoop()
}
if err := pb.readReplayMessage(); err != nil {
return err
}
if nextN > 0 {
nextN -= 1
}
pb.count += 1
}
return nil
}
// console function for parsing input and running commands
func (pb *playback) replayConsoleLoop() int {
for {
fmt.Printf("> ")
@ -216,7 +213,7 @@ func (pb *playback) replayConsoleLoop() int {
// "back N" -> go back N messages
// NOTE: "back" is not supported in the state machine design,
// so we restart to do this (expensive ...)
// so we restart and replay up to
if len(tokens) == 1 {
pb.replayReset(1)


+ 8
- 0
consensus/state.go View File

@ -277,6 +277,14 @@ func (cs *ConsensusState) OnStop() {
}
}
// Open file to log all consensus messages and timeouts for deterministic accountability
func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
return err
}
//------------------------------------------------------------
// Public interface for passing messages into the consensus state,
// possibly causing a state transition


Loading…
Cancel
Save