Browse Source

consensus: fix makeBlockchainFromWAL

pull/968/merge
Ethan Buchman 7 years ago
parent
commit
e0296d6c3c
2 changed files with 46 additions and 22 deletions
  1. +37
    -21
      consensus/replay_test.go
  2. +9
    -1
      consensus/wal_generator.go

+ 37
- 21
consensus/replay_test.go View File

@ -44,13 +44,6 @@ func init() {
// the `Handshake Tests` are for failures in applying the block. // the `Handshake Tests` are for failures in applying the block.
// With the help of the WAL, we can recover from it all! // With the help of the WAL, we can recover from it all!
// NOTE: Files in this dir are generated by running the `build.sh` therein.
// It's a simple way to generate wals for a single block, or multiple blocks, with random transactions,
// and different part sizes. The output is not deterministic.
// It should only have to be re-run if there is some breaking change to the consensus data structures (eg. blocks, votes)
// or to the behaviour of the app (eg. computes app hash differently)
var data_dir = path.Join(cmn.GoPath(), "src/github.com/tendermint/tendermint/consensus", "test_data")
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
// WAL Tests // WAL Tests
@ -496,10 +489,13 @@ func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
// log.Notice("Build a blockchain by reading from the WAL") // log.Notice("Build a blockchain by reading from the WAL")
var blockParts *types.PartSet
var blocks []*types.Block var blocks []*types.Block
var commits []*types.Commit var commits []*types.Commit
var thisBlockParts *types.PartSet
var thisBlockCommit *types.Commit
var height int64
dec := NewWALDecoder(gr) dec := NewWALDecoder(gr)
for { for {
msg, err := dec.Decode() msg, err := dec.Decode()
@ -515,42 +511,60 @@ func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
} }
switch p := piece.(type) { switch p := piece.(type) {
case *types.PartSetHeader:
case EndHeightMessage:
// if its not the first one, we have a full block // if its not the first one, we have a full block
if blockParts != nil {
if thisBlockParts != nil {
var n int var n int
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block)
block := wire.ReadBinary(&types.Block{}, thisBlockParts.GetReader(), 0, &n, &err).(*types.Block)
if err != nil {
panic(err)
}
if block.Height != height+1 {
panic(cmn.Fmt("read bad block from wal. got height %d, expected %d", block.Height, height+1))
}
commitHeight := thisBlockCommit.Precommits[0].Height
if commitHeight != height+1 {
panic(cmn.Fmt("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
}
blocks = append(blocks, block) blocks = append(blocks, block)
commits = append(commits, thisBlockCommit)
height += 1
} }
blockParts = types.NewPartSetFromHeader(*p)
case *types.PartSetHeader:
thisBlockParts = types.NewPartSetFromHeader(*p)
case *types.Part: case *types.Part:
_, err := blockParts.AddPart(p, false)
_, err := thisBlockParts.AddPart(p, false)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
case *types.Vote: case *types.Vote:
if p.Type == types.VoteTypePrecommit { if p.Type == types.VoteTypePrecommit {
commit := &types.Commit{
thisBlockCommit = &types.Commit{
BlockID: p.BlockID, BlockID: p.BlockID,
Precommits: []*types.Vote{p}, Precommits: []*types.Vote{p},
} }
commits = append(commits, commit)
} }
} }
} }
// grab the last block too // grab the last block too
var n int var n int
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block)
block := wire.ReadBinary(&types.Block{}, thisBlockParts.GetReader(), 0, &n, &err).(*types.Block)
if err != nil {
panic(err)
}
if block.Height != height+1 {
panic(cmn.Fmt("read bad block from wal. got height %d, expected %d", block.Height, height+1))
}
commitHeight := thisBlockCommit.Precommits[0].Height
if commitHeight != height+1 {
panic(cmn.Fmt("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
}
blocks = append(blocks, block) blocks = append(blocks, block)
commits = append(commits, thisBlockCommit)
return blocks, commits, nil return blocks, commits, nil
} }
func readPieceFromWAL(msg *TimedWALMessage) interface{} { func readPieceFromWAL(msg *TimedWALMessage) interface{} {
// skip meta messages
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
// for logging // for logging
switch m := msg.Msg.(type) { switch m := msg.Msg.(type) {
case msgInfo: case msgInfo:
@ -562,6 +576,8 @@ func readPieceFromWAL(msg *TimedWALMessage) interface{} {
case *VoteMessage: case *VoteMessage:
return msg.Vote return msg.Vote
} }
case EndHeightMessage:
return m
} }
return nil return nil


+ 9
- 1
consensus/wal_generator.go View File

@ -77,7 +77,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
var b bytes.Buffer var b bytes.Buffer
wr := bufio.NewWriter(&b) wr := bufio.NewWriter(&b)
numBlocksWritten := make(chan struct{}) numBlocksWritten := make(chan struct{})
wal := &byteBufferWAL{enc: NewWALEncoder(wr), heightToStop: int64(numBlocks), signalWhenStopsTo: numBlocksWritten}
wal := newByteBufferWAL(NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
// see wal.go#103 // see wal.go#103
wal.Save(EndHeightMessage{0}) wal.Save(EndHeightMessage{0})
consensusState.wal = wal consensusState.wal = wal
@ -147,6 +147,14 @@ type byteBufferWAL struct {
// needed for determinism // needed for determinism
var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z") var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
func newByteBufferWAL(enc *WALEncoder, nBlocks int64, signalStop chan struct{}) *byteBufferWAL {
return &byteBufferWAL{
enc: enc,
heightToStop: nBlocks,
signalWhenStopsTo: signalStop,
}
}
// Save writes message to the internal buffer except when heightToStop is // Save writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and // reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing. // skip writing.


Loading…
Cancel
Save