|
|
@ -1,12 +1,16 @@ |
|
|
|
package consensus |
|
|
|
|
|
|
|
import ( |
|
|
|
"fmt" |
|
|
|
"io/ioutil" |
|
|
|
"os" |
|
|
|
"strings" |
|
|
|
"testing" |
|
|
|
"time" |
|
|
|
|
|
|
|
. "github.com/tendermint/go-common" |
|
|
|
"github.com/tendermint/go-events" |
|
|
|
"github.com/tendermint/go-wire" |
|
|
|
"github.com/tendermint/tendermint/types" |
|
|
|
) |
|
|
|
|
|
|
@ -56,51 +60,144 @@ var testLog = `{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254 |
|
|
|
{"time":"2016-04-03T11:23:54.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"4291966B8A9DFBA00AEC7C700F2718E61DF4331D","block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"signature":"39147DA595F08B73CF8C899967C8403B5872FD9042FFA4E239159E0B6C5D9665C9CA81D766EACA2AE658872F94C2FCD1E34BF51859CD5B274DA8512BACE4B50D"}}],"peer_key":""}]} |
|
|
|
` |
|
|
|
|
|
|
|
func TestReplayCatchup(t *testing.T) { |
|
|
|
// map lines in the above wal to privVal step
|
|
|
|
var mapPrivValStep = map[int]int8{ |
|
|
|
0: 0, |
|
|
|
1: 0, |
|
|
|
2: 1, |
|
|
|
3: 1, |
|
|
|
4: 1, |
|
|
|
5: 2, |
|
|
|
6: 2, |
|
|
|
7: 3, |
|
|
|
} |
|
|
|
|
|
|
|
func writeWAL(log string) string { |
|
|
|
fmt.Println("writing", log) |
|
|
|
// write the needed wal to file
|
|
|
|
f, err := ioutil.TempFile(os.TempDir(), "replay_test_") |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
name := f.Name() |
|
|
|
_, err = f.WriteString(testLog) |
|
|
|
|
|
|
|
_, err = f.WriteString(log) |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
} |
|
|
|
name := f.Name() |
|
|
|
f.Close() |
|
|
|
return name |
|
|
|
} |
|
|
|
|
|
|
|
func waitForBlock(newBlockCh chan interface{}) { |
|
|
|
after := time.After(time.Second * 10) |
|
|
|
select { |
|
|
|
case <-newBlockCh: |
|
|
|
case <-after: |
|
|
|
panic("Timed out waiting for new block") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh chan interface{}) { |
|
|
|
cs.config.Set("cswal", fileName) |
|
|
|
cs.Start() |
|
|
|
// Wait to make a new block.
|
|
|
|
// This is just a signal that we haven't halted; its not something contained in the WAL itself.
|
|
|
|
// Assuming the consensus state is running, replay of any WAL, including the empty one,
|
|
|
|
// should eventually be followed by a new block, or else something is wrong
|
|
|
|
waitForBlock(newBlockCh) |
|
|
|
cs.Stop() |
|
|
|
} |
|
|
|
|
|
|
|
func setupReplayTest(nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) { |
|
|
|
fmt.Println("-------------------------------------") |
|
|
|
log.Notice(Fmt("Starting replay test of %d lines of WAL (crash before write)", nLines)) |
|
|
|
|
|
|
|
lineStep := nLines |
|
|
|
if crashAfter { |
|
|
|
lineStep -= 1 |
|
|
|
} |
|
|
|
|
|
|
|
split := strings.Split(testLog, "\n") |
|
|
|
lastMsg := split[nLines] |
|
|
|
|
|
|
|
// we write those lines up to (not including) one with the signature
|
|
|
|
fileName := writeWAL(strings.Join(split[:nLines], "\n") + "\n") |
|
|
|
|
|
|
|
cs := fixedConsensusState() |
|
|
|
|
|
|
|
// we've already precommitted on the first block
|
|
|
|
// without replay catchup we would be halted here forever
|
|
|
|
// set the last step according to when we crashed vs the wal
|
|
|
|
cs.privValidator.LastHeight = 1 // first block
|
|
|
|
cs.privValidator.LastStep = 3 // precommit
|
|
|
|
cs.privValidator.LastStep = mapPrivValStep[lineStep] |
|
|
|
|
|
|
|
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) |
|
|
|
fmt.Println("LAST STEP", cs.privValidator.LastStep) |
|
|
|
|
|
|
|
// start timeout and receive routines
|
|
|
|
cs.startRoutines(0) |
|
|
|
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) |
|
|
|
|
|
|
|
// open wal and run catchup messages
|
|
|
|
openWAL(t, cs, name) |
|
|
|
if err := cs.catchupReplay(cs.Height); err != nil { |
|
|
|
panic(Fmt("Error on catchup replay %v", err)) |
|
|
|
} |
|
|
|
return cs, newBlockCh, lastMsg, fileName |
|
|
|
} |
|
|
|
|
|
|
|
after := time.After(time.Second * 15) |
|
|
|
select { |
|
|
|
case <-newBlockCh: |
|
|
|
case <-after: |
|
|
|
panic("Timed out waiting for new block") |
|
|
|
//-----------------------------------------------
|
|
|
|
// Test the log at every iteration, and set the privVal last step
|
|
|
|
// as if the log was written after signing, before the crash
|
|
|
|
|
|
|
|
func TestReplayCrashAfterWrite(t *testing.T) { |
|
|
|
split := strings.Split(testLog, "\n") |
|
|
|
for i := 0; i < len(split)-1; i++ { |
|
|
|
cs, newBlockCh, _, f := setupReplayTest(i+1, true) |
|
|
|
runReplayTest(t, cs, f, newBlockCh) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func openWAL(t *testing.T, cs *ConsensusState, file string) { |
|
|
|
// open the wal
|
|
|
|
wal, err := NewWAL(file, config.GetBool("cswal_light")) |
|
|
|
//-----------------------------------------------
|
|
|
|
// Test the log as if we crashed after signing but before writing.
|
|
|
|
// This relies on privValidator.LastSignature being set
|
|
|
|
|
|
|
|
func TestReplayCrashBeforeWritePropose(t *testing.T) { |
|
|
|
cs, newBlockCh, proposalMsg, f := setupReplayTest(2, false) // propose
|
|
|
|
// Set LastSig
|
|
|
|
var err error |
|
|
|
var msg ConsensusLogMessage |
|
|
|
wire.ReadJSON(&msg, []byte(proposalMsg), &err) |
|
|
|
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) |
|
|
|
if err != nil { |
|
|
|
panic(err) |
|
|
|
t.Fatalf("Error reading json data: %v", err) |
|
|
|
} |
|
|
|
wal.exists = true |
|
|
|
cs.wal = wal |
|
|
|
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) |
|
|
|
cs.privValidator.LastSignature = proposal.Proposal.Signature |
|
|
|
runReplayTest(t, cs, f, newBlockCh) |
|
|
|
} |
|
|
|
|
|
|
|
func TestReplayCrashBeforeWritePrevote(t *testing.T) { |
|
|
|
cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote
|
|
|
|
cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { |
|
|
|
// Set LastSig
|
|
|
|
var err error |
|
|
|
var msg ConsensusLogMessage |
|
|
|
wire.ReadJSON(&msg, []byte(voteMsg), &err) |
|
|
|
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) |
|
|
|
if err != nil { |
|
|
|
t.Fatalf("Error reading json data: %v", err) |
|
|
|
} |
|
|
|
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) |
|
|
|
cs.privValidator.LastSignature = vote.Vote.Signature |
|
|
|
}) |
|
|
|
runReplayTest(t, cs, f, newBlockCh) |
|
|
|
} |
|
|
|
|
|
|
|
func TestReplayCrashBeforeWritePrecommit(t *testing.T) { |
|
|
|
cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit
|
|
|
|
cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) { |
|
|
|
// Set LastSig
|
|
|
|
var err error |
|
|
|
var msg ConsensusLogMessage |
|
|
|
wire.ReadJSON(&msg, []byte(voteMsg), &err) |
|
|
|
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) |
|
|
|
if err != nil { |
|
|
|
t.Fatalf("Error reading json data: %v", err) |
|
|
|
} |
|
|
|
cs.privValidator.LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) |
|
|
|
cs.privValidator.LastSignature = vote.Vote.Signature |
|
|
|
}) |
|
|
|
runReplayTest(t, cs, f, newBlockCh) |
|
|
|
} |