Browse Source

replay: larger read buffer

pull/296/head
Ethan Buchman 8 years ago
parent
commit
3c18d841fa
3 changed files with 20 additions and 3 deletions
  1. +11
    -0
      consensus/common_test.go
  2. +8
    -2
      consensus/replay.go
  3. +1
    -1
      consensus/replay_test.go

+ 11
- 0
consensus/common_test.go View File

@ -18,6 +18,7 @@ import (
tmsp "github.com/tendermint/tmsp/types" tmsp "github.com/tendermint/tmsp/types"
"github.com/tendermint/tmsp/example/counter" "github.com/tendermint/tmsp/example/counter"
"github.com/tendermint/tmsp/example/dummy"
) )
var config cfg.Config // NOTE: must be reset for each _test.go file var config cfg.Config // NOTE: must be reset for each _test.go file
@ -320,6 +321,16 @@ func fixedConsensusState() *ConsensusState {
return cs return cs
} }
func fixedConsensusStateDummy() *ConsensusState {
stateDB := dbm.NewMemDB()
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
privValidator.Reset()
cs := newConsensusState(state, privValidator, dummy.NewDummyApplication())
return cs
}
func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState {
// Get BlockStore // Get BlockStore
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()


+ 8
- 2
consensus/replay.go View File

@ -116,20 +116,26 @@ func (cs *ConsensusState) catchupReplay(height int) error {
// now we can replay the latest nLines on consensus state // now we can replay the latest nLines on consensus state
// note we can't use scan because we've already been reading from the file // note we can't use scan because we've already been reading from the file
reader := bufio.NewReader(cs.wal.fp)
// XXX: if a msg is too big we need to find out why or increase this for that case ...
maxMsgSize := 1000000
reader := bufio.NewReaderSize(cs.wal.fp, maxMsgSize)
for i := 0; i < nLines; i++ { for i := 0; i < nLines; i++ {
msgBytes, err := reader.ReadBytes('\n') msgBytes, err := reader.ReadBytes('\n')
if err == io.EOF { if err == io.EOF {
log.Warn("Replay: EOF", "bytes", string(msgBytes))
break break
} else if err != nil { } else if err != nil {
return err return err
} else if len(msgBytes) == 0 { } else if len(msgBytes) == 0 {
log.Warn("Replay: msg bytes is 0")
continue continue
} else if len(msgBytes) == 1 && msgBytes[0] == '\n' { } else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
log.Warn("Replay: new line")
continue continue
} }
// the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it // the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it
if !beginning && i == 1 { if !beginning && i == 1 {
log.Warn("Replay: not beginning and 1")
continue continue
} }
@ -140,7 +146,7 @@ func (cs *ConsensusState) catchupReplay(height int) error {
return err return err
} }
} }
log.Notice("Done catchup replay")
log.Notice("Replay: Done")
return nil return nil
} }


+ 1
- 1
consensus/replay_test.go View File

@ -114,7 +114,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu
// we write those lines up to (not including) one with the signature // we write those lines up to (not including) one with the signature
fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
cs := fixedConsensusState()
cs := fixedConsensusStateDummy()
// set the last step according to when we crashed vs the wal // set the last step according to when we crashed vs the wal
cs.privValidator.LastHeight = 1 // first block cs.privValidator.LastHeight = 1 // first block


Loading…
Cancel
Save