diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index 7dd2ea902..9f3410517 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -41,10 +41,10 @@ Commands: case "node": node.RunNode(config) case "replay": - if len(args) > 1 && args[1] == "console" { - node.RunReplayConsole(config) + if len(args) > 2 && args[1] == "console" { + node.RunReplayConsole(config, args[2]) } else { - node.RunReplay(config) + node.RunReplay(config, args[1]) } case "init": init_files() diff --git a/cmd/tendermint/reset_priv_validator.go b/cmd/tendermint/reset_priv_validator.go index 2887c10d0..9ecbaa90b 100644 --- a/cmd/tendermint/reset_priv_validator.go +++ b/cmd/tendermint/reset_priv_validator.go @@ -11,7 +11,7 @@ import ( func reset_all() { reset_priv_validator() os.RemoveAll(config.GetString("db_dir")) - os.Remove(config.GetString("cswal")) + os.RemoveAll(config.GetString("cs_wal_dir")) } // NOTE: this is totally unsafe. diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 465297ba3..86523473c 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -22,6 +22,7 @@ func getTMRoot(rootDir string) string { func initTMRoot(rootDir string) { rootDir = getTMRoot(rootDir) EnsureDir(rootDir, 0700) + EnsureDir(rootDir+"/data", 0700) configFilePath := path.Join(rootDir, "config.toml") @@ -68,8 +69,8 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cswal", rootDir+"/data/cswal") - mapConfig.SetDefault("cswal_light", false) + mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal") + mapConfig.SetDefault("cs_wal_light", false) mapConfig.SetDefault("filter_peers", false) mapConfig.SetDefault("block_size", 10000) @@ -84,7 +85,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) - mapConfig.SetDefault("mempool_wal", rootDir+"/data/mempool_wal") + mapConfig.SetDefault("mempool_wal_dir", rootDir+"/data/mempool.wal") return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 6f3217475..0fe861ada 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -33,6 +33,7 @@ func initTMRoot(rootDir string) { } // Create new dir EnsureDir(rootDir, 0700) + EnsureDir(rootDir+"/data", 0700) configFilePath := path.Join(rootDir, "config.toml") genesisFilePath := path.Join(rootDir, "genesis.json") @@ -81,8 +82,8 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cswal", rootDir+"/data/cswal") - mapConfig.SetDefault("cswal_light", false) + mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal") + mapConfig.SetDefault("cs_wal_light", false) mapConfig.SetDefault("filter_peers", false) mapConfig.SetDefault("block_size", 10000) @@ -97,7 +98,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) - mapConfig.SetDefault("mempool_wal", "") + mapConfig.SetDefault("mempool_wal_dir", "") return mapConfig } diff --git a/consensus/replay.go b/consensus/replay.go index afdbcf9f2..54e3c134f 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -11,6 +11,7 @@ import ( "strings" "time" + auto "github.com/tendermint/go-autofile" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" @@ -18,12 +19,17 @@ import ( "github.com/tendermint/tendermint/types" ) -// unmarshal and apply a single message to the consensus state +// Unmarshal and apply a single message to the consensus state // as if it were received in receiveRoutine +// Lines that start with "#" are ignored. // NOTE: receiveRoutine should not be running func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { + // Skip over empty and meta lines + if len(msgBytes) == 0 || msgBytes[0] == '#' { + return nil + } var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, msgBytes, &err) if err != nil { fmt.Println("MsgBytes:", msgBytes, string(msgBytes)) @@ -70,7 +76,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) cs.handleTimeout(m, cs.RoundState) default: - return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg)) } return nil } @@ -78,83 +84,45 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte // replay only those messages since the last block. // timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(csHeight int) error { - if !cs.wal.Exists() { - return nil - } // set replayMode cs.replayMode = true defer func() { cs.replayMode = false }() - // starting from end of file, - // read messages until a new height is found - var walHeight int - nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, lineBytes, &err) - if err != nil { - panic(Fmt("Failed to read cs_msg_log json: %v", err)) - } - m, ok := msg.Msg.(types.EventDataRoundState) - walHeight = m.Height - if ok && m.Step == RoundStepNewHeight.String() { - return true - } - return false - }) + // Ensure that height+1 doesn't exist + gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1)) + if found { + return errors.New(Fmt("WAL should not contain height %d.", csHeight+1)) + } + if gr != nil { + gr.Close() + } + // Search for height marker + gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight)) if err != nil { return err } - - // ensure the height matches - if walHeight != csHeight { - var err error - if walHeight > csHeight { - err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight)) - } else { - log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight) - } - return err + if !found { + return errors.New(Fmt("WAL does not contain height %d.", csHeight)) } + defer gr.Close() - var beginning bool // if we had to go back to the beginning - if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { - beginning = true - } + log.Notice("Catchup by replaying consensus messages", "height", csHeight) - log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight) - - // now we can replay the latest nLines on consensus state - // note we can't use scan because we've already been reading from the file - // XXX: if a msg is too big we need to find out why or increase this for that case ... - maxMsgSize := 1000000 - reader := bufio.NewReaderSize(cs.wal.fp, maxMsgSize) - for i := 0; i < nLines; i++ { - msgBytes, err := reader.ReadBytes('\n') - if err == io.EOF { - log.Warn("Replay: EOF", "bytes", string(msgBytes)) - break - } else if err != nil { - return err - } else if len(msgBytes) == 0 { - log.Warn("Replay: msg bytes is 0") - continue - } else if len(msgBytes) == 1 && msgBytes[0] == '\n' { - log.Warn("Replay: new line") - continue - } - // the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it - if !beginning && i == 1 { - log.Warn("Replay: not beginning and 1") - continue + for { + line, err := gr.ReadLine() + if err != nil { + if err == io.EOF { + break + } else { + return err + } } - // NOTE: since the priv key is set when the msgs are received // it will attempt to eg double sign but we can just ignore it // since the votes will be replayed and we'll get to the next step - if err := cs.readReplayMessage(msgBytes, nil); err != nil { + if err := cs.readReplayMessage([]byte(line), nil); err != nil { return err } } @@ -245,6 +213,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm. func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { pb.cs.Stop() + pb.cs.Wait() newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) newCS.SetEventSwitch(pb.cs.evsw) @@ -376,3 +345,28 @@ func (pb *playback) replayConsoleLoop() int { } return 0 } + +//-------------------------------------------------------------------------------- + +// Parses marker lines of the form: +// #HEIGHT: 12345 +func makeHeightSearchFunc(height int) auto.SearchFunc { + return func(line string) (int, error) { + line = strings.TrimRight(line, "\n") + parts := strings.Split(line, " ") + if len(parts) != 2 { + return -1, errors.New("Line did not have 2 parts") + } + i, err := strconv.Atoi(parts[1]) + if err != nil { + return -1, errors.New("Failed to parse INFO: " + err.Error()) + } + if height < i { + return 1, nil + } else if height == i { + return 0, nil + } else { + return -1, nil + } + } +} diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 1e7c1e810..b3626c869 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -17,18 +17,18 @@ import ( var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") // the priv validator changes step at these lines for a block with 1 val and 1 part -var baseStepChanges = []int{2, 5, 7} +var baseStepChanges = []int{3, 6, 8} // test recovery from each line in each testCase var testCases = []*testCase{ newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part) newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part - newTestCase("small_block2", []int{2, 7, 9}), // small block with txs across 3 smaller block parts + newTestCase("small_block2", []int{3, 8, 10}), // small block with txs across 3 smaller block parts } type testCase struct { name string - log string //full cswal + log string //full cs wal stepMap map[int]int8 // map lines of log to privval step proposeLine int @@ -71,21 +71,20 @@ func readWAL(p string) string { return string(b) } -func writeWAL(log string) string { - fmt.Println("writing", log) - // write the needed wal to file - f, err := ioutil.TempFile(os.TempDir(), "replay_test_") +func writeWAL(walMsgs string) string { + tempDir := os.TempDir() + walDir := tempDir + "/wal" + RandStr(12) + // Create WAL directory + err := EnsureDir(walDir, 0700) if err != nil { panic(err) } - - _, err = f.WriteString(log) + // Write the needed WAL to file + err = WriteFile(walDir+"/wal", []byte(walMsgs), 0600) if err != nil { panic(err) } - name := f.Name() - f.Close() - return name + return walDir } func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) { @@ -97,10 +96,10 @@ func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) { } } -func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}, +func runReplayTest(t *testing.T, cs *ConsensusState, walDir string, newBlockCh chan interface{}, thisCase *testCase, i int) { - cs.config.Set("cswal", fileName) + cs.config.Set("cs_wal_dir", walDir) cs.Start() // Wait to make a new block. // This is just a signal that we haven't halted; its not something contained in the WAL itself. @@ -108,6 +107,7 @@ func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh // should eventually be followed by a new block, or else something is wrong waitForBlock(newBlockCh, thisCase, i) cs.Stop() + cs.Wait() } func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { @@ -123,7 +123,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu lastMsg := split[nLines] // we write those lines up to (not including) one with the signature - fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") + walDir := writeWAL(strings.Join(split[:nLines], "\n") + "\n") cs := fixedConsensusStateDummy() @@ -135,7 +135,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - return cs, newBlockCh, lastMsg, fileName + return cs, newBlockCh, lastMsg, walDir } //----------------------------------------------- @@ -146,8 +146,8 @@ func TestReplayCrashAfterWrite(t *testing.T) { for _, thisCase := range testCases { split := strings.Split(thisCase.log, "\n") for i := 0; i < len(split)-1; i++ { - cs, newBlockCh, _, f := setupReplayTest(thisCase, i+1, true) - runReplayTest(t, cs, f, newBlockCh, thisCase, i+1) + cs, newBlockCh, _, walDir := setupReplayTest(thisCase, i+1, true) + runReplayTest(t, cs, walDir, newBlockCh, thisCase, i+1) } } } @@ -159,10 +159,10 @@ func TestReplayCrashAfterWrite(t *testing.T) { func TestReplayCrashBeforeWritePropose(t *testing.T) { for _, thisCase := range testCases { lineNum := thisCase.proposeLine - cs, newBlockCh, proposalMsg, f := setupReplayTest(thisCase, lineNum, false) // propose + cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) // propose // Set LastSig var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, []byte(proposalMsg), &err) proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) if err != nil { @@ -170,18 +170,18 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) { } cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) cs.privValidator.LastSignature = proposal.Proposal.Signature - runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum) + runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum) } } func TestReplayCrashBeforeWritePrevote(t *testing.T) { for _, thisCase := range testCases { lineNum := thisCase.prevoteLine - cs, newBlockCh, voteMsg, f := setupReplayTest(thisCase, lineNum, false) // prevote + cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) { // Set LastSig var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, []byte(voteMsg), &err) vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) if err != nil { @@ -190,18 +190,18 @@ func TestReplayCrashBeforeWritePrevote(t *testing.T) { cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) cs.privValidator.LastSignature = vote.Vote.Signature }) - runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum) + runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum) } } func TestReplayCrashBeforeWritePrecommit(t *testing.T) { for _, thisCase := range testCases { lineNum := thisCase.precommitLine - cs, newBlockCh, voteMsg, f := setupReplayTest(thisCase, lineNum, false) // precommit + cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // precommit types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) { // Set LastSig var err error - var msg ConsensusLogMessage + var msg TimedWALMessage wire.ReadJSON(&msg, []byte(voteMsg), &err) vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) if err != nil { @@ -210,6 +210,6 @@ func TestReplayCrashBeforeWritePrecommit(t *testing.T) { cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) cs.privValidator.LastSignature = vote.Vote.Signature }) - runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum) + runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum) } } diff --git a/consensus/state.go b/consensus/state.go index b8052e9f2..e9dd2f974 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -304,8 +304,15 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() - err := cs.OpenWAL(cs.config.GetString("cswal")) + walDir := cs.config.GetString("cs_wal_dir") + err := EnsureDir(walDir, 0700) if err != nil { + log.Error("Error ensuring ConsensusState wal dir", "error", err.Error()) + return err + } + err = cs.OpenWAL(walDir) + if err != nil { + log.Error("Error loading ConsensusState wal", "error", err.Error()) return err } @@ -343,16 +350,17 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.BaseService.OnStop() + // Make BaseService.Wait() wait until cs.wal.Wait() if cs.wal != nil && cs.IsRunning() { cs.wal.Wait() } } // Open file to log all consensus messages and timeouts for deterministic accountability -func (cs *ConsensusState) OpenWAL(file string) (err error) { +func (cs *ConsensusState) OpenWAL(walDir string) (err error) { cs.mtx.Lock() defer cs.mtx.Unlock() - wal, err := NewWAL(file, cs.config.GetBool("cswal_light")) + wal, err := NewWAL(walDir, cs.config.GetBool("cs_wal_light")) if err != nil { return err } @@ -658,7 +666,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // close wal now that we're done writing to it if cs.wal != nil { - cs.wal.Close() + cs.wal.Stop() } return } diff --git a/consensus/test_data/README.md b/consensus/test_data/README.md index e3bfca70b..8d14b8e68 100644 --- a/consensus/test_data/README.md +++ b/consensus/test_data/README.md @@ -2,7 +2,6 @@ The easiest way to generate this data is to copy `~/.tendermint_test/somedir/*` to `~/.tendermint` and to run a local node. -Be sure to set the db to "leveldb" to create a cswal file in `~/.tendermint/data/cswal`. If you need to change the signatures, you can use a script as follows: The privBytes comes from `config/tendermint_test/...`: diff --git a/consensus/test_data/empty_block.cswal b/consensus/test_data/empty_block.cswal index 65800c429..29361c53e 100644 --- a/consensus/test_data/empty_block.cswal +++ b/consensus/test_data/empty_block.cswal @@ -1,3 +1,4 @@ +#HEIGHT: 1 {"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]} {"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block1.cswal b/consensus/test_data/small_block1.cswal index 738f7951a..232bf20f2 100644 --- a/consensus/test_data/small_block1.cswal +++ b/consensus/test_data/small_block1.cswal @@ -1,3 +1,4 @@ +#HEIGHT: 1 {"time":"2016-10-11T15:29:08.113Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]} {"time":"2016-10-11T15:29:08.115Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-10-11T15:29:08.115Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"pol_round":-1,"signature":"116961B715FB54C09983209F7F3BFD95C7DA2E0D7AB9CFE9F0750F2402E2AEB715CFD55FB2C5DB88F485391D426A48705E0474AB94328463290D086D88BAD704"}}],"peer_key":""}]} diff --git a/consensus/test_data/small_block2.cswal b/consensus/test_data/small_block2.cswal index fdb07b0b2..84d860223 100644 --- a/consensus/test_data/small_block2.cswal +++ b/consensus/test_data/small_block2.cswal @@ -1,3 +1,4 @@ +#HEIGHT: 1 {"time":"2016-10-11T16:21:23.438Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]} {"time":"2016-10-11T16:21:23.440Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} {"time":"2016-10-11T16:21:23.440Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"pol_round":-1,"signature":"8F74F7032E50DFBC17E8B42DD15FD54858B45EEB1B8DAF6432AFBBB1333AC1E850290DE82DF613A10430EB723023527498D45C106FD2946FEF03A9C8B301020B"}}],"peer_key":""}]} diff --git a/consensus/wal.go b/consensus/wal.go index aa2dabe32..8460a8322 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -1,10 +1,9 @@ package consensus import ( - "bufio" - "os" "time" + auto "github.com/tendermint/go-autofile" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -13,15 +12,15 @@ import ( //-------------------------------------------------------- // types and functions for savings consensus messages -type ConsensusLogMessage struct { - Time time.Time `json:"time"` - Msg ConsensusLogMessageInterface `json:"msg"` +type TimedWALMessage struct { + Time time.Time `json:"time"` + Msg WALMessage `json:"msg"` } -type ConsensusLogMessageInterface interface{} +type WALMessage interface{} var _ = wire.RegisterInterface( - struct{ ConsensusLogMessageInterface }{}, + struct{ WALMessage }{}, wire.ConcreteType{types.EventDataRoundState{}, 0x01}, wire.ConcreteType{msgInfo{}, 0x02}, wire.ConcreteType{timeoutInfo{}, 0x03}, @@ -35,119 +34,59 @@ var _ = wire.RegisterInterface( // TODO: currently the wal is overwritten during replay catchup // give it a mode so it's either reading or appending - must read to end to start appending again type WAL struct { - fp *os.File - exists bool // if the file already existed (restarted process) - - done chan struct{} + BaseService + group *auto.Group light bool // ignore block parts } -func NewWAL(file string, light bool) (*WAL, error) { - var walExists bool - if _, err := os.Stat(file); err == nil { - walExists = true +func NewWAL(walDir string, light bool) (*WAL, error) { + head, err := auto.OpenAutoFile(walDir + "/wal") + if err != nil { + return nil, err } - fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) + group, err := auto.OpenGroup(head) if err != nil { return nil, err } - return &WAL{ - fp: fp, - exists: walExists, - done: make(chan struct{}), - light: light, - }, nil + wal := &WAL{ + group: group, + light: light, + } + wal.BaseService = *NewBaseService(log, "WAL", wal) + return wal, nil } -func (wal *WAL) Exists() bool { - if wal == nil { - log.Warn("consensus msg log is nil") - return false - } - return wal.exists +func (wal *WAL) OnStop() { + wal.BaseService.OnStop() + wal.group.Head.Close() + wal.group.Close() } // called in newStep and for each pass in receiveRoutine -func (wal *WAL) Save(clm ConsensusLogMessageInterface) { +func (wal *WAL) Save(wmsg WALMessage) { if wal == nil { return } if wal.light { // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts) - if mi, ok := clm.(msgInfo); ok { + if mi, ok := wmsg.(msgInfo); ok { _ = mi if mi.PeerKey != "" { return } } } - var clmBytes = wire.JSONBytes(ConsensusLogMessage{time.Now(), clm}) - var n int - var err error - wire.WriteTo(append(clmBytes, byte('\n')), wal.fp, &n, &err) // one message per line - if err != nil { - PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm)) - } -} - -// Must not be called concurrently with a write. -func (wal *WAL) Close() { - if wal != nil { - wal.fp.Close() + // Write #HEIGHT: XYZ if new height + if edrs, ok := wmsg.(types.EventDataRoundState); ok { + if edrs.Step == RoundStepNewHeight.String() { + wal.group.WriteLine(Fmt("#HEIGHT: %v", edrs.Height)) + } } - wal.done <- struct{}{} -} - -func (wal *WAL) Wait() { - <-wal.done -} - -// TODO: remove once we stop supporting older golang version -const ( - ioSeekStart = 0 - ioSeekCurrent = 1 - ioSeekEnd = 2 -) - -func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { - var current int64 - // start at the end - current, err = wal.fp.Seek(0, ioSeekEnd) + // Write the wal message + var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg}) + err := wal.group.WriteLine(string(wmsgBytes)) if err != nil { - return - } - - // backup until we find the the right line - // current is how far we are from the beginning - for { - current -= 1 - if current < 0 { - wal.fp.Seek(0, ioSeekStart) // back to beginning - return - } - // backup one and read a new byte - if _, err = wal.fp.Seek(current, ioSeekStart); err != nil { - return - } - b := make([]byte, 1) - if _, err = wal.fp.Read(b); err != nil { - return - } - if b[0] == '\n' || len(b) == 0 { - nLines += 1 - // read a full line - reader := bufio.NewReader(wal.fp) - lineBytes, _ := reader.ReadBytes('\n') - if len(lineBytes) == 0 { - continue - } - - if found(lineBytes) { - wal.fp.Seek(0, ioSeekCurrent) // (?) - wal.fp.Seek(current, ioSeekStart) - return - } - } + PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg)) } } diff --git a/consensus/wal_test.go b/consensus/wal_test.go deleted file mode 100644 index 648692e40..000000000 --- a/consensus/wal_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package consensus - -import ( - "io/ioutil" - "os" - "path" - "strings" - "testing" - - . "github.com/tendermint/go-common" -) - -var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]} -{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]} -{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]} -{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]} -{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]} -{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}` - -func TestSeek(t *testing.T) { - f, err := ioutil.TempFile(os.TempDir(), "seek_test_") - if err != nil { - panic(err) - } - - stat, _ := f.Stat() - name := stat.Name() - - _, err = f.WriteString(testTxt) - if err != nil { - panic(err) - } - f.Close() - - wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light")) - if err != nil { - panic(err) - } - - keyWord := "Precommit" - n, err := wal.SeekFromEnd(func(b []byte) bool { - if strings.Contains(string(b), keyWord) { - return true - } - return false - }) - if err != nil { - panic(err) - } - - // confirm n - spl := strings.Split(testTxt, "\n") - var i int - var s string - for i, s = range spl { - if strings.Contains(s, keyWord) { - break - } - } - // n is lines from the end. - spl = spl[i:] - if n != len(spl) { - panic(Fmt("Wrong nLines. Got %d, expected %d", n, len(spl))) - } - - b, err := ioutil.ReadAll(wal.fp) - if err != nil { - panic(err) - } - // first char is a \n - spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n") - for i, s := range spl { - if s != spl2[i] { - panic(Fmt("Mismatch. Got %s, expected %s", spl2[i], s)) - } - } - -} diff --git a/glide.lock b/glide.lock index 8210de19e..859794930 100644 --- a/glide.lock +++ b/glide.lock @@ -52,7 +52,7 @@ imports: - name: github.com/tendermint/go-flowrate version: a20c98e61957faa93b4014fbd902f20ab9317a6a - name: github.com/tendermint/go-autofile - version: c26b857900009ac81c78c1bc03f85e0c8e47818a + version: 916f3d789b6afaf7bfe161aeec391c8a35e354a8 - name: github.com/tendermint/go-clist version: 3baa390bbaf7634251c42ad69a8682e7e3990552 - name: github.com/tendermint/go-common diff --git a/mempool/mempool.go b/mempool/mempool.go index 66df850ea..a5426991e 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -85,10 +85,16 @@ func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool { } func (mem *Mempool) initWAL() { - walFileName := mem.config.GetString("mempool_wal") - if walFileName != "" { - af, err := auto.OpenAutoFile(walFileName) + walDir := mem.config.GetString("mempool_wal_dir") + if walDir != "" { + err := EnsureDir(walDir, 0700) if err != nil { + log.Error("Error ensuring Mempool wal dir", "error", err) + PanicSanity(err) + } + af, err := auto.OpenAutoFile(walDir + "/wal") + if err != nil { + log.Error("Error opening Mempool wal file", "error", err) PanicSanity(err) } mem.wal = af diff --git a/node/node.go b/node/node.go index bb191b55e..7c4a08e3c 100644 --- a/node/node.go +++ b/node/node.go @@ -52,8 +52,6 @@ func NewNodeDefault(config cfg.Config) *Node { func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node { - EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here - // Get BlockStore blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir")) blockStore := bc.NewBlockStore(blockStoreDB) @@ -414,12 +412,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { return consensusState } -func RunReplayConsole(config cfg.Config) { - walFile := config.GetString("cswal") - if walFile == "" { - Exit("cswal file name not set in tendermint config") - } - +func RunReplayConsole(config cfg.Config, walFile string) { consensusState := newConsensusState(config) if err := consensusState.ReplayConsole(walFile); err != nil { @@ -427,12 +420,7 @@ func RunReplayConsole(config cfg.Config) { } } -func RunReplay(config cfg.Config) { - walFile := config.GetString("cswal") - if walFile == "" { - Exit("cswal file name not set in tendermint config") - } - +func RunReplay(config cfg.Config, walFile string) { consensusState := newConsensusState(config) if err := consensusState.ReplayMessages(walFile); err != nil {