Browse Source

Merge branch 'consensus_wal_autofile' into develop_old

pull/317/head
Ethan Buchman 8 years ago
parent
commit
1fedf5b332
16 changed files with 158 additions and 297 deletions
  1. +3
    -3
      cmd/tendermint/main.go
  2. +1
    -1
      cmd/tendermint/reset_priv_validator.go
  3. +4
    -3
      config/tendermint/config.go
  4. +4
    -3
      config/tendermint_test/config.go
  5. +58
    -64
      consensus/replay.go
  6. +27
    -27
      consensus/replay_test.go
  7. +12
    -4
      consensus/state.go
  8. +0
    -1
      consensus/test_data/README.md
  9. +1
    -0
      consensus/test_data/empty_block.cswal
  10. +1
    -0
      consensus/test_data/small_block1.cswal
  11. +1
    -0
      consensus/test_data/small_block2.cswal
  12. +34
    -95
      consensus/wal.go
  13. +0
    -78
      consensus/wal_test.go
  14. +1
    -1
      glide.lock
  15. +9
    -3
      mempool/mempool.go
  16. +2
    -14
      node/node.go

+ 3
- 3
cmd/tendermint/main.go View File

@ -41,10 +41,10 @@ Commands:
case "node":
node.RunNode(config)
case "replay":
if len(args) > 1 && args[1] == "console" {
node.RunReplayConsole(config)
if len(args) > 2 && args[1] == "console" {
node.RunReplayConsole(config, args[2])
} else {
node.RunReplay(config)
node.RunReplay(config, args[1])
}
case "init":
init_files()


+ 1
- 1
cmd/tendermint/reset_priv_validator.go View File

@ -11,7 +11,7 @@ import (
func reset_all() {
reset_priv_validator()
os.RemoveAll(config.GetString("db_dir"))
os.Remove(config.GetString("cswal"))
os.RemoveAll(config.GetString("cs_wal_dir"))
}
// NOTE: this is totally unsafe.


+ 4
- 3
config/tendermint/config.go View File

@ -22,6 +22,7 @@ func getTMRoot(rootDir string) string {
func initTMRoot(rootDir string) {
rootDir = getTMRoot(rootDir)
EnsureDir(rootDir, 0700)
EnsureDir(rootDir+"/data", 0700)
configFilePath := path.Join(rootDir, "config.toml")
@ -68,8 +69,8 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:46657")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal")
mapConfig.SetDefault("cs_wal_light", false)
mapConfig.SetDefault("filter_peers", false)
mapConfig.SetDefault("block_size", 10000)
@ -84,7 +85,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("mempool_recheck", true)
mapConfig.SetDefault("mempool_recheck_empty", true)
mapConfig.SetDefault("mempool_broadcast", true)
mapConfig.SetDefault("mempool_wal", rootDir+"/data/mempool_wal")
mapConfig.SetDefault("mempool_wal_dir", rootDir+"/data/mempool.wal")
return mapConfig
}


+ 4
- 3
config/tendermint_test/config.go View File

@ -33,6 +33,7 @@ func initTMRoot(rootDir string) {
}
// Create new dir
EnsureDir(rootDir, 0700)
EnsureDir(rootDir+"/data", 0700)
configFilePath := path.Join(rootDir, "config.toml")
genesisFilePath := path.Join(rootDir, "genesis.json")
@ -81,8 +82,8 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("rpc_laddr", "tcp://0.0.0.0:36657")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal")
mapConfig.SetDefault("cs_wal_light", false)
mapConfig.SetDefault("filter_peers", false)
mapConfig.SetDefault("block_size", 10000)
@ -97,7 +98,7 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("mempool_recheck", true)
mapConfig.SetDefault("mempool_recheck_empty", true)
mapConfig.SetDefault("mempool_broadcast", true)
mapConfig.SetDefault("mempool_wal", "")
mapConfig.SetDefault("mempool_wal_dir", "")
return mapConfig
}


+ 58
- 64
consensus/replay.go View File

@ -11,6 +11,7 @@ import (
"strings"
"time"
auto "github.com/tendermint/go-autofile"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
@ -18,12 +19,17 @@ import (
"github.com/tendermint/tendermint/types"
)
// unmarshal and apply a single message to the consensus state
// Unmarshal and apply a single message to the consensus state
// as if it were received in receiveRoutine
// Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
// Skip over empty and meta lines
if len(msgBytes) == 0 || msgBytes[0] == '#' {
return nil
}
var err error
var msg ConsensusLogMessage
var msg TimedWALMessage
wire.ReadJSON(&msg, msgBytes, &err)
if err != nil {
fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
@ -70,7 +76,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(m, cs.RoundState)
default:
return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
@ -78,83 +84,45 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
// replay only those messages since the last block.
// timeoutRoutine should run concurrently to read off tickChan
func (cs *ConsensusState) catchupReplay(csHeight int) error {
if !cs.wal.Exists() {
return nil
}
// set replayMode
cs.replayMode = true
defer func() { cs.replayMode = false }()
// starting from end of file,
// read messages until a new height is found
var walHeight int
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, lineBytes, &err)
if err != nil {
panic(Fmt("Failed to read cs_msg_log json: %v", err))
}
m, ok := msg.Msg.(types.EventDataRoundState)
walHeight = m.Height
if ok && m.Step == RoundStepNewHeight.String() {
return true
}
return false
})
// Ensure that height+1 doesn't exist
gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1))
if found {
return errors.New(Fmt("WAL should not contain height %d.", csHeight+1))
}
if gr != nil {
gr.Close()
}
// Search for height marker
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight))
if err != nil {
return err
}
// ensure the height matches
if walHeight != csHeight {
var err error
if walHeight > csHeight {
err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight))
} else {
log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight)
}
return err
if !found {
return errors.New(Fmt("WAL does not contain height %d.", csHeight))
}
defer gr.Close()
var beginning bool // if we had to go back to the beginning
if c, _ := cs.wal.fp.Seek(0, 1); c == 0 {
beginning = true
}
log.Notice("Catchup by replaying consensus messages", "height", csHeight)
log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight)
// now we can replay the latest nLines on consensus state
// note we can't use scan because we've already been reading from the file
// XXX: if a msg is too big we need to find out why or increase this for that case ...
maxMsgSize := 1000000
reader := bufio.NewReaderSize(cs.wal.fp, maxMsgSize)
for i := 0; i < nLines; i++ {
msgBytes, err := reader.ReadBytes('\n')
if err == io.EOF {
log.Warn("Replay: EOF", "bytes", string(msgBytes))
break
} else if err != nil {
return err
} else if len(msgBytes) == 0 {
log.Warn("Replay: msg bytes is 0")
continue
} else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
log.Warn("Replay: new line")
continue
}
// the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it
if !beginning && i == 1 {
log.Warn("Replay: not beginning and 1")
continue
for {
line, err := gr.ReadLine()
if err != nil {
if err == io.EOF {
break
} else {
return err
}
}
// NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(msgBytes, nil); err != nil {
if err := cs.readReplayMessage([]byte(line), nil); err != nil {
return err
}
}
@ -245,6 +213,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
pb.cs.Stop()
pb.cs.Wait()
newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool)
newCS.SetEventSwitch(pb.cs.evsw)
@ -376,3 +345,28 @@ func (pb *playback) replayConsoleLoop() int {
}
return 0
}
//--------------------------------------------------------------------------------
// Parses marker lines of the form:
// #HEIGHT: 12345
func makeHeightSearchFunc(height int) auto.SearchFunc {
return func(line string) (int, error) {
line = strings.TrimRight(line, "\n")
parts := strings.Split(line, " ")
if len(parts) != 2 {
return -1, errors.New("Line did not have 2 parts")
}
i, err := strconv.Atoi(parts[1])
if err != nil {
return -1, errors.New("Failed to parse INFO: " + err.Error())
}
if height < i {
return 1, nil
} else if height == i {
return 0, nil
} else {
return -1, nil
}
}
}

