|
|
@ -15,67 +15,83 @@ import ( |
|
|
|
"github.com/tendermint/go-wire" |
|
|
|
|
|
|
|
sm "github.com/tendermint/tendermint/state" |
|
|
|
"github.com/tendermint/tendermint/tailseek" |
|
|
|
"github.com/tendermint/tendermint/types" |
|
|
|
) |
|
|
|
|
|
|
|
//--------------------------------------------------------
|
|
|
|
// types and functions for savings consensus messages
|
|
|
|
|
|
|
|
type ConsensusLogMessage struct { |
|
|
|
Time time.Time `json:"time"` |
|
|
|
Msg ConsensusLogMessageInterface `json:"msg"` |
|
|
|
} |
|
|
|
|
|
|
|
type ConsensusLogMessageInterface interface{} |
|
|
|
|
|
|
|
var _ = wire.RegisterInterface( |
|
|
|
struct{ ConsensusLogMessageInterface }{}, |
|
|
|
wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, |
|
|
|
wire.ConcreteType{msgInfo{}, 0x02}, |
|
|
|
wire.ConcreteType{timeoutInfo{}, 0x03}, |
|
|
|
) |
|
|
|
// unmarshal and apply a single message to the consensus state
|
|
|
|
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { |
|
|
|
var err error |
|
|
|
var msg ConsensusLogMessage |
|
|
|
wire.ReadJSON(&msg, msgBytes, &err) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("Error reading json data: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
// called in newStep and for each pass in receiveRoutine
|
|
|
|
func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { |
|
|
|
if cs.msgLogFP != nil { |
|
|
|
var n int |
|
|
|
var err error |
|
|
|
wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, cs.msgLogFP, &n, &err) |
|
|
|
wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line
|
|
|
|
if err != nil { |
|
|
|
log.Error("Error writing to consensus message log file", "err", err, "msg", msg) |
|
|
|
// for logging
|
|
|
|
switch m := msg.Msg.(type) { |
|
|
|
case *types.EventDataRoundState: |
|
|
|
log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) |
|
|
|
// these are playback checks
|
|
|
|
ticker := time.After(time.Second * 2) |
|
|
|
if newStepCh != nil { |
|
|
|
select { |
|
|
|
case mi := <-newStepCh: |
|
|
|
m2 := mi.(*types.EventDataRoundState) |
|
|
|
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { |
|
|
|
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) |
|
|
|
} |
|
|
|
case <-ticker: |
|
|
|
return fmt.Errorf("Failed to read off newStepCh") |
|
|
|
} |
|
|
|
} |
|
|
|
case msgInfo: |
|
|
|
peerKey := m.PeerKey |
|
|
|
if peerKey == "" { |
|
|
|
peerKey = "local" |
|
|
|
} |
|
|
|
switch msg := m.Msg.(type) { |
|
|
|
case *ProposalMessage: |
|
|
|
p := msg.Proposal |
|
|
|
log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", |
|
|
|
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) |
|
|
|
case *BlockPartMessage: |
|
|
|
log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) |
|
|
|
case *VoteMessage: |
|
|
|
v := msg.Vote |
|
|
|
log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, |
|
|
|
"hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) |
|
|
|
} |
|
|
|
// internal or from peer
|
|
|
|
if m.PeerKey == "" { |
|
|
|
cs.internalMsgQueue <- m |
|
|
|
} else { |
|
|
|
cs.peerMsgQueue <- m |
|
|
|
} |
|
|
|
case timeoutInfo: |
|
|
|
log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) |
|
|
|
cs.tockChan <- m |
|
|
|
default: |
|
|
|
return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
//--------------------------------------------------------
|
|
|
|
// replay the messages
|
|
|
|
|
|
|
|
// Interactive playback
|
|
|
|
func (cs ConsensusState) ReplayConsole(file string) error { |
|
|
|
return cs.replay(file, true) |
|
|
|
} |
|
|
|
|
|
|
|
// Full playback, with tests
|
|
|
|
func (cs ConsensusState) ReplayMessages(file string) error { |
|
|
|
return cs.replay(file, false) |
|
|
|
} |
|
|
|
|
|
|
|
// replay only those messages since the last block
|
|
|
|
func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
if !cs.msgLogExists { |
|
|
|
if cs.wal == nil { |
|
|
|
log.Warn("consensus msg log is nil") |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
if cs.msgLogFP == nil { |
|
|
|
log.Warn("consensus msg log is nil") |
|
|
|
if !cs.wal.exists { |
|
|
|
// new wal, nothing to catchup on
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
log.Notice("Catchup by replaying consensus messages") |
|
|
|
f := cs.msgLogFP |
|
|
|
|
|
|
|
n, err := seek.SeekFromEndOfFile(f, func(lineBytes []byte) bool { |
|
|
|
// starting from end of file,
|
|
|
|
// read messages until a new height is found
|
|
|
|
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { |
|
|
|
var err error |
|
|
|
var msg ConsensusLogMessage |
|
|
|
wire.ReadJSON(&msg, lineBytes, &err) |
|
|
@ -84,7 +100,6 @@ func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
} |
|
|
|
m, ok := msg.Msg.(*types.EventDataRoundState) |
|
|
|
if ok && m.Step == RoundStepNewHeight.String() { |
|
|
|
f.Seek(0, 1) |
|
|
|
// TODO: ensure the height matches
|
|
|
|
return true |
|
|
|
} |
|
|
@ -95,13 +110,10 @@ func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// we found it, now we can replay everything
|
|
|
|
pb := newPlayback("", cs.msgLogFP, cs, nil, cs.state.Copy()) |
|
|
|
|
|
|
|
reader := bufio.NewReader(cs.msgLogFP) |
|
|
|
i := 0 |
|
|
|
for { |
|
|
|
i += 1 |
|
|
|
// now we can replay the latest nLines on consensus state
|
|
|
|
// note we can't use scan because we've already been reading from the file
|
|
|
|
reader := bufio.NewReader(cs.wal.fp) |
|
|
|
for i := 0; i < nLines; i++ { |
|
|
|
msgBytes, err := reader.ReadBytes('\n') |
|
|
|
if err == io.EOF { |
|
|
|
break |
|
|
@ -118,16 +130,24 @@ func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
// NOTE: since the priv key is set when the msgs are received
|
|
|
|
// it will attempt to eg double sign but we can just ignore it
|
|
|
|
// since the votes will be replayed and we'll get to the next step
|
|
|
|
if err := pb.readReplayMessage(msgBytes); err != nil { |
|
|
|
if err := cs.readReplayMessage(msgBytes, nil); err != nil { |
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
if i >= n { |
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
//--------------------------------------------------------
|
|
|
|
// replay messages interactively or all at once
|
|
|
|
|
|
|
|
// Interactive playback
|
|
|
|
func (cs ConsensusState) ReplayConsole(file string) error { |
|
|
|
return cs.replay(file, true) |
|
|
|
} |
|
|
|
|
|
|
|
// Full playback, with tests
|
|
|
|
func (cs ConsensusState) ReplayMessages(file string) error { |
|
|
|
return cs.replay(file, false) |
|
|
|
} |
|
|
|
|
|
|
|
// replay all msgs or start the console
|
|
|
@ -135,24 +155,22 @@ func (cs *ConsensusState) replay(file string, console bool) error { |
|
|
|
if cs.IsRunning() { |
|
|
|
return errors.New("cs is already running, cannot replay") |
|
|
|
} |
|
|
|
if cs.wal != nil { |
|
|
|
return errors.New("cs wal is open, cannot replay") |
|
|
|
} |
|
|
|
|
|
|
|
cs.startForReplay() |
|
|
|
|
|
|
|
// set the FP to nil so we don't overwrite
|
|
|
|
if cs.msgLogFP != nil { |
|
|
|
cs.msgLogFP.Close() |
|
|
|
cs.msgLogFP = nil |
|
|
|
} |
|
|
|
|
|
|
|
// ensure all new step events are regenerated as expected
|
|
|
|
newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) |
|
|
|
|
|
|
|
// just open the file for reading, no need to use wal
|
|
|
|
fp, err := os.OpenFile(file, os.O_RDONLY, 0666) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) |
|
|
|
pb := newPlayback(file, fp, cs, cs.state.Copy()) |
|
|
|
defer pb.fp.Close() |
|
|
|
|
|
|
|
var nextN int // apply N msgs in a row
|
|
|
@ -161,7 +179,7 @@ func (cs *ConsensusState) replay(file string, console bool) error { |
|
|
|
nextN = pb.replayConsoleLoop() |
|
|
|
} |
|
|
|
|
|
|
|
if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { |
|
|
|
if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
@ -177,41 +195,39 @@ func (cs *ConsensusState) replay(file string, console bool) error { |
|
|
|
// playback manager
|
|
|
|
|
|
|
|
type playback struct { |
|
|
|
cs *ConsensusState |
|
|
|
file string |
|
|
|
fp *os.File |
|
|
|
scanner *bufio.Scanner |
|
|
|
newStepCh chan interface{} |
|
|
|
genesisState *sm.State |
|
|
|
count int |
|
|
|
cs *ConsensusState |
|
|
|
|
|
|
|
fp *os.File |
|
|
|
scanner *bufio.Scanner |
|
|
|
count int // how many lines/msgs into the file are we
|
|
|
|
|
|
|
|
// replays can be reset to beginning
|
|
|
|
fileName string // so we can close/reopen the file
|
|
|
|
genesisState *sm.State // so the replay session knows where to restart from
|
|
|
|
} |
|
|
|
|
|
|
|
func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface{}, genState *sm.State) *playback { |
|
|
|
func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback { |
|
|
|
return &playback{ |
|
|
|
cs: cs, |
|
|
|
file: file, |
|
|
|
newStepCh: ch, |
|
|
|
genesisState: genState, |
|
|
|
fp: fp, |
|
|
|
fileName: fileName, |
|
|
|
genesisState: genState, |
|
|
|
scanner: bufio.NewScanner(fp), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// go back count steps by resetting the state and running (pb.count - count) steps
|
|
|
|
func (pb *playback) replayReset(count int) error { |
|
|
|
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { |
|
|
|
|
|
|
|
pb.cs.Stop() |
|
|
|
|
|
|
|
newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) |
|
|
|
newCs.SetEventSwitch(pb.cs.evsw) |
|
|
|
|
|
|
|
// ensure all new step events are regenerated as expected
|
|
|
|
pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) |
|
|
|
|
|
|
|
newCs.startForReplay() |
|
|
|
|
|
|
|
pb.fp.Close() |
|
|
|
fp, err := os.OpenFile(pb.file, os.O_RDONLY, 0666) |
|
|
|
fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
@ -222,7 +238,7 @@ func (pb *playback) replayReset(count int) error { |
|
|
|
pb.count = 0 |
|
|
|
pb.cs = newCs |
|
|
|
for i := 0; pb.scanner.Scan() && i < count; i++ { |
|
|
|
if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { |
|
|
|
if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
pb.count += 1 |
|
|
@ -285,8 +301,10 @@ func (pb *playback) replayConsoleLoop() int { |
|
|
|
// NOTE: "back" is not supported in the state machine design,
|
|
|
|
// so we restart and replay up to
|
|
|
|
|
|
|
|
// ensure all new step events are regenerated as expected
|
|
|
|
newStepCh := pb.cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) |
|
|
|
if len(tokens) == 1 { |
|
|
|
pb.replayReset(1) |
|
|
|
pb.replayReset(1, newStepCh) |
|
|
|
} else { |
|
|
|
i, err := strconv.Atoi(tokens[1]) |
|
|
|
if err != nil { |
|
|
@ -294,7 +312,7 @@ func (pb *playback) replayConsoleLoop() int { |
|
|
|
} else if i > pb.count { |
|
|
|
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) |
|
|
|
} else { |
|
|
|
pb.replayReset(i) |
|
|
|
pb.replayReset(i, newStepCh) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -333,103 +351,3 @@ func (pb *playback) replayConsoleLoop() int { |
|
|
|
} |
|
|
|
return 0 |
|
|
|
} |
|
|
|
|
|
|
|
func (pb *playback) readReplayMessage(msgBytes []byte) error { |
|
|
|
var err error |
|
|
|
var msg ConsensusLogMessage |
|
|
|
wire.ReadJSON(&msg, msgBytes, &err) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("Error reading json data: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
// for logging
|
|
|
|
switch m := msg.Msg.(type) { |
|
|
|
case *types.EventDataRoundState: |
|
|
|
log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) |
|
|
|
// these are playback checks
|
|
|
|
ticker := time.After(time.Second * 2) |
|
|
|
if pb.newStepCh != nil { |
|
|
|
select { |
|
|
|
case mi := <-pb.newStepCh: |
|
|
|
m2 := mi.(*types.EventDataRoundState) |
|
|
|
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { |
|
|
|
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) |
|
|
|
} |
|
|
|
case <-ticker: |
|
|
|
return fmt.Errorf("Failed to read off newStepCh") |
|
|
|
} |
|
|
|
} |
|
|
|
case msgInfo: |
|
|
|
peerKey := m.PeerKey |
|
|
|
if peerKey == "" { |
|
|
|
peerKey = "local" |
|
|
|
} |
|
|
|
switch msg := m.Msg.(type) { |
|
|
|
case *ProposalMessage: |
|
|
|
p := msg.Proposal |
|
|
|
log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", |
|
|
|
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) |
|
|
|
case *BlockPartMessage: |
|
|
|
log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) |
|
|
|
case *VoteMessage: |
|
|
|
v := msg.Vote |
|
|
|
log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, |
|
|
|
"hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) |
|
|
|
} |
|
|
|
// internal or from peer
|
|
|
|
if m.PeerKey == "" { |
|
|
|
pb.cs.internalMsgQueue <- m |
|
|
|
} else { |
|
|
|
pb.cs.peerMsgQueue <- m |
|
|
|
} |
|
|
|
case timeoutInfo: |
|
|
|
log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) |
|
|
|
pb.cs.tockChan <- m |
|
|
|
default: |
|
|
|
return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Read lines starting from the end of the file until we read a line that causes found to return true
|
|
|
|
func SeekFromEndOfFile(f *os.File, found func([]byte) bool) (nLines int, err error) { |
|
|
|
var current int64 |
|
|
|
// start at the end
|
|
|
|
current, err = f.Seek(0, 2) |
|
|
|
if err != nil { |
|
|
|
fmt.Println("1") |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// backup until we find the the right line
|
|
|
|
for { |
|
|
|
current -= 1 |
|
|
|
if current < 0 { |
|
|
|
return |
|
|
|
} |
|
|
|
// backup one and read a new byte
|
|
|
|
if _, err = f.Seek(current, 0); err != nil { |
|
|
|
fmt.Println("2", current) |
|
|
|
return |
|
|
|
} |
|
|
|
b := make([]byte, 1) |
|
|
|
if _, err = f.Read(b); err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
if b[0] == '\n' || len(b) == 0 { |
|
|
|
nLines += 1 |
|
|
|
|
|
|
|
// read a full line
|
|
|
|
reader := bufio.NewReader(f) |
|
|
|
lineBytes, _ := reader.ReadBytes('\n') |
|
|
|
if len(lineBytes) == 0 { |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
if found(lineBytes) { |
|
|
|
f.Seek(current, 0) |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |