Browse Source

Merge remote-tracking branch 'origin/replay'

pull/192/head
Jae Kwon 9 years ago
parent
commit
0f1cf243fd
15 changed files with 864 additions and 144 deletions
  1. +0
    -5
      DOCKER/docker.sh
  2. +6
    -0
      cmd/tendermint/main.go
  3. +1
    -0
      config/tendermint/config.go
  4. +3
    -3
      config/tendermint_test/config.go
  5. +15
    -0
      consensus/common.go
  6. +20
    -23
      consensus/common_test.go
  7. +1
    -1
      consensus/reactor.go
  8. +360
    -0
      consensus/replay.go
  9. +72
    -0
      consensus/replay_test.go
  10. +71
    -61
      consensus/state.go
  11. +20
    -19
      consensus/state_test.go
  12. +117
    -0
      consensus/wal.go
  13. +76
    -0
      consensus/wal_test.go
  14. +101
    -32
      node/node.go
  15. +1
    -0
      types/events.go

+ 0
- 5
DOCKER/docker.sh View File

@ -8,11 +8,6 @@ docker build -t tmbase -f Dockerfile .
# (config and blockchain data go in here)
docker run --name tmdata --entrypoint /bin/echo tmbase Data-only container for tmnode
# Copy files into the data-only container
# You should stop the containers before running this
# cd $DATA_SRC
# tar cf - . | docker run -i --rm --volumes-from mintdata mint tar xvf - -C /data/tendermint
# Run tendermint node
docker run --name tmnode --volumes-from tmdata -d -p 46656:46656 -p 46657:46657 -e TMSEEDS="goldenalchemist.chaintest.net:46657" -e TMNAME="testnode" -e TMREPO="github.com/tendermint/tendermint" -e TMHEAD="origin/develop" tmbase


+ 6
- 0
cmd/tendermint/main.go View File

@ -35,6 +35,12 @@ Commands:
switch args[0] {
case "node":
node.RunNode()
case "replay":
if len(args) > 1 && args[1] == "console" {
node.RunReplayConsole()
} else {
node.RunReplay()
}
case "init":
init_files()
case "show_validator":


+ 1
- 0
config/tendermint/config.go View File

@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/cswal")
return mapConfig
}


+ 3
- 3
config/tendermint_test/config.go View File

@ -57,9 +57,8 @@ func initTMRoot(rootDir string) {
if !FileExists(genesisFilePath) {
MustWriteFile(genesisFilePath, []byte(defaultGenesis), 0644)
}
if !FileExists(privFilePath) {
MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644)
}
// we always overwrite the priv val
MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644)
}
func GetConfig(rootDir string) cfg.Config {
@ -92,6 +91,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/cswal")
return mapConfig
}


+ 15
- 0
consensus/common.go View File

@ -0,0 +1,15 @@
package consensus
import (
"github.com/tendermint/go-events"
)
// NOTE: this is blocking
func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
// listen for new round
ch := make(chan interface{}, chanCap)
evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) {
ch <- data
})
return ch
}

+ 20
- 23
consensus/common_test.go View File