+ 27
- 27
consensus/replay_test.go View File

@ -17,18 +17,18 @@ import (
var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
// the priv validator changes step at these lines for a block with 1 val and 1 part
var baseStepChanges = []int{2, 5, 7}
var baseStepChanges = []int{3, 6, 8}
// test recovery from each line in each testCase
var testCases = []*testCase{
newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
newTestCase("small_block2", []int{2, 7, 9}), // small block with txs across 3 smaller block parts
newTestCase("small_block2", []int{3, 8, 10}), // small block with txs across 3 smaller block parts
}
type testCase struct {
name string
log string //full cswal
log string //full cs wal
stepMap map[int]int8 // map lines of log to privval step
proposeLine int
@ -71,21 +71,20 @@ func readWAL(p string) string {
return string(b)
}
func writeWAL(log string) string {
fmt.Println("writing", log)
// write the needed wal to file
f, err := ioutil.TempFile(os.TempDir(), "replay_test_")
func writeWAL(walMsgs string) string {
tempDir := os.TempDir()
walDir := tempDir + "/wal" + RandStr(12)
// Create WAL directory
err := EnsureDir(walDir, 0700)
if err != nil {
panic(err)
}
_, err = f.WriteString(log)
// Write the needed WAL to file
err = WriteFile(walDir+"/wal", []byte(walMsgs), 0600)
if err != nil {
panic(err)
}
name := f.Name()
f.Close()
return name
return walDir
}
func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
@ -97,10 +96,10 @@ func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
}
}
func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{},
func runReplayTest(t *testing.T, cs *ConsensusState, walDir string, newBlockCh chan interface{},
thisCase *testCase, i int) {
cs.config.Set("cswal", fileName)
cs.config.Set("cs_wal_dir", walDir)
cs.Start()
// Wait to make a new block.
// This is just a signal that we haven't halted; its not something contained in the WAL itself.
@ -108,6 +107,7 @@ func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh
// should eventually be followed by a new block, or else something is wrong
waitForBlock(newBlockCh, thisCase, i)
cs.Stop()
cs.Wait()
}
func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
@ -123,7 +123,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu
lastMsg := split[nLines]
// we write those lines up to (not including) one with the signature
fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
walDir := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
cs := fixedConsensusStateDummy()
@ -135,7 +135,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
return cs, newBlockCh, lastMsg, fileName
return cs, newBlockCh, lastMsg, walDir
}
//-----------------------------------------------
@ -146,8 +146,8 @@ func TestReplayCrashAfterWrite(t *testing.T) {
for _, thisCase := range testCases {
split := strings.Split(thisCase.log, "\n")
for i := 0; i < len(split)-1; i++ {
cs, newBlockCh, _, f := setupReplayTest(thisCase, i+1, true)
runReplayTest(t, cs, f, newBlockCh, thisCase, i+1)
cs, newBlockCh, _, walDir := setupReplayTest(thisCase, i+1, true)
runReplayTest(t, cs, walDir, newBlockCh, thisCase, i+1)
}
}
}
@ -159,10 +159,10 @@ func TestReplayCrashAfterWrite(t *testing.T) {
func TestReplayCrashBeforeWritePropose(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.proposeLine
cs, newBlockCh, proposalMsg, f := setupReplayTest(thisCase, lineNum, false) // propose
cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) // propose
// Set LastSig
var err error
var msg ConsensusLogMessage
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(proposalMsg), &err)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
if err != nil {
@ -170,18 +170,18 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) {
}
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
cs.privValidator.LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum)
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
}
}
func TestReplayCrashBeforeWritePrevote(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.prevoteLine
cs, newBlockCh, voteMsg, f := setupReplayTest(thisCase, lineNum, false) // prevote
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) {
// Set LastSig
var err error
var msg ConsensusLogMessage
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
@ -190,18 +190,18 @@ func TestReplayCrashBeforeWritePrevote(t *testing.T) {
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
cs.privValidator.LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum)
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
}
}
func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.precommitLine
cs, newBlockCh, voteMsg, f := setupReplayTest(thisCase, lineNum, false) // precommit
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // precommit
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) {
// Set LastSig
var err error
var msg ConsensusLogMessage
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(voteMsg), &err)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
if err != nil {
@ -210,6 +210,6 @@ func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
cs.privValidator.LastSignature = vote.Vote.Signature
})
runReplayTest(t, cs, f, newBlockCh, thisCase, lineNum)
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
}
}

