diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 9c631f5f9..e652df3fd 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -14,10 +14,8 @@ import ( "github.com/tendermint/tendermint/types" ) -func newBlockchainReactor(maxBlockHeight int64) *BlockchainReactor { - logger := log.TestingLogger() +func makeStateAndBlockStore(logger log.Logger) (*sm.State, *BlockStore) { config := cfg.ResetTestRoot("blockchain_reactor_test") - blockStore := NewBlockStore(dbm.NewMemDB()) // Get State @@ -25,6 +23,12 @@ func newBlockchainReactor(maxBlockHeight int64) *BlockchainReactor { state.SetLogger(logger.With("module", "state")) state.Save() + return state, blockStore +} + +func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainReactor { + state, blockStore := makeStateAndBlockStore(logger) + // Make the blockchainReactor itself fastSync := true bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync) @@ -47,7 +51,7 @@ func newBlockchainReactor(maxBlockHeight int64) *BlockchainReactor { func TestNoBlockMessageResponse(t *testing.T) { maxBlockHeight := int64(20) - bcr := newBlockchainReactor(maxBlockHeight) + bcr := newBlockchainReactor(log.NewNopLogger(), maxBlockHeight) bcr.Start() defer bcr.Stop() diff --git a/blockchain/store.go b/blockchain/store.go index c77f67ed8..1033999fe 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -14,7 +14,7 @@ import ( ) /* -Simple low level store for blocks. +BlockStore is a simple low level store for blocks. There are three types of information stored: - BlockMeta: Meta information about each block @@ -23,7 +23,7 @@ There are three types of information stored: Currently the precommit signatures are duplicated in the Block parts as well as the Commit. In the future this may change, perhaps by moving -the Commit data outside the Block. +the Commit data outside the Block. (TODO) // NOTE: BlockStore methods will panic if they encounter errors // deserializing loaded data, indicating probable corruption on disk. @@ -35,6 +35,8 @@ type BlockStore struct { height int64 } +// NewBlockStore returns a new BlockStore with the given DB, +// initialized to the last height that was committed to the DB. func NewBlockStore(db dbm.DB) *BlockStore { bsjson := LoadBlockStoreStateJSON(db) return &BlockStore{ @@ -43,13 +45,16 @@ func NewBlockStore(db dbm.DB) *BlockStore { } } -// Height() returns the last known contiguous block height. +// Height returns the last known contiguous block height. func (bs *BlockStore) Height() int64 { bs.mtx.RLock() defer bs.mtx.RUnlock() return bs.height } +// GetReader returns the value associated with the given key wrapped in an io.Reader. +// If no value is found, it returns nil. +// It's mainly for use with wire.ReadBinary. func (bs *BlockStore) GetReader(key []byte) io.Reader { bytez := bs.db.Get(key) if bytez == nil { @@ -58,6 +63,8 @@ func (bs *BlockStore) GetReader(key []byte) io.Reader { return bytes.NewReader(bytez) } +// LoadBlock returns the block with the given height. +// If no block is found for that height, it returns nil. func (bs *BlockStore) LoadBlock(height int64) *types.Block { var n int var err error @@ -81,6 +88,9 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block { return block } +// LoadBlockPart returns the Part at the given index +// from the block at the given height. +// If no part is found for the given height and index, it returns nil. func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part { var n int var err error @@ -95,6 +105,8 @@ func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part { return part } +// LoadBlockMeta returns the BlockMeta for the given height. +// If no block is found for the given height, it returns nil. func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta { var n int var err error @@ -109,8 +121,10 @@ func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return blockMeta } -// The +2/3 and other Precommit-votes for block at `height`. -// This Commit comes from block.LastCommit for `height+1`. +// LoadBlockCommit returns the Commit for the given height. +// This commit consists of the +2/3 and other Precommit-votes for block at `height`, +// and it comes from the block.LastCommit for `height+1`. +// If no commit is found for the given height, it returns nil. func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit { var n int var err error @@ -125,7 +139,9 @@ func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit { return commit } -// NOTE: the Precommit-vote heights are for the block at `height` +// LoadSeenCommit returns the locally seen Commit for the given height. +// This is useful when we've seen a commit, but there has not yet been +// a new block at `height + 1` that includes this commit in its block.LastCommit. func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit { var n int var err error @@ -140,15 +156,19 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit { return commit } +// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db. // blockParts: Must be parts of the block // seenCommit: The +2/3 precommits that were seen which committed at height. // If all the nodes restart after committing a block, // we need this to reload the precommits to catch-up nodes to the // most recent height. Otherwise they'd stall at H-1. func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { + if block == nil { + cmn.PanicSanity("BlockStore can only save a non-nil block") + } height := block.Height - if height != bs.Height()+1 { - cmn.PanicSanity(cmn.Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height)) + if g, w := height, bs.Height()+1; g != w { + cmn.PanicSanity(cmn.Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g)) } if !blockParts.IsComplete() { cmn.PanicSanity(cmn.Fmt("BlockStore can only save complete block part sets")) @@ -219,6 +239,7 @@ type BlockStoreStateJSON struct { Height int64 } +// Save persists the blockStore state to the database as JSON. func (bsj BlockStoreStateJSON) Save(db dbm.DB) { bytes, err := json.Marshal(bsj) if err != nil { @@ -227,6 +248,8 @@ func (bsj BlockStoreStateJSON) Save(db dbm.DB) { db.SetSync(blockStoreKey, bytes) } +// LoadBlockStoreStateJSON returns the BlockStoreStateJSON as loaded from disk. +// If no BlockStoreStateJSON was previously persisted, it returns the zero value. func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON { bytes := db.Get(blockStoreKey) if bytes == nil { diff --git a/blockchain/store_test.go b/blockchain/store_test.go new file mode 100644 index 000000000..2b77371f6 --- /dev/null +++ b/blockchain/store_test.go @@ -0,0 +1,413 @@ +package blockchain + +import ( + "bytes" + "fmt" + "io/ioutil" + "runtime/debug" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/log" +) + +func TestLoadBlockStoreStateJSON(t *testing.T) { + db := db.NewMemDB() + + bsj := &BlockStoreStateJSON{Height: 1000} + bsj.Save(db) + + retrBSJ := LoadBlockStoreStateJSON(db) + + assert.Equal(t, *bsj, retrBSJ, "expected the retrieved DBs to match") +} + +func TestNewBlockStore(t *testing.T) { + db := db.NewMemDB() + db.Set(blockStoreKey, []byte(`{"height": 10000}`)) + bs := NewBlockStore(db) + assert.Equal(t, bs.Height(), int64(10000), "failed to properly parse blockstore") + + panicCausers := []struct { + data []byte + wantErr string + }{ + {[]byte("artful-doger"), "not unmarshal bytes"}, + {[]byte(""), "unmarshal bytes"}, + {[]byte(" "), "unmarshal bytes"}, + } + + for i, tt := range panicCausers { + // Expecting a panic here on trying to parse an invalid blockStore + _, _, panicErr := doFn(func() (interface{}, error) { + db.Set(blockStoreKey, tt.data) + _ = NewBlockStore(db) + return nil, nil + }) + require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data) + assert.Contains(t, panicErr.Error(), tt.wantErr, "#%d data: %q", i, tt.data) + } + + db.Set(blockStoreKey, nil) + bs = NewBlockStore(db) + assert.Equal(t, bs.Height(), int64(0), "expecting nil bytes to be unmarshaled alright") +} + +func TestBlockStoreGetReader(t *testing.T) { + db := db.NewMemDB() + // Initial setup + db.Set([]byte("Foo"), []byte("Bar")) + db.Set([]byte("Foo1"), nil) + + bs := NewBlockStore(db) + + tests := [...]struct { + key []byte + want []byte + }{ + 0: {key: []byte("Foo"), want: []byte("Bar")}, + 1: {key: []byte("KnoxNonExistent"), want: nil}, + 2: {key: []byte("Foo1"), want: nil}, + } + + for i, tt := range tests { + r := bs.GetReader(tt.key) + if r == nil { + assert.Nil(t, tt.want, "#%d: expected a non-nil reader", i) + continue + } + slurp, err := ioutil.ReadAll(r) + if err != nil { + t.Errorf("#%d: unexpected Read err: %v", i, err) + } else { + assert.Equal(t, slurp, tt.want, "#%d: mismatch", i) + } + } +} + +func freshBlockStore() (*BlockStore, db.DB) { + db := db.NewMemDB() + return NewBlockStore(db), db +} + +var ( + state, _ = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) + + block = makeBlock(1, state) + partSet = block.MakePartSet(2) + part1 = partSet.GetPart(0) + part2 = partSet.GetPart(1) + seenCommit1 = &types.Commit{Precommits: []*types.Vote{{Height: 10}}} +) + +// TODO: This test should be simplified ... + +func TestBlockStoreSaveLoadBlock(t *testing.T) { + state, bs := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) + require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") + + // check there are no blocks at various heights + noBlockHeights := []int64{0, -1, 100, 1000, 2} + for i, height := range noBlockHeights { + if g := bs.LoadBlock(height); g != nil { + t.Errorf("#%d: height(%d) got a block; want nil", i, height) + } + } + + // save a block + block := makeBlock(bs.Height()+1, state) + validPartSet := block.MakePartSet(2) + seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10}}} + bs.SaveBlock(block, partSet, seenCommit) + require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") + + incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2}) + uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0}) + uncontiguousPartSet.AddPart(part2, false) + + header1 := types.Header{ + Height: 1, + NumTxs: 100, + ChainID: "block_test", + Time: time.Now(), + } + header2 := header1 + header2.Height = 4 + + // End of setup, test data + + commitAtH10 := &types.Commit{Precommits: []*types.Vote{{Height: 10}}} + tuples := []struct { + block *types.Block + parts *types.PartSet + seenCommit *types.Commit + wantErr bool + wantPanic string + + corruptBlockInDB bool + corruptCommitInDB bool + corruptSeenCommitInDB bool + eraseCommitInDB bool + eraseSeenCommitInDB bool + }{ + { + block: newBlock(&header1, commitAtH10), + parts: validPartSet, + seenCommit: seenCommit1, + }, + + { + block: nil, + wantPanic: "only save a non-nil block", + }, + + { + block: newBlock(&header2, commitAtH10), + parts: uncontiguousPartSet, + wantPanic: "only save contiguous blocks", // and incomplete and uncontiguous parts + }, + + { + block: newBlock(&header1, commitAtH10), + parts: incompletePartSet, + wantPanic: "only save complete block", // incomplete parts + }, + + { + block: newBlock(&header1, commitAtH10), + parts: validPartSet, + seenCommit: seenCommit1, + corruptCommitInDB: true, // Corrupt the DB's commit entry + wantPanic: "rror reading commit", + }, + + { + block: newBlock(&header1, commitAtH10), + parts: validPartSet, + seenCommit: seenCommit1, + wantPanic: "rror reading block", + corruptBlockInDB: true, // Corrupt the DB's block entry + }, + + { + block: newBlock(&header1, commitAtH10), + parts: validPartSet, + seenCommit: seenCommit1, + + // Expecting no error and we want a nil back + eraseSeenCommitInDB: true, + }, + + { + block: newBlock(&header1, commitAtH10), + parts: validPartSet, + seenCommit: seenCommit1, + + corruptSeenCommitInDB: true, + wantPanic: "rror reading commit", + }, + + { + block: newBlock(&header1, commitAtH10), + parts: validPartSet, + seenCommit: seenCommit1, + + // Expecting no error and we want a nil back + eraseCommitInDB: true, + }, + } + + type quad struct { + block *types.Block + commit *types.Commit + meta *types.BlockMeta + + seenCommit *types.Commit + } + + for i, tuple := range tuples { + bs, db := freshBlockStore() + // SaveBlock + res, err, panicErr := doFn(func() (interface{}, error) { + bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit) + if tuple.block == nil { + return nil, nil + } + + if tuple.corruptBlockInDB { + db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus")) + } + bBlock := bs.LoadBlock(tuple.block.Height) + bBlockMeta := bs.LoadBlockMeta(tuple.block.Height) + + if tuple.eraseSeenCommitInDB { + db.Delete(calcSeenCommitKey(tuple.block.Height)) + } + if tuple.corruptSeenCommitInDB { + db.Set(calcSeenCommitKey(tuple.block.Height), []byte("bogus-seen-commit")) + } + bSeenCommit := bs.LoadSeenCommit(tuple.block.Height) + + commitHeight := tuple.block.Height - 1 + if tuple.eraseCommitInDB { + db.Delete(calcBlockCommitKey(commitHeight)) + } + if tuple.corruptCommitInDB { + db.Set(calcBlockCommitKey(commitHeight), []byte("foo-bogus")) + } + bCommit := bs.LoadBlockCommit(commitHeight) + return &quad{block: bBlock, seenCommit: bSeenCommit, commit: bCommit, meta: bBlockMeta}, nil + }) + + if subStr := tuple.wantPanic; subStr != "" { + if panicErr == nil { + t.Errorf("#%d: want a non-nil panic", i) + } else if got := panicErr.Error(); !strings.Contains(got, subStr) { + t.Errorf("#%d:\n\tgotErr: %q\nwant substring: %q", i, got, subStr) + } + continue + } + + if tuple.wantErr { + if err == nil { + t.Errorf("#%d: got nil error", i) + } + continue + } + + assert.Nil(t, panicErr, "#%d: unexpected panic", i) + assert.Nil(t, err, "#%d: expecting a non-nil error", i) + qua, ok := res.(*quad) + if !ok || qua == nil { + t.Errorf("#%d: got nil quad back; gotType=%T", i, res) + continue + } + if tuple.eraseSeenCommitInDB { + assert.Nil(t, qua.seenCommit, "erased the seenCommit in the DB hence we should get back a nil seenCommit") + } + if tuple.eraseCommitInDB { + assert.Nil(t, qua.commit, "erased the commit in the DB hence we should get back a nil commit") + } + } +} + +func binarySerializeIt(v interface{}) []byte { + var n int + var err error + buf := new(bytes.Buffer) + wire.WriteBinary(v, buf, &n, &err) + return buf.Bytes() +} + +func TestLoadBlockPart(t *testing.T) { + bs, db := freshBlockStore() + height, index := int64(10), 1 + loadPart := func() (interface{}, error) { + part := bs.LoadBlockPart(height, index) + return part, nil + } + + // Initially no contents. + // 1. Requesting for a non-existent block shouldn't fail + res, _, panicErr := doFn(loadPart) + require.Nil(t, panicErr, "a non-existent block part shouldn't cause a panic") + require.Nil(t, res, "a non-existent block part should return nil") + + // 2. Next save a corrupted block then try to load it + db.Set(calcBlockPartKey(height, index), []byte("Tendermint")) + res, _, panicErr = doFn(loadPart) + require.NotNil(t, panicErr, "expecting a non-nil panic") + require.Contains(t, panicErr.Error(), "Error reading block part") + + // 3. A good block serialized and saved to the DB should be retrievable + db.Set(calcBlockPartKey(height, index), binarySerializeIt(part1)) + gotPart, _, panicErr := doFn(loadPart) + require.Nil(t, panicErr, "an existent and proper block should not panic") + require.Nil(t, res, "a properly saved block should return a proper block") + require.Equal(t, gotPart.(*types.Part).Hash(), part1.Hash(), "expecting successful retrieval of previously saved block") +} + +func TestLoadBlockMeta(t *testing.T) { + bs, db := freshBlockStore() + height := int64(10) + loadMeta := func() (interface{}, error) { + meta := bs.LoadBlockMeta(height) + return meta, nil + } + + // Initially no contents. + // 1. Requesting for a non-existent blockMeta shouldn't fail + res, _, panicErr := doFn(loadMeta) + require.Nil(t, panicErr, "a non-existent blockMeta shouldn't cause a panic") + require.Nil(t, res, "a non-existent blockMeta should return nil") + + // 2. Next save a corrupted blockMeta then try to load it + db.Set(calcBlockMetaKey(height), []byte("Tendermint-Meta")) + res, _, panicErr = doFn(loadMeta) + require.NotNil(t, panicErr, "expecting a non-nil panic") + require.Contains(t, panicErr.Error(), "Error reading block meta") + + // 3. A good blockMeta serialized and saved to the DB should be retrievable + meta := &types.BlockMeta{} + db.Set(calcBlockMetaKey(height), binarySerializeIt(meta)) + gotMeta, _, panicErr := doFn(loadMeta) + require.Nil(t, panicErr, "an existent and proper block should not panic") + require.Nil(t, res, "a properly saved blockMeta should return a proper blocMeta ") + require.Equal(t, binarySerializeIt(meta), binarySerializeIt(gotMeta), "expecting successful retrieval of previously saved blockMeta") +} + +func TestBlockFetchAtHeight(t *testing.T) { + state, bs := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) + require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") + block := makeBlock(bs.Height()+1, state) + + partSet := block.MakePartSet(2) + seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10}}} + + bs.SaveBlock(block, partSet, seenCommit) + require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") + + blockAtHeight := bs.LoadBlock(bs.Height()) + require.Equal(t, block.Hash(), blockAtHeight.Hash(), "expecting a successful load of the last saved block") + + blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1) + require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1") + blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2) + require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2") +} + +func doFn(fn func() (interface{}, error)) (res interface{}, err error, panicErr error) { + defer func() { + if r := recover(); r != nil { + switch e := r.(type) { + case error: + panicErr = e + case string: + panicErr = fmt.Errorf("%s", e) + default: + if st, ok := r.(fmt.Stringer); ok { + panicErr = fmt.Errorf("%s", st) + } else { + panicErr = fmt.Errorf("%s", debug.Stack()) + } + } + } + }() + + res, err = fn() + return res, err, panicErr +} + +func newBlock(hdr *types.Header, lastCommit *types.Commit) *types.Block { + return &types.Block{ + Header: hdr, + LastCommit: lastCommit, + } +} diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 9f6a9490e..29c35548e 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -264,6 +264,7 @@ func (r *PEXReactor) ensurePeers() { if dialling := r.Switch.IsDialing(try); dialling { continue } + // XXX: does this work ?! if connected := r.Switch.Peers().Has(try.IP.String()); connected { continue }