@ -140,7 +140,7 @@ func addVoteToFromMany(to *ConsensusState, votes []*types.Vote, froms ...*valida
func addVoteToFrom(to *ConsensusState, from *validatorStub, vote *types.Vote) {
valIndex, _ := to.Validators.GetByAddress(from.PrivValidator.Address)
to.peerMsgQueue <- msgInfo{msg: &VoteMessage{valIndex, vote}}
to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{valIndex, vote}}
// added, err := to.TryAddVote(valIndex, vote, "")
/*
if _, ok := err.(*types.ErrVoteConflictingSignature); ok {
@ -296,16 +296,16 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo
cs.mtx.Unlock()
}
func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
// Get State
state, privVals := randGenesisState(nValidators, false, 10)
func fixedConsensusState() *ConsensusState {
stateDB := dbm.NewMemDB()
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
return newConsensusState(state, privValidator)
// fmt.Println(state.Validators)
vss := make([]*validatorStub, nValidators)
// make consensus state for lead validator
}
func newConsensusState(state *sm.State, pv *types.PrivValidator) *ConsensusState {
// Get BlockStore
blockDB := dbm.NewMemDB()
blockStore := bc.NewBlockStore(blockDB)
@ -320,14 +320,21 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
// Make ConsensusReactor
cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool)
cs.SetPrivValidator(privVals[0])
cs.SetPrivValidator(pv)
evsw := events.NewEventSwitch()
cs.SetEventSwitch(evsw)
evsw.Start()
return cs
}
// start the transition routines
// cs.startRoutines()
func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
// Get State
state, privVals := randGenesisState(nValidators, false, 10)
vss := make([]*validatorStub, nValidators)
cs := newConsensusState(state, privVals[0])
for i := 0; i < nValidators; i++ {
vss[i] = NewValidatorStub(privVals[i])
@ -344,7 +351,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
go func() {
for {
v := <-voteCh0
vote := v.(*types.EventDataVote)
vote := v.(types.EventDataVote)
// we only fire for our own votes
if bytes.Equal(addr, vote.Address) {
voteCh <- v
@ -386,13 +393,3 @@ func startTestRound(cs *ConsensusState, height, round int) {
cs.enterNewRound(height, round)
cs.startRoutines(0)
}
// NOTE: this is blocking
func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
// listen for new round
ch := make(chan interface{}, chanCap)
evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) {
ch <- data
})
return ch
}

+ 1
- 1
consensus/reactor.go View File

@ -240,7 +240,7 @@ func (conR *ConsensusReactor) registerEventCallbacks() {
})
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
edv := data.(*types.EventDataVote)
edv := data.(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
})
}


+ 360
- 0
consensus/replay.go View File

@ -0,0 +1,360 @@
package consensus
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"reflect"
"strconv"
"strings"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
// unmarshal and apply a single message to the consensus state
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, msgBytes, &err)
if err != nil {
fmt.Println(string(msgBytes))
return fmt.Errorf("Error reading json data: %v", err)
}
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepCh != nil {
select {
case mi := <-newStepCh:
m2 := mi.(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
}
case <-ticker:
return fmt.Errorf("Failed to read off newStepCh")
}
}
case msgInfo:
peerKey := m.PeerKey
if peerKey == "" {
peerKey = "local"
}
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
log.Notice("Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
case *BlockPartMessage:
log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
case *VoteMessage:
v := msg.Vote
log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey)
}
// internal or from peer
if m.PeerKey == "" {
cs.internalMsgQueue <- m
} else {
cs.peerMsgQueue <- m
}
case timeoutInfo:
log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.tockChan <- m
default:
return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
// replay only those messages since the last block
func (cs *ConsensusState) catchupReplay(height int) error {
if cs.wal == nil {
log.Warn("consensus msg log is nil")
return nil
}
if !cs.wal.exists {
// new wal, nothing to catchup on
return nil
}
// starting from end of file,
// read messages until a new height is found
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, lineBytes, &err)
if err != nil {
panic(Fmt("Failed to read cs_msg_log json: %v", err))
}
m, ok := msg.Msg.(types.EventDataRoundState)
if ok && m.Step == RoundStepNewHeight.String() {
// TODO: ensure the height matches
return true
}
return false
})
if err != nil {
return err
}
var beginning bool // if we had to go back to the beginning
if c, _ := cs.wal.fp.Seek(0, 1); c == 0 {
beginning = true
}
log.Notice("Catchup by replaying consensus messages", "n", nLines)
// now we can replay the latest nLines on consensus state
// note we can't use scan because we've already been reading from the file
reader := bufio.NewReader(cs.wal.fp)
for i := 0; i < nLines; i++ {
msgBytes, err := reader.ReadBytes('\n')
if err == io.EOF {
break
} else if err != nil {
return err
} else if len(msgBytes) == 0 {
continue
}
// the first msg is (usually) the NewHeight event, so we can ignore it
if !beginning && i == 1 {
continue
}
// NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(msgBytes, nil); err != nil {
return err
}
}
log.Info("Done catchup replay")
return nil
}
//--------------------------------------------------------
// replay messages interactively or all at once
// Interactive playback
func (cs ConsensusState) ReplayConsole(file string) error {
return cs.replay(file, true)
}
// Full playback, with tests
func (cs ConsensusState) ReplayMessages(file string) error {
return cs.replay(file, false)
}
// replay all msgs or start the console
func (cs *ConsensusState) replay(file string, console bool) error {
if cs.IsRunning() {
return errors.New("cs is already running, cannot replay")
}
if cs.wal != nil {
return errors.New("cs wal is open, cannot replay")
}
cs.startForReplay()
// ensure all new step events are regenerated as expected
newStepCh := subscribeToEvent(cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
// just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
if err != nil {
return err
}
pb := newPlayback(file, fp, cs, cs.state.Copy())
defer pb.fp.Close()
var nextN int // apply N msgs in a row
for pb.scanner.Scan() {
if nextN == 0 && console {
nextN = pb.replayConsoleLoop()
}
if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
return err
}
if nextN > 0 {
nextN -= 1
}
pb.count += 1
}
return nil
}
//------------------------------------------------
// playback manager
type playback struct {
cs *ConsensusState
fp *os.File
scanner *bufio.Scanner
count int // how many lines/msgs into the file are we
// replays can be reset to beginning
fileName string // so we can close/reopen the file
genesisState *sm.State // so the replay session knows where to restart from
}
func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback {
return &playback{
cs: cs,
fp: fp,
fileName: fileName,
genesisState: genState,
scanner: bufio.NewScanner(fp),
}
}
// go back count steps by resetting the state and running (pb.count - count) steps
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
pb.cs.Stop()
newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool)
newCs.SetEventSwitch(pb.cs.evsw)
newCs.startForReplay()
pb.fp.Close()
fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666)
if err != nil {
return err
}
pb.fp = fp
pb.scanner = bufio.NewScanner(fp)
count = pb.count - count
log.Notice(Fmt("Reseting from %d to %d", pb.count, count))
pb.count = 0
pb.cs = newCs
for i := 0; pb.scanner.Scan() && i < count; i++ {
if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil {
return err
}
pb.count += 1
}
return nil
}
func (cs *ConsensusState) startForReplay() {
cs.BaseService.OnStart()
go cs.receiveRoutine(0)
// since we replay tocks we just ignore ticks
go func() {
for {
select {
case <-cs.tickChan:
case <-cs.Quit:
return
}
}
}()
}
// console function for parsing input and running commands
func (pb *playback) replayConsoleLoop() int {
for {
fmt.Printf("> ")
bufReader := bufio.NewReader(os.Stdin)
line, more, err := bufReader.ReadLine()
if more {
Exit("input is too long")
} else if err != nil {
Exit(err.Error())
}
tokens := strings.Split(string(line), " ")
if len(tokens) == 0 {
continue
}
switch tokens[0] {
case "next":
// "next" -> replay next message
// "next N" -> replay next N messages
if len(tokens) == 1 {
return 0
} else {
i, err := strconv.Atoi(tokens[1])
if err != nil {
fmt.Println("next takes an integer argument")
} else {
return i
}
}
case "back":
// "back" -> go back one message
// "back N" -> go back N messages
// NOTE: "back" is not supported in the state machine design,
// so we restart and replay up to
// ensure all new step events are regenerated as expected
newStepCh := subscribeToEvent(pb.cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1)
if len(tokens) == 1 {
pb.replayReset(1, newStepCh)
} else {
i, err := strconv.Atoi(tokens[1])
if err != nil {
fmt.Println("back takes an integer argument")
} else if i > pb.count {
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
} else {
pb.replayReset(i, newStepCh)
}
}
case "rs":
// "rs" -> print entire round state
// "rs short" -> print height/round/step
// "rs <field>" -> print another field of the round state
rs := pb.cs.RoundState
if len(tokens) == 1 {
fmt.Println(rs)
} else {
switch tokens[1] {
case "short":
fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step)
case "validators":
fmt.Println(rs.Validators)
case "proposal":
fmt.Println(rs.Proposal)
case "proposal_block":
fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort())
case "locked_round":
fmt.Println(rs.LockedRound)
case "locked_block":
fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort())
case "votes":
fmt.Println(rs.Votes.StringIndented(" "))
default:
fmt.Println("Unknown option", tokens[1])
}
}
case "n":
fmt.Println(pb.count)
}
}
return 0
}

+ 72
- 0
consensus/replay_test.go View File

@ -0,0 +1,72 @@
package consensus
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/tendermint/tendermint/types"
)
var testLog = `{"time":"2016-01-18T20:46:00.774Z","msg":[3,{"duration":982632969,"height":1,"round":0,"step":1}]}
{"time":"2016-01-18T20:46:00.776Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-01-18T20:46:00.776Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"pol_round":-1,"signature":"A1803A1364F6398C154FE45D5649A89129039F18A0FE42B211BADFDF6E81EA53F48F83D3610DDD848C3A5284D3F09BDEB26FA1D856BDF70D48C507BF2453A70E"}}],"peer_key":""}]}
{"time":"2016-01-18T20:46:00.777Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F746573740101142AA030B15DDFC000000000000000000000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]}
{"time":"2016-01-18T20:46:00.781Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]}
{"time":"2016-01-18T20:46:00.781Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"88F5708C802BEE54EFBF438967FBC6C6EAAFC41258A85D92B9B055481175BE9FA71007B1AAF2BFBC3BF3CC0542DB48A9812324B7BBA7307446CCDBF029077F07"}}],"peer_key":""}]}
{"time":"2016-01-18T20:46:00.786Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2016-01-18T20:46:00.786Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"65B0C9D2A8C9919FC9B036F82C3F1818E706E8BC066A78D99D3316E4814AB06594841E387B323AA7773F926D253C1E4D4A0930F7A8C8AE1E838CA15C673B2B02"}}],"peer_key":""}]}
`
func TestReplayCatchup(t *testing.T) {
// write the needed wal to file
f, err := ioutil.TempFile(os.TempDir(), "replay_test_")
if err != nil {
t.Fatal(err)
}
name := f.Name()
_, err = f.WriteString(testLog)
if err != nil {
t.Fatal(err)
}
f.Close()
cs := fixedConsensusState()
// we've already precommitted on the first block
// without replay catchup we would be halted here forever
cs.privValidator.LastHeight = 1 // first block
cs.privValidator.LastStep = 3 // precommit
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0)
// start timeout and receive routines
cs.startRoutines(0)
// open wal and run catchup messages
openWAL(t, cs, name)
if err := cs.catchupReplay(cs.Height); err != nil {
t.Fatalf("Error on catchup replay %v", err)
}
cs.enterNewRound(cs.Height, cs.Round)
after := time.After(time.Second * 2)
select {
case <-newBlockCh:
case <-after:
t.Fatal("Timed out waiting for new block")
}
}
func openWAL(t *testing.T, cs *ConsensusState, file string) {
// open the wal
wal, err := NewWAL(file)
if err != nil {
t.Fatal(err)
}
wal.exists = true
cs.wal = wal
}

+ 71
- 61
consensus/state.go View File

@ -19,7 +19,8 @@ import (
)
var (
timeoutPropose = 3000 * time.Millisecond // Maximum duration of RoundStepPropose
timeoutPropose0 = 3000 * time.Millisecond // Wait this long for a proposal
timeoutProposeDelta = 0500 * time.Millisecond // timeoutProposeN is timeoutPropose0 + timeoutProposeDelta*N
timeoutPrevote0 = 1000 * time.Millisecond // After any +2/3 prevotes received, wait this long for stragglers.
timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N
timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers.
@ -153,20 +154,20 @@ var (
// msgs from the reactor which may update the state
type msgInfo struct {
msg ConsensusMessage
peerKey string
Msg ConsensusMessage `json:"msg"`
PeerKey string `json:"peer_key"`
}
// internally generated messages which may update the state
type timeoutInfo struct {
duration time.Duration
height int
round int
step RoundStepType
Duration time.Duration `json:"duration"`
Height int `json:"height"`
Round int `json:"round"`
Step RoundStepType `json:"step"`
}
func (ti *timeoutInfo) String() string {
return fmt.Sprintf("%v ; %d/%d %v", ti.duration, ti.height, ti.round, ti.step)
return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step)
}
// Tracks consensus state across block heights and rounds.
@ -190,6 +191,8 @@ type ConsensusState struct {
evsw *events.EventSwitch
wal *WAL
nSteps int // used for testing to limit the number of transitions the state makes
}
@ -248,14 +251,21 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
}
func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart()
cs.QuitService.OnStart()
// first we schedule the round (no go routines)
// then we start the timeout and receive routines.
// tickChan is buffered so scheduleRound0 will finish.
// Then all further access to the RoundState is through the receiveRoutine
cs.scheduleRound0(cs.Height)
// start timeout and receive routines
cs.startRoutines(0)
// we may have lost some votes if the process crashed
// reload from consensus log to catchup
if err := cs.catchupReplay(cs.Height); err != nil {
log.Error("Error on catchup replay", "error", err.Error())
// let's go for it anyways, maybe we're fine
}
// schedule the first round!
cs.scheduleRound0(cs.Height)
return nil
}
@ -270,6 +280,18 @@ func (cs *ConsensusState) OnStop() {
cs.QuitService.OnStop()
}
// Open file to log all consensus messages and timeouts for deterministic accountability
func (cs *ConsensusState) OpenWAL(file string) (err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
wal, err := NewWAL(file)
if err != nil {
return err
}
cs.wal = wal
return nil
}
//------------------------------------------------------------
// Public interface for passing messages into the consensus state,
// possibly causing a state transition
@ -372,8 +394,8 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
if state.LastBlockHeight == 0 {
return
}
lastPrecommits := types.NewVoteSet(state.LastBlockHeight, 0, types.VoteTypePrecommit, state.LastValidators)
seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight)
lastPrecommits := types.NewVoteSet(state.LastBlockHeight, seenValidation.Round(), types.VoteTypePrecommit, state.LastValidators)
for idx, precommit := range seenValidation.Precommits {
if precommit == nil {
continue
@ -455,10 +477,12 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
}
func (cs *ConsensusState) newStep() {
rs := cs.RoundStateEvent()
cs.wal.Save(rs)
cs.nSteps += 1
// newStep is called by updateToStep in NewConsensusState before the evsw is set!
if cs.evsw != nil {
cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent())
cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs)
}
}
@ -477,13 +501,13 @@ func (cs *ConsensusState) timeoutRoutine() {
log.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.height < ti.height {
if newti.Height < ti.Height {
continue
} else if newti.height == ti.height {
if newti.round < ti.round {
} else if newti.Height == ti.Height {
if newti.Round < ti.Round {
continue
} else if newti.round == ti.round {
if ti.step > 0 && newti.step <= ti.step {
} else if newti.Round == ti.Round {
if ti.Step > 0 && newti.Step <= ti.Step {
continue
}
}
@ -492,16 +516,16 @@ func (cs *ConsensusState) timeoutRoutine() {
ti = newti
// if the newti has duration == 0, we relay to the tockChan immediately (no timeout)
if ti.duration == time.Duration(0) {
if ti.Duration == time.Duration(0) {
go func(t timeoutInfo) { cs.tockChan <- t }(ti)
continue
}
log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop()
cs.timeoutTicker = time.NewTicker(ti.duration)
cs.timeoutTicker = time.NewTicker(ti.Duration)
case <-cs.timeoutTicker.C:
log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop()
// go routine here gaurantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
@ -537,17 +561,24 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
select {
case mi = <-cs.peerMsgQueue:
cs.wal.Save(mi)
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi, rs)
case mi = <-cs.internalMsgQueue:
cs.wal.Save(mi)
// handles proposals, block parts, votes
cs.handleMsg(mi, rs)
case ti := <-cs.tockChan:
cs.wal.Save(ti)
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
case <-cs.Quit:
// close wal now that we're done writing to it
if cs.wal != nil {
cs.wal.Close()
}
return
}
}
@ -559,7 +590,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
defer cs.mtx.Unlock()
var err error
msg, peerKey := mi.msg, mi.peerKey
msg, peerKey := mi.Msg, mi.PeerKey
switch msg := msg.(type) {
case *ProposalMessage:
// will not cause transition.
@ -592,10 +623,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
}
func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
log.Debug("Received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
// timeouts must be for current height, round, step
if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) {
if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) {
log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
return
}
@ -604,22 +635,22 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
switch ti.step {
switch ti.Step {
case RoundStepNewHeight:
// NewRound event fired from enterNewRound.
// Do we want a timeout event too?
cs.enterNewRound(ti.height, 0)
// XXX: should we fire timeout here?
cs.enterNewRound(ti.Height, 0)
case RoundStepPropose:
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
cs.enterPrevote(ti.height, ti.round)
cs.enterPrevote(ti.Height, ti.Round)
case RoundStepPrevoteWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.enterPrecommit(ti.height, ti.round)
cs.enterPrecommit(ti.Height, ti.Round)
case RoundStepPrecommitWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.enterNewRound(ti.height, ti.round+1)
cs.enterNewRound(ti.Height, ti.Round+1)
default:
panic(Fmt("Invalid timeout step: %v", ti.step))
panic(Fmt("Invalid timeout step: %v", ti.Step))
}
}
@ -676,9 +707,6 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
// Enter: from NewRound(height,round).
func (cs *ConsensusState) enterPropose(height int, round int) {
// cs.mtx.Lock()
// cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) {
log.Debug(Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@ -699,7 +727,7 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
}()
// This step times out after `timeoutPropose`
cs.scheduleTimeout(timeoutPropose, height, round, RoundStepPropose)
cs.scheduleTimeout(timeoutPropose0+timeoutProposeDelta*time.Duration(round), height, round, RoundStepPropose)
// Nothing more to do if we're not a validator
if cs.privValidator == nil {
@ -826,8 +854,6 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
// Prevote for LockedBlock if we're locked, or ProposalBlock if valid.
// Otherwise vote nil.
func (cs *ConsensusState) enterPrevote(height int, round int) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) {
log.Debug(Fmt("enterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@ -891,8 +917,6 @@ func (cs *ConsensusState) doPrevote(height int, round int) {
// Enter: any +2/3 prevotes at next round.
func (cs *ConsensusState) enterPrevoteWait(height int, round int) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevoteWait <= cs.Step) {
log.Debug(Fmt("enterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@ -919,8 +943,6 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) {
// else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil,
// else, precommit nil otherwise.
func (cs *ConsensusState) enterPrecommit(height int, round int) {
//cs.mtx.Lock()
// defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) {
log.Debug(Fmt("enterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@ -1016,8 +1038,6 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
// Enter: any +2/3 precommits for next round.
func (cs *ConsensusState) enterPrecommitWait(height int, round int) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommitWait <= cs.Step) {
log.Debug(Fmt("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@ -1040,8 +1060,6 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) {
// Enter: +2/3 precommits for block
func (cs *ConsensusState) enterCommit(height int, commitRound int) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || RoundStepCommit <= cs.Step {
log.Debug(Fmt("enterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step))
return
@ -1107,9 +1125,6 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) {
// Increment height and goto RoundStepNewHeight
func (cs *ConsensusState) finalizeCommit(height int) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || cs.Step != RoundStepCommit {
log.Debug(Fmt("finalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step))
return
@ -1189,9 +1204,6 @@ func (cs *ConsensusState) finalizeCommit(height int) {
//-----------------------------------------------------------------------------
func (cs *ConsensusState) setProposal(proposal *types.Proposal) error {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
// Already have one
if cs.Proposal != nil {
return nil
@ -1226,9 +1238,6 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error {
// NOTE: block is not necessarily valid.
// This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit
func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
// Blocks might be reused, so round mismatch is OK
if cs.Height != height {
return false, nil
@ -1248,7 +1257,8 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad
var n int
var err error
cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block)
log.Info("Received complete proposal", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
log.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
if cs.Step == RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(height, cs.Round)
@ -1305,7 +1315,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
added, address, err = cs.LastCommit.AddByIndex(valIndex, vote)
if added {
log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote})
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
}
return
@ -1316,7 +1326,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
height := cs.Height
added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey)
if added {
cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote})
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
switch vote.Type {
case types.VoteTypePrevote:


+ 20
- 19
consensus/state_test.go View File

@ -46,11 +46,12 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh
func init() {
fmt.Println("")
timeoutPropose = 500 * time.Millisecond
timeoutPropose0 = 100 * time.Millisecond
timeoutProposeDelta = 1 * time.Millisecond
}
func TestProposerSelection0(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
height, round := cs1.Height, cs1.Round
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
@ -84,7 +85,7 @@ func TestProposerSelection0(t *testing.T) {
// Now let's do it all again, but starting from round 2 instead of 0
func TestProposerSelection2(t *testing.T) {
cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators
cs1, vss := randConsensusState(4) // test needs more work for more than 3 validators
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
@ -113,7 +114,7 @@ func TestProposerSelection2(t *testing.T) {
// a non-validator should timeout into the prevote round
func TestEnterProposeNoPrivValidator(t *testing.T) {
cs, _ := simpleConsensusState(1)
cs, _ := randConsensusState(1)
cs.SetPrivValidator(nil)
height, round := cs.Height, cs.Round
@ -123,7 +124,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) {
startTestRound(cs, height, round)
// if we're not a validator, EnterPropose should timeout
ticker := time.NewTicker(timeoutPropose * 2)
ticker := time.NewTicker(timeoutPropose0 * 2)
select {
case <-timeoutCh:
case <-ticker.C:
@ -138,7 +139,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) {
// a validator should not timeout of the prevote round (TODO: unless the block is really big!)
func TestEnterProposeYesPrivValidator(t *testing.T) {
cs, _ := simpleConsensusState(1)
cs, _ := randConsensusState(1)
height, round := cs.Height, cs.Round
// Listen for propose timeout event
@ -164,7 +165,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) {
}
// if we're a validator, enterPropose should not timeout
ticker := time.NewTicker(timeoutPropose * 2)
ticker := time.NewTicker(timeoutPropose0 * 2)
select {
case <-timeoutCh:
t.Fatal("Expected EnterPropose not to timeout")
@ -174,7 +175,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) {
}
func TestBadProposal(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
height, round := cs1.Height, cs1.Round
cs2 := vss[1]
@ -230,7 +231,7 @@ func TestBadProposal(t *testing.T) {
// propose, prevote, and precommit a block
func TestFullRound1(t *testing.T) {
cs, vss := simpleConsensusState(1)
cs, vss := randConsensusState(1)
height, round := cs.Height, cs.Round
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
@ -258,7 +259,7 @@ func TestFullRound1(t *testing.T) {
// nil is proposed, so prevote and precommit nil
func TestFullRoundNil(t *testing.T) {
cs, vss := simpleConsensusState(1)
cs, vss := randConsensusState(1)
height, round := cs.Height, cs.Round
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
@ -276,7 +277,7 @@ func TestFullRoundNil(t *testing.T) {
// run through propose, prevote, precommit commit with two validators
// where the first validator has to wait for votes from the second
func TestFullRound2(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1]
height, round := cs1.Height, cs1.Round
@ -317,7 +318,7 @@ func TestFullRound2(t *testing.T) {
// two validators, 4 rounds.
// two vals take turns proposing. val1 locks on first one, precommits nil on everything else
func TestLockNoPOL(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1]
height := cs1.Height
@ -480,7 +481,7 @@ func TestLockNoPOL(t *testing.T) {
// 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka
func TestLockPOLRelock(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
@ -588,7 +589,7 @@ func TestLockPOLRelock(t *testing.T) {
// 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka
func TestLockPOLUnlock(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
@ -679,7 +680,7 @@ func TestLockPOLUnlock(t *testing.T) {
// then a polka at round 2 that we lock on
// then we see the polka from round 1 but shouldn't unlock
func TestLockPOLSafety1(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
@ -798,7 +799,7 @@ func TestLockPOLSafety1(t *testing.T) {
// What we want:
// dont see P0, lock on P1 at R1, dont unlock using P0 at R2
func TestLockPOLSafety2(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
@ -888,7 +889,7 @@ func TestLockPOLSafety2(t *testing.T) {
/*
func TestSlashingPrevotes(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1]
@ -923,7 +924,7 @@ func TestSlashingPrevotes(t *testing.T) {
}
func TestSlashingPrecommits(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs1, vss := randConsensusState(2)
cs2 := vss[1]
@ -968,7 +969,7 @@ func TestSlashingPrecommits(t *testing.T) {
// 4 vals.
// we receive a final precommit after going into next round, but others might have gone to commit already!
func TestHalt1(t *testing.T) {
cs1, vss := simpleConsensusState(4)
cs1, vss := randConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)


+ 117
- 0
consensus/wal.go View File

@ -0,0 +1,117 @@
package consensus
import (
"bufio"
"os"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
)
//--------------------------------------------------------
// types and functions for savings consensus messages
type ConsensusLogMessage struct {
Time time.Time `json:"time"`
Msg ConsensusLogMessageInterface `json:"msg"`
}
type ConsensusLogMessageInterface interface{}
var _ = wire.RegisterInterface(
struct{ ConsensusLogMessageInterface }{},
wire.ConcreteType{types.EventDataRoundState{}, 0x01},
wire.ConcreteType{msgInfo{}, 0x02},
wire.ConcreteType{timeoutInfo{}, 0x03},
)
//--------------------------------------------------------
// Simple write-ahead logger
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay
// TODO: currently the wal is overwritten during replay catchup
// give it a mode so it's either reading or appending - must read to end to start appending again
type WAL struct {
fp *os.File
exists bool // if the file already existed (restarted process)
}
func NewWAL(file string) (*WAL, error) {
var walExists bool
if _, err := os.Stat(file); err == nil {
walExists = true
}
fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
return &WAL{
fp: fp,
exists: walExists,
}, nil
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(msg ConsensusLogMessageInterface) {
if wal != nil {
var n int
var err error
wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err)
wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line
if err != nil {
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, msg))
}
}
}
// Must not be called concurrently.
func (wal *WAL) Close() {
if wal != nil {
wal.fp.Close()
}
}
func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
var current int64
// start at the end
current, err = wal.fp.Seek(0, 2)
if err != nil {
return
}
// backup until we find the the right line
// current is how far we are from the beginning
for {
current -= 1
if current < 0 {
wal.fp.Seek(0, 0) // back to beginning
return
}
// backup one and read a new byte
if _, err = wal.fp.Seek(current, 0); err != nil {
return
}
b := make([]byte, 1)
if _, err = wal.fp.Read(b); err != nil {
return
}
if b[0] == '\n' || len(b) == 0 {
nLines += 1
// read a full line
reader := bufio.NewReader(wal.fp)
lineBytes, _ := reader.ReadBytes('\n')
if len(lineBytes) == 0 {
continue
}
if found(lineBytes) {
wal.fp.Seek(0, 1) // (?)
wal.fp.Seek(current, 0)
return
}
}
}
}

+ 76
- 0
consensus/wal_test.go View File

@ -0,0 +1,76 @@
package consensus
import (
"io/ioutil"
"os"
"path"
"strings"
"testing"
)
var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]}
{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]}
{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]}
{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]}
{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}`
func TestSeek(t *testing.T) {
f, err := ioutil.TempFile(os.TempDir(), "seek_test_")
if err != nil {
t.Fatal(err)
}
stat, _ := f.Stat()
name := stat.Name()
_, err = f.WriteString(testTxt)
if err != nil {
t.Fatal(err)
}
f.Close()
wal, err := NewWAL(path.Join(os.TempDir(), name))
if err != nil {
t.Fatal(err)
}
keyWord := "Precommit"
n, err := wal.SeekFromEnd(func(b []byte) bool {
if strings.Contains(string(b), keyWord) {
return true
}
return false
})
if err != nil {
t.Fatal(err)
}
// confirm n
spl := strings.Split(testTxt, "\n")
var i int
var s string
for i, s = range spl {
if strings.Contains(s, keyWord) {
break
}
}
// n is lines from the end.
spl = spl[i:]
if n != len(spl) {
t.Fatalf("Wrong nLines. Got %d, expected %d", n, len(spl))
}
b, err := ioutil.ReadAll(wal.fp)
if err != nil {
t.Fatal(err)
}
// first char is a \n
spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n")
for i, s := range spl {
if s != spl2[i] {
t.Fatalf("Mismatch. Got %s, expected %s", spl2[i], s)
}
}
}

+ 101
- 32
node/node.go View File

@ -84,6 +84,12 @@ func NewNode(privValidator *types.PrivValidator) *Node {
consensusReactor.SetPrivValidator(privValidator)
}
// deterministic accountability
err = consensusState.OpenWAL(config.GetString("cswal"))
if err != nil {
log.Error("Failed to open cswal", "error", err.Error())
}
// Make p2p network switch
sw := p2p.NewSwitch()
sw.AddReactor("MEMPOOL", mempoolReactor)
@ -219,6 +225,49 @@ func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
return nodeInfo
}
// Get a connection to the proxyAppConn addr.
// Check the current hash, and panic if it doesn't match.
func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
// use local app (for testing)
if addr == "local" {
app := example.NewCounterApplication(true)
mtx := new(sync.Mutex)
proxyAppConn = proxy.NewLocalAppConn(mtx, app)
} else {
proxyConn, err := Connect(addr)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024)
remoteApp.Start()
proxyAppConn = remoteApp
}
// Check the hash
currentHash, err := proxyAppConn.GetHashSync()
if err != nil {
PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err))
}
if !bytes.Equal(hash, currentHash) {
PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash))
}
return proxyAppConn
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState() *sm.State {
stateDB := dbm.GetDB("state")
state := sm.LoadState(stateDB)
if state == nil {
state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
}
return state
}
//------------------------------------------------------------------------------
// Users wishing to use an external signer for their validators
@ -283,45 +332,65 @@ func RunNode() {
})
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState() *sm.State {
//------------------------------------------------------------------------------
// replay
// convenience for replay mode
func newConsensusState() *consensus.ConsensusState {
// Get BlockStore
blockStoreDB := dbm.GetDB("blockstore")
blockStore := bc.NewBlockStore(blockStoreDB)
// Get State
stateDB := dbm.GetDB("state")
state := sm.LoadState(stateDB)
if state == nil {
state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash)
proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash)
// add the chainid to the global config
config.Set("chain_id", state.ChainID)
// Make event switch
eventSwitch := events.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
Exit(Fmt("Failed to start event switch: %v", err))
}
return state
}
// Get a connection to the proxyAppConn addr.
// Check the current hash, and panic if it doesn't match.
func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
// use local app (for testing)
if addr == "local" {
app := example.NewCounterApplication(true)
mtx := new(sync.Mutex)
proxyAppConn = proxy.NewLocalAppConn(mtx, app)
} else {
proxyConn, err := Connect(addr)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024)
remoteApp.Start()
mempool := mempl.NewMempool(proxyAppConnMempool)
proxyAppConn = remoteApp
consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool)
consensusState.SetEventSwitch(eventSwitch)
return consensusState
}
func RunReplayConsole() {
walFile := config.GetString("cswal")
if walFile == "" {
Exit("cswal file name not set in tendermint config")
}
// Check the hash
currentHash, err := proxyAppConn.GetHashSync()
if err != nil {
PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err))
consensusState := newConsensusState()
if err := consensusState.ReplayConsole(walFile); err != nil {
Exit(Fmt("Error during consensus replay: %v", err))
}
if !bytes.Equal(hash, currentHash) {
PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash))
}
func RunReplay() {
walFile := config.GetString("cswal")
if walFile == "" {
Exit("cswal file name not set in tendermint config")
}
return proxyAppConn
consensusState := newConsensusState()
if err := consensusState.ReplayMessages(walFile); err != nil {
Exit(Fmt("Error during consensus replay: %v", err))
}
log.Notice("Replay run successfully")
}

+ 1
- 0
types/events.go View File

@ -75,6 +75,7 @@ type EventDataApp struct {
Data []byte `json:"bytes"`
}
// NOTE: This goes into the replay WAL
type EventDataRoundState struct {
Height int `json:"height"`
Round int `json:"round"`


Loading…
Cancel
Save