+ 12
- 4
consensus/state.go View File

@ -304,8 +304,15 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart()
err := cs.OpenWAL(cs.config.GetString("cswal"))
walDir := cs.config.GetString("cs_wal_dir")
err := EnsureDir(walDir, 0700)
if err != nil {
log.Error("Error ensuring ConsensusState wal dir", "error", err.Error())
return err
}
err = cs.OpenWAL(walDir)
if err != nil {
log.Error("Error loading ConsensusState wal", "error", err.Error())
return err
}
@ -343,16 +350,17 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
func (cs *ConsensusState) OnStop() {
cs.BaseService.OnStop()
// Make BaseService.Wait() wait until cs.wal.Wait()
if cs.wal != nil && cs.IsRunning() {
cs.wal.Wait()
}
}
// Open file to log all consensus messages and timeouts for deterministic accountability
func (cs *ConsensusState) OpenWAL(file string) (err error) {
func (cs *ConsensusState) OpenWAL(walDir string) (err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
wal, err := NewWAL(file, cs.config.GetBool("cswal_light"))
wal, err := NewWAL(walDir, cs.config.GetBool("cs_wal_light"))
if err != nil {
return err
}
@ -658,7 +666,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// close wal now that we're done writing to it
if cs.wal != nil {
cs.wal.Close()
cs.wal.Stop()
}
return
}


+ 0
- 1
consensus/test_data/README.md View File

@ -2,7 +2,6 @@
The easiest way to generate this data is to copy `~/.tendermint_test/somedir/*` to `~/.tendermint`
and to run a local node.
Be sure to set the db to "leveldb" to create a cswal file in `~/.tendermint/data/cswal`.
If you need to change the signatures, you can use a script as follows:
The privBytes comes from `config/tendermint_test/...`:


+ 1
- 0
consensus/test_data/empty_block.cswal View File

@ -1,3 +1,4 @@
#HEIGHT: 1
{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]}
{"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]}


+ 1
- 0
consensus/test_data/small_block1.cswal View File

@ -1,3 +1,4 @@
#HEIGHT: 1
{"time":"2016-10-11T15:29:08.113Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]}
{"time":"2016-10-11T15:29:08.115Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-10-11T15:29:08.115Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"pol_round":-1,"signature":"116961B715FB54C09983209F7F3BFD95C7DA2E0D7AB9CFE9F0750F2402E2AEB715CFD55FB2C5DB88F485391D426A48705E0474AB94328463290D086D88BAD704"}}],"peer_key":""}]}


+ 1
- 0
consensus/test_data/small_block2.cswal View File

@ -1,3 +1,4 @@
#HEIGHT: 1
{"time":"2016-10-11T16:21:23.438Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]}
{"time":"2016-10-11T16:21:23.440Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-10-11T16:21:23.440Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"pol_round":-1,"signature":"8F74F7032E50DFBC17E8B42DD15FD54858B45EEB1B8DAF6432AFBBB1333AC1E850290DE82DF613A10430EB723023527498D45C106FD2946FEF03A9C8B301020B"}}],"peer_key":""}]}


+ 34
- 95
consensus/wal.go View File

@ -1,10 +1,9 @@
package consensus
import (
"bufio"
"os"
"time"
auto "github.com/tendermint/go-autofile"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
@ -13,15 +12,15 @@ import (
//--------------------------------------------------------
// types and functions for savings consensus messages
type ConsensusLogMessage struct {
Time time.Time `json:"time"`
Msg ConsensusLogMessageInterface `json:"msg"`
type TimedWALMessage struct {
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
}
type ConsensusLogMessageInterface interface{}
type WALMessage interface{}
var _ = wire.RegisterInterface(
struct{ ConsensusLogMessageInterface }{},
struct{ WALMessage }{},
wire.ConcreteType{types.EventDataRoundState{}, 0x01},
wire.ConcreteType{msgInfo{}, 0x02},
wire.ConcreteType{timeoutInfo{}, 0x03},
@ -35,119 +34,59 @@ var _ = wire.RegisterInterface(
// TODO: currently the wal is overwritten during replay catchup
// give it a mode so it's either reading or appending - must read to end to start appending again
type WAL struct {
fp *os.File
exists bool // if the file already existed (restarted process)
done chan struct{}
BaseService
group *auto.Group
light bool // ignore block parts
}
func NewWAL(file string, light bool) (*WAL, error) {
var walExists bool
if _, err := os.Stat(file); err == nil {
walExists = true
func NewWAL(walDir string, light bool) (*WAL, error) {
head, err := auto.OpenAutoFile(walDir + "/wal")
if err != nil {
return nil, err
}
fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
group, err := auto.OpenGroup(head)
if err != nil {
return nil, err
}
return &WAL{
fp: fp,
exists: walExists,
done: make(chan struct{}),
light: light,
}, nil
wal := &WAL{
group: group,
light: light,
}
wal.BaseService = *NewBaseService(log, "WAL", wal)
return wal, nil
}
func (wal *WAL) Exists() bool {
if wal == nil {
log.Warn("consensus msg log is nil")
return false
}
return wal.exists
func (wal *WAL) OnStop() {
wal.BaseService.OnStop()
wal.group.Head.Close()
wal.group.Close()
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
func (wal *WAL) Save(wmsg WALMessage) {
if wal == nil {
return
}
if wal.light {
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
if mi, ok := clm.(msgInfo); ok {
if mi, ok := wmsg.(msgInfo); ok {
_ = mi
if mi.PeerKey != "" {
return
}
}
}
var clmBytes = wire.JSONBytes(ConsensusLogMessage{time.Now(), clm})
var n int
var err error
wire.WriteTo(append(clmBytes, byte('\n')), wal.fp, &n, &err) // one message per line
if err != nil {
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm))
}
}
// Must not be called concurrently with a write.
func (wal *WAL) Close() {
if wal != nil {
wal.fp.Close()
// Write #HEIGHT: XYZ if new height
if edrs, ok := wmsg.(types.EventDataRoundState); ok {
if edrs.Step == RoundStepNewHeight.String() {
wal.group.WriteLine(Fmt("#HEIGHT: %v", edrs.Height))
}
}
wal.done <- struct{}{}
}
func (wal *WAL) Wait() {
<-wal.done
}
// TODO: remove once we stop supporting older golang version
const (
ioSeekStart = 0
ioSeekCurrent = 1
ioSeekEnd = 2
)
func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
var current int64
// start at the end
current, err = wal.fp.Seek(0, ioSeekEnd)
// Write the wal message
var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg})
err := wal.group.WriteLine(string(wmsgBytes))
if err != nil {
return
}
// backup until we find the the right line
// current is how far we are from the beginning
for {
current -= 1
if current < 0 {
wal.fp.Seek(0, ioSeekStart) // back to beginning
return
}
// backup one and read a new byte
if _, err = wal.fp.Seek(current, ioSeekStart); err != nil {
return
}
b := make([]byte, 1)
if _, err = wal.fp.Read(b); err != nil {
return
}
if b[0] == '\n' || len(b) == 0 {
nLines += 1
// read a full line
reader := bufio.NewReader(wal.fp)
lineBytes, _ := reader.ReadBytes('\n')
if len(lineBytes) == 0 {
continue
}
if found(lineBytes) {
wal.fp.Seek(0, ioSeekCurrent) // (?)
wal.fp.Seek(current, ioSeekStart)
return
}
}
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg))
}
}

+ 0
- 78
consensus/wal_test.go View File

@ -1,78 +0,0 @@
package consensus
import (
"io/ioutil"
"os"
"path"
"strings"
"testing"
. "github.com/tendermint/go-common"
)
var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]}
{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]}
{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]}
{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]}
{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}`
func TestSeek(t *testing.T) {
f, err := ioutil.TempFile(os.TempDir(), "seek_test_")
if err != nil {
panic(err)
}
stat, _ := f.Stat()
name := stat.Name()
_, err = f.WriteString(testTxt)
if err != nil {
panic(err)
}
f.Close()
wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light"))
if err != nil {
panic(err)
}
keyWord := "Precommit"
n, err := wal.SeekFromEnd(func(b []byte) bool {
if strings.Contains(string(b), keyWord) {
return true
}
return false
})
if err != nil {
panic(err)
}
// confirm n
spl := strings.Split(testTxt, "\n")
var i int
var s string
for i, s = range spl {
if strings.Contains(s, keyWord) {
break
}
}
// n is lines from the end.
spl = spl[i:]
if n != len(spl) {
panic(Fmt("Wrong nLines. Got %d, expected %d", n, len(spl)))
}
b, err := ioutil.ReadAll(wal.fp)
if err != nil {
panic(err)
}
// first char is a \n
spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n")
for i, s := range spl {
if s != spl2[i] {
panic(Fmt("Mismatch. Got %s, expected %s", spl2[i], s))
}
}
}

+ 1
- 1
glide.lock View File

@ -52,7 +52,7 @@ imports:
- name: github.com/tendermint/go-flowrate
version: a20c98e61957faa93b4014fbd902f20ab9317a6a
- name: github.com/tendermint/go-autofile
version: c26b857900009ac81c78c1bc03f85e0c8e47818a
version: 916f3d789b6afaf7bfe161aeec391c8a35e354a8
- name: github.com/tendermint/go-clist
version: 3baa390bbaf7634251c42ad69a8682e7e3990552
- name: github.com/tendermint/go-common


+ 9
- 3
mempool/mempool.go View File

@ -85,10 +85,16 @@ func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool {
}
func (mem *Mempool) initWAL() {
walFileName := mem.config.GetString("mempool_wal")
if walFileName != "" {
af, err := auto.OpenAutoFile(walFileName)
walDir := mem.config.GetString("mempool_wal_dir")
if walDir != "" {
err := EnsureDir(walDir, 0700)
if err != nil {
log.Error("Error ensuring Mempool wal dir", "error", err)
PanicSanity(err)
}
af, err := auto.OpenAutoFile(walDir + "/wal")
if err != nil {
log.Error("Error opening Mempool wal file", "error", err)
PanicSanity(err)
}
mem.wal = af


+ 2
- 14
node/node.go View File

@ -52,8 +52,6 @@ func NewNodeDefault(config cfg.Config) *Node {
func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node {
EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
// Get BlockStore
blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
blockStore := bc.NewBlockStore(blockStoreDB)
@ -414,12 +412,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
return consensusState
}
func RunReplayConsole(config cfg.Config) {
walFile := config.GetString("cswal")
if walFile == "" {
Exit("cswal file name not set in tendermint config")
}
func RunReplayConsole(config cfg.Config, walFile string) {
consensusState := newConsensusState(config)
if err := consensusState.ReplayConsole(walFile); err != nil {
@ -427,12 +420,7 @@ func RunReplayConsole(config cfg.Config) {
}
}
func RunReplay(config cfg.Config) {
walFile := config.GetString("cswal")
if walFile == "" {
Exit("cswal file name not set in tendermint config")
}
func RunReplay(config cfg.Config, walFile string) {
consensusState := newConsensusState(config)
if err := consensusState.ReplayMessages(walFile); err != nil {


Loading…
Cancel
Save