diff --git a/consensus/replay.go b/consensus/replay.go index afdbcf9f2..54e3c134f 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -11,6 +11,7 @@ import ( "strings" "time" + auto "github.com/tendermint/go-autofile" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" @@ -18,12 +19,17 @@ import ( "github.com/tendermint/tendermint/types" ) -// unmarshal and apply a single message to the consensus state +// Unmarshal and apply a single message to the consensus state // as if it were received in receiveRoutine +// Lines that start with "#" are ignored. // NOTE: receiveRoutine should not be running func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { + // Skip over empty and meta lines + if len(msgBytes) == 0 || msgBytes[0] == '#' { + return nil + } var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, msgBytes, &err) if err != nil { fmt.Println("MsgBytes:", msgBytes, string(msgBytes)) @@ -70,7 +76,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) cs.handleTimeout(m, cs.RoundState) default: - return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg)) } return nil } @@ -78,83 +84,45 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // replay only those messages since the last block. // timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(csHeight int) error { - if !cs.wal.Exists() { - return nil - } // set replayMode cs.replayMode = true defer func() { cs.replayMode = false }() - // starting from end of file, - // read messages until a new height is found - var walHeight int - nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, lineBytes, &err) - if err != nil { - panic(Fmt("Failed to read cs_msg_log json: %v", err)) - } - m, ok := msg.Msg.(types.EventDataRoundState) - walHeight = m.Height - if ok && m.Step == RoundStepNewHeight.String() { - return true - } - return false - }) + // Ensure that height+1 doesn't exist + gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1)) + if found { + return errors.New(Fmt("WAL should not contain height %d.", csHeight+1)) + } + if gr != nil { + gr.Close() + } + // Search for height marker + gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) if err != nil { return err } - - // ensure the height matches - if walHeight != csHeight { - var err error - if walHeight > csHeight { - err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight)) - } else { - log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight) - } - return err + if !found { + return errors.New(Fmt("WAL does not contain height %d.", csHeight)) } + defer gr.Close() - var beginning bool // if we had to go back to the beginning - if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { - beginning = true - } + log.Notice("Catchup by replaying consensus messages", "height", csHeight) - log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight) - - // now we can replay the latest nLines on consensus state - // note we can't use scan because we've already been reading from the file - // XXX: if a msg is too big we need to find out why or increase this for that case ... - maxMsgSize := 1000000 - reader := bufio.NewReaderSize(cs.wal.fp, maxMsgSize) - for i := 0; i < nLines; i++ { - msgBytes, err := reader.ReadBytes('\n') - if err == io.EOF { - log.Warn("Replay: EOF", "bytes", string(msgBytes)) - break - } else if err != nil { - return err - } else if len(msgBytes) == 0 { - log.Warn("Replay: msg bytes is 0") - continue - } else if len(msgBytes) == 1 && msgBytes[0] == '\n' { - log.Warn("Replay: new line") - continue - } - // the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it - if !beginning && i == 1 { - log.Warn("Replay: not beginning and 1") - continue + for { + line, err := gr.ReadLine() + if err != nil { + if err == io.EOF { + break + } else { + return err + } } - // NOTE: since the priv key is set when the msgs are received // it will attempt to eg double sign but we can just ignore it // since the votes will be replayed and we'll get to the next step - if err := cs.readReplayMessage(msgBytes, nil); err != nil { + if err := cs.readReplayMessage([]byte(line), nil); err != nil { return err } } @@ -245,6 +213,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm. func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { pb.cs.Stop() + pb.cs.Wait() newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) newCS.SetEventSwitch(pb.cs.evsw) @@ -376,3 +345,28 @@ func (pb *playback) replayConsoleLoop() int { } return 0 } + +//-------------------------------------------------------------------------------- + +// Parses marker lines of the form: +// #HEIGHT: 12345 +func makeHeightSearchFunc(height int) auto.SearchFunc { + return func(line string) (int, error) { + line = strings.TrimRight(line, "\n") + parts := strings.Split(line, " ") + if len(parts) != 2 { + return -1, errors.New("Line did not have 2 parts") + } + i, err := strconv.Atoi(parts[1]) + if err != nil { + return -1, errors.New("Failed to parse INFO: " + err.Error()) + } + if height < i { + return 1, nil + } else if height == i { + return 0, nil + } else { + return -1, nil + } + } +} diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 1e7c1e810..154545112 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -17,13 +17,13 @@ import ( var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") // the priv validator changes step at these lines for a block with 1 val and 1 part -var baseStepChanges = []int{2, 5, 7} +var baseStepChanges = []int{3, 6, 8} // test recovery from each line in each testCase var testCases = []*testCase{ newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part) newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part - newTestCase("small_block2", []int{2, 7, 9}), // small block with txs across 3 smaller block parts + newTestCase("small_block2", []int{3, 8, 10}), // small block with txs across 3 smaller block parts } type testCase struct { @@ -108,6 +108,7 @@ func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh // should eventually be followed by a new block, or else something is wrong waitForBlock(newBlockCh, thisCase, i) cs.Stop() + cs.Wait() } func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { @@ -162,7 +163,7 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) { cs, newBlockCh, proposalMsg, f := setupReplayTest(thisCase, lineNum, false) // propose // Set LastSig var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, []byte(proposalMsg), &err) proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) if err != nil { @@ -181,7 +182,7 @@ func TestReplayCrashBeforeWritePrevote(t *testing.T) { types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) { // Set LastSig var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, []byte(voteMsg), &err) vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) if err != nil { @@ -201,7 +202,7 @@ func TestReplayCrashBeforeWritePrecommit(t *testing.T) { types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) { // Set LastSig var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, []byte(voteMsg), &err) vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) if err != nil { diff --git a/consensus/state.go b/consensus/state.go index b8052e9f2..8eb95ddbc 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -343,6 +343,7 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.BaseService.OnStop() + // Make BaseService.Wait() wait until cs.wal.Wait() if cs.wal != nil && cs.IsRunning() { cs.wal.Wait() } @@ -658,7 +659,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // close wal now that we're done writing to it if cs.wal != nil { - cs.wal.Close() + cs.wal.Stop() } return } diff --git a/consensus/test_data/empty_block.cswal b/consensus/test_data/empty_block.cswal index 65800c429..29361c53e 100644 --- a/consensus/test_data/empty_block.cswal +++ b/consensus/test_data/empty_block.cswal @@ -1,3 +1,4 @@ +#HEIGHT: 1 {"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} {"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block1.cswal b/consensus/test_data/small_block1.cswal index 738f7951a..232bf20f2 100644 --- a/consensus/test_data/small_block1.cswal +++ b/consensus/test_data/small_block1.cswal @@ -1,3 +1,4 @@ +#HEIGHT: 1 {"time":"2016-10-11T15:29:08.113Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]} {"time":"2016-10-11T15:29:08.115Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-10-11T15:29:08.115Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"pol_round":-1,"signature":"116961B715FB54C09983209F7F3BFD95C7DA2E0D7AB9CFE9F0750F2402E2AEB715CFD55FB2C5DB88F485391D426A48705E0474AB94328463290D086D88BAD704"}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block2.cswal b/consensus/test_data/small_block2.cswal index fdb07b0b2..84d860223 100644 --- a/consensus/test_data/small_block2.cswal +++ b/consensus/test_data/small_block2.cswal @@ -1,3 +1,4 @@ +#HEIGHT: 1 {"time":"2016-10-11T16:21:23.438Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]} {"time":"2016-10-11T16:21:23.440Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-10-11T16:21:23.440Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"pol_round":-1,"signature":"8F74F7032E50DFBC17E8B42DD15FD54858B45EEB1B8DAF6432AFBBB1333AC1E850290DE82DF613A10430EB723023527498D45C106FD2946FEF03A9C8B301020B"}}],"peer_key":""}]} diff --git a/consensus/wal.go b/consensus/wal.go index aa2dabe32..ce3a96f8a 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -1,10 +1,9 @@ package consensus import ( - "bufio" - "os" "time" + auto "github.com/tendermint/go-autofile" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -13,15 +12,15 @@ import ( //-------------------------------------------------------- // types and functions for savings consensus messages -type ConsensusLogMessage struct { - Time time.Time `json:"time"` - Msg ConsensusLogMessageInterface `json:"msg"` +type TimedWALMessage struct { + Time time.Time `json:"time"` + Msg WALMessage `json:"msg"` } -type ConsensusLogMessageInterface interface{} +type WALMessage interface{} var _ = wire.RegisterInterface( - struct{ ConsensusLogMessageInterface }{}, + struct{ WALMessage }{}, wire.ConcreteType{types.EventDataRoundState{}, 0x01}, wire.ConcreteType{msgInfo{}, 0x02}, wire.ConcreteType{timeoutInfo{}, 0x03}, @@ -35,119 +34,59 @@ var _ = wire.RegisterInterface( // TODO: currently the wal is overwritten during replay catchup // give it a mode so it's either reading or appending - must read to end to start appending again type WAL struct { - fp *os.File - exists bool // if the file already existed (restarted process) - - done chan struct{} + BaseService + group *auto.Group light bool // ignore block parts } -func NewWAL(file string, light bool) (*WAL, error) { - var walExists bool - if _, err := os.Stat(file); err == nil { - walExists = true +func NewWAL(path string, light bool) (*WAL, error) { + head, err := auto.OpenAutoFile(path) + if err != nil { + return nil, err } - fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) + group, err := auto.OpenGroup(head) if err != nil { return nil, err } - return &WAL{ - fp: fp, - exists: walExists, - done: make(chan struct{}), - light: light, - }, nil + wal := &WAL{ + group: group, + light: light, + } + wal.BaseService = *NewBaseService(log, "WAL", wal) + return wal, nil } -func (wal *WAL) Exists() bool { - if wal == nil { - log.Warn("consensus msg log is nil") - return false - } - return wal.exists +func (wal *WAL) OnStop() { + wal.BaseService.OnStop() + wal.group.Head.Close() + wal.group.Close() } // called in newStep and for each pass in receiveRoutine -func (wal *WAL) Save(clm ConsensusLogMessageInterface) { +func (wal *WAL) Save(wmsg WALMessage) { if wal == nil { return } if wal.light { // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts) - if mi, ok := clm.(msgInfo); ok { + if mi, ok := wmsg.(msgInfo); ok { _ = mi if mi.PeerKey != "" { return } } } - var clmBytes = wire.JSONBytes(ConsensusLogMessage{time.Now(), clm}) - var n int - var err error - wire.WriteTo(append(clmBytes, byte('\n')), wal.fp, &n, &err) // one message per line - if err != nil { - PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm)) - } -} - -// Must not be called concurrently with a write. -func (wal *WAL) Close() { - if wal != nil { - wal.fp.Close() + // Write #HEIGHT: XYZ if new height + if edrs, ok := wmsg.(types.EventDataRoundState); ok { + if edrs.Step == RoundStepNewHeight.String() { + wal.group.WriteLine(Fmt("#HEIGHT: %v", edrs.Height)) + } } - wal.done <- struct{}{} -} - -func (wal *WAL) Wait() { - <-wal.done -} - -// TODO: remove once we stop supporting older golang version -const ( - ioSeekStart = 0 - ioSeekCurrent = 1 - ioSeekEnd = 2 -) - -func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { - var current int64 - // start at the end - current, err = wal.fp.Seek(0, ioSeekEnd) + // Write the wal message + var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg}) + err := wal.group.WriteLine(string(wmsgBytes)) if err != nil { - return - } - - // backup until we find the the right line - // current is how far we are from the beginning - for { - current -= 1 - if current < 0 { - wal.fp.Seek(0, ioSeekStart) // back to beginning - return - } - // backup one and read a new byte - if _, err = wal.fp.Seek(current, ioSeekStart); err != nil { - return - } - b := make([]byte, 1) - if _, err = wal.fp.Read(b); err != nil { - return - } - if b[0] == '\n' || len(b) == 0 { - nLines += 1 - // read a full line - reader := bufio.NewReader(wal.fp) - lineBytes, _ := reader.ReadBytes('\n') - if len(lineBytes) == 0 { - continue - } - - if found(lineBytes) { - wal.fp.Seek(0, ioSeekCurrent) // (?) - wal.fp.Seek(current, ioSeekStart) - return - } - } + PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg)) } } diff --git a/consensus/wal_test.go b/consensus/wal_test.go deleted file mode 100644 index 648692e40..000000000 --- a/consensus/wal_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package consensus - -import ( - "io/ioutil" - "os" - "path" - "strings" - "testing" - - . "github.com/tendermint/go-common" -) - -var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]} -{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]} -{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]} -{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]} -{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]} -{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}` - -func TestSeek(t *testing.T) { - f, err := ioutil.TempFile(os.TempDir(), "seek_test_") - if err != nil { - panic(err) - } - - stat, _ := f.Stat() - name := stat.Name() - - _, err = f.WriteString(testTxt) - if err != nil { - panic(err) - } - f.Close() - - wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light")) - if err != nil { - panic(err) - } - - keyWord := "Precommit" - n, err := wal.SeekFromEnd(func(b []byte) bool { - if strings.Contains(string(b), keyWord) { - return true - } - return false - }) - if err != nil { - panic(err) - } - - // confirm n - spl := strings.Split(testTxt, "\n") - var i int - var s string - for i, s = range spl { - if strings.Contains(s, keyWord) { - break - } - } - // n is lines from the end. - spl = spl[i:] - if n != len(spl) { - panic(Fmt("Wrong nLines. Got %d, expected %d", n, len(spl))) - } - - b, err := ioutil.ReadAll(wal.fp) - if err != nil { - panic(err) - } - // first char is a \n - spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n") - for i, s := range spl { - if s != spl2[i] { - panic(Fmt("Mismatch. Got %s, expected %s", spl2[i], s)) - } - } - -} diff --git a/glide.lock b/glide.lock index 412f320df..9c5cc7e2d 100644 --- a/glide.lock +++ b/glide.lock @@ -52,7 +52,7 @@ imports: - name: github.com/tendermint/flowcontrol version: 84d9671090430e8ec80e35b339907e0579b999eb - name: github.com/tendermint/go-autofile - version: c26b857900009ac81c78c1bc03f85e0c8e47818a + version: 916f3d789b6afaf7bfe161aeec391c8a35e354a8 - name: github.com/tendermint/go-clist version: 3baa390bbaf7634251c42ad69a8682e7e3990552 - name: github.com/tendermint/go-common