|
|
@ -77,7 +77,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte |
|
|
|
|
|
|
|
// replay only those messages since the last block.
|
|
|
|
// timeoutRoutine should run concurrently to read off tickChan
|
|
|
|
func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
func (cs *ConsensusState) catchupReplay(csHeight int) error { |
|
|
|
if !cs.wal.Exists() { |
|
|
|
return nil |
|
|
|
} |
|
|
@ -88,6 +88,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
|
|
|
|
// starting from end of file,
|
|
|
|
// read messages until a new height is found
|
|
|
|
var walHeight int |
|
|
|
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { |
|
|
|
var err error |
|
|
|
var msg ConsensusLogMessage |
|
|
@ -96,8 +97,8 @@ func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
panic(Fmt("Failed to read cs_msg_log json: %v", err)) |
|
|
|
} |
|
|
|
m, ok := msg.Msg.(types.EventDataRoundState) |
|
|
|
walHeight = m.Height |
|
|
|
if ok && m.Step == RoundStepNewHeight.String() { |
|
|
|
// TODO: ensure the height matches
|
|
|
|
return true |
|
|
|
} |
|
|
|
return false |
|
|
@ -107,12 +108,23 @@ func (cs *ConsensusState) catchupReplay(height int) error { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// ensure the height matches
|
|
|
|
if walHeight != csHeight { |
|
|
|
var err error |
|
|
|
if walHeight > csHeight { |
|
|
|
err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight)) |
|
|
|
} else { |
|
|
|
log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight) |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
var beginning bool // if we had to go back to the beginning
|
|
|
|
if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { |
|
|
|
beginning = true |
|
|
|
} |
|
|
|
|
|
|
|
log.Notice("Catchup by replaying consensus messages", "n", nLines) |
|
|
|
log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight) |
|
|
|
|
|
|
|
// now we can replay the latest nLines on consensus state
|
|
|
|
// note we can't use scan because we've already been reading from the file
|
|
|
|