Browse Source

tx indexing (Refs #237)

save transactions to blockstore

move to a separate module

benchmark KVIndexer

batch write transactions

Benchmarks:

```
BenchmarkKVIndexerIndex-2         100000            516300 ns/op
PASS
ok      github.com/tendermint/tendermint/blockchain/tx  56.506s

5,16 s for 10000 transactions
1 s for 2000 transactions
```

```
BenchmarkKVIndexerIndex-2         3000000             8622 ns/op
PASS
ok      github.com/tendermint/tendermint/blockchain/tx  34.210s

86 ms for 10000 transactions
16 ms for 2000 transactions
```

```
BenchmarkKVIndexerIndex1-2               5000000              7160 ns/op
BenchmarkKVIndexerIndex500-2               20000           1750411 ns/op
BenchmarkKVIndexerIndex1000-2              10000           3573973 ns/op
BenchmarkKVIndexerIndex2000-2               5000           7836851 ns/op
BenchmarkKVIndexerIndex10000-2              1000          33438980 ns/op
PASS
ok      github.com/tendermint/tendermint/blockchain/tx  209.482s

7,8 ms for 2000 transactions
```

[state] write test for ApplyBlock

review comments

- move txindexer to state
- fix type

save Tx Index as well

do not store tx itself in the result
pull/412/head
Anton Kaliaev 8 years ago
parent
commit
c3f1b08b6a
No known key found for this signature in database GPG Key ID: 7B6881D965918214
13 changed files with 295 additions and 49 deletions
  1. +2
    -4
      blockchain/pool_test.go
  2. +1
    -1
      consensus/state.go
  3. +9
    -1
      glide.lock
  4. +1
    -1
      node/node.go
  5. +60
    -42
      state/execution.go
  6. +17
    -0
      state/state.go
  7. +21
    -0
      state/tx/indexer.go
  8. +32
    -0
      state/tx/indexer/batch.go
  9. +6
    -0
      state/tx/indexer/error.go
  10. +55
    -0
      state/tx/indexer/kv.go
  11. +61
    -0
      state/tx/indexer/kv_test.go
  12. +16
    -0
      state/tx/indexer/null.go
  13. +14
    -0
      types/tx_result.go

+ 2
- 4
blockchain/pool_test.go View File

@ -35,6 +35,7 @@ func TestBasic(t *testing.T) {
requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.Start()
defer pool.Stop()
// Introduce each peer.
go func() {
@ -76,8 +77,6 @@ func TestBasic(t *testing.T) {
}()
}
}
pool.Stop()
}
func TestTimeout(t *testing.T) {
@ -87,6 +86,7 @@ func TestTimeout(t *testing.T) {
requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.Start()
defer pool.Stop()
for _, peer := range peers {
log.Info("Peer", "id", peer.id)
@ -131,6 +131,4 @@ func TestTimeout(t *testing.T) {
log.Info("TEST: Pulled new BlockRequest", "request", request)
}
}
pool.Stop()
}

+ 1
- 1
consensus/state.go View File

@ -283,7 +283,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap
//----------------------------------------
// Public interface
// implements events.Eventable
// SetEventSwitch implements events.Eventable
func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) {
cs.evsw = evsw
}


+ 9
- 1
glide.lock View File

@ -164,4 +164,12 @@ imports:
- stats
- tap
- transport
testImports: []
testImports:
- name: github.com/davecgh/go-spew
version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9
subpackages:
- spew
- name: github.com/pmezard/go-difflib
version: d8ed2627bdf02c080bf22230dbb337003b7aba2d
subpackages:
- difflib

+ 1
- 1
node/node.go View File

@ -82,7 +82,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
}
// reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB)
state = sm.GetState(config, stateDB)
// Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519()


+ 60
- 42
state/execution.go View File

@ -2,26 +2,26 @@ package state
import (
"errors"
"fmt"
"github.com/ebuchman/fail-test"
fail "github.com/ebuchman/fail-test"
abci "github.com/tendermint/abci/types"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-crypto"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy"
txindexer "github.com/tendermint/tendermint/state/tx/indexer"
"github.com/tendermint/tendermint/types"
)
//--------------------------------------------------
// Execute the block
// Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block.
func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// ExecBlock executes the block to mutate State.
// + validates the block
// + executes block.Txs on the proxyAppConn
// + updates validator sets
// + returns block.Txs results
func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) ([]*types.TxResult, error) {
// Validate the block.
if err := s.validateBlock(block); err != nil {
return ErrInvalidBlock(err)
return nil, ErrInvalidBlock(err)
}
// compute bitarray of validators that signed
@ -33,11 +33,11 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
nextValSet := valSet.Copy()
// Execute the block txs
changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
txResults, changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
if err != nil {
// There was some error in proxyApp
// TODO Report error and wait for proxyApp to be available.
return ErrProxyAppConn(err)
return nil, ErrProxyAppConn(err)
}
// update the validator set
@ -54,16 +54,22 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
fail.Fail() // XXX
return nil
return txResults, nil
}
// Executes block's transactions on proxyAppConn.
// Returns a list of updates to the validator set
// Returns a list of transaction results and updates to the validator set
// TODO: Generate a bitmap or otherwise store tx validity in state.
func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*abci.Validator, error) {
func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*types.TxResult, []*abci.Validator, error) {
var validTxs, invalidTxs = 0, 0
txResults := make([]*types.TxResult, len(block.Txs))
txHashToIndexMap := make(map[string]int)
for index, tx := range block.Txs {
txHashToIndexMap[string(tx.Hash())] = index
}
// Execute transactions and get hash
proxyCb := func(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
@ -73,21 +79,28 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
// Blocks may include invalid txs.
// reqDeliverTx := req.(abci.RequestDeliverTx)
txError := ""
apTx := r.DeliverTx
if apTx.Code == abci.CodeType_OK {
validTxs += 1
txResult := r.DeliverTx
if txResult.Code == abci.CodeType_OK {
validTxs++
} else {
log.Debug("Invalid tx", "code", r.DeliverTx.Code, "log", r.DeliverTx.Log)
invalidTxs += 1
txError = apTx.Code.String()
log.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log)
invalidTxs++
txError = txResult.Code.String()
}
tx := types.Tx(req.GetDeliverTx().Tx)
index, ok := txHashToIndexMap[string(tx.Hash())]
if ok {
txResults[index] = &types.TxResult{uint64(block.Height), uint32(index), *txResult}
}
// NOTE: if we count we can access the tx from the block instead of
// pulling it from the req
event := types.EventDataTx{
Tx: req.GetDeliverTx().Tx,
Data: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
Tx: tx,
Data: txResult.Data,
Code: txResult.Code,
Log: txResult.Log,
Error: txError,
}
types.FireEventTx(eventCache, event)
@ -99,7 +112,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header))
if err != nil {
log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
return nil, err
return nil, nil, err
}
fail.Fail() // XXX
@ -109,7 +122,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
fail.FailRand(len(block.Txs)) // XXX
proxyAppConn.DeliverTxAsync(tx)
if err := proxyAppConn.Error(); err != nil {
return nil, err
return nil, nil, err
}
}
@ -119,7 +132,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height))
if err != nil {
log.Warn("Error in proxyAppConn.EndBlock", "error", err)
return nil, err
return nil, nil, err
}
fail.Fail() // XXX
@ -128,7 +141,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
if len(respEndBlock.Diffs) > 0 {
log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs))
}
return respEndBlock.Diffs, nil
return txResults, respEndBlock.Diffs, nil
}
func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error {
@ -218,26 +231,31 @@ func (s *State) validateBlock(block *types.Block) error {
return nil
}
//-----------------------------------------------------------------------------
// ApplyBlock executes the block, then commits and updates the mempool atomically
// Execute and commit block against app, save block and state
// ApplyBlock executes the block, then commits and updates the mempool
// atomically, optionally indexing transaction results.
func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus,
block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error {
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
txResults, err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
if err != nil {
return errors.New(Fmt("Exec failed for application: %v", err))
return fmt.Errorf("Exec failed for application: %v", err)
}
// lock mempool, commit state, update mempool
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
if err != nil {
return errors.New(Fmt("Commit failed for application: %v", err))
return fmt.Errorf("Commit failed for application: %v", err)
}
batch := txindexer.NewBatch()
for i, r := range txResults {
if r != nil {
tx := block.Txs[i]
batch.Index(string(tx.Hash()), *r)
}
}
s.TxIndexer.Batch(batch)
return nil
}
@ -272,7 +290,7 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
// Returns the application root hash (result of abci.Commit)
func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
var eventCache types.Fireable // nil
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
_, _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
if err != nil {
log.Warn("Error executing block on proxy app", "height", block.Height, "err", err)
return nil, err


+ 17
- 0
state/state.go View File

@ -10,6 +10,8 @@ import (
cfg "github.com/tendermint/go-config"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/tx"
txindexer "github.com/tendermint/tendermint/state/tx/indexer"
"github.com/tendermint/tendermint/types"
)
@ -38,6 +40,8 @@ type State struct {
// AppHash is updated after Commit
AppHash []byte
TxIndexer tx.Indexer `json:"-"` // Transaction indexer.
}
func LoadState(db dbm.DB) *State {
@ -72,6 +76,7 @@ func (s *State) Copy() *State {
Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(),
AppHash: s.AppHash,
TxIndexer: s.TxIndexer, // pointer here, not value
}
}
@ -125,12 +130,20 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State {
state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
}
// Transaction indexing
store := dbm.NewDB("tx_indexer", config.GetString("db_backend"), config.GetString("db_dir"))
state.TxIndexer = txindexer.NewKV(store)
return state
}
//-----------------------------------------------------------------------------
// Genesis
// MakeGenesisStateFromFile reads and unmarshals state from the given file.
//
// Used during replay and in tests.
func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State {
genDocJSON, err := ioutil.ReadFile(genDocFile)
if err != nil {
@ -143,6 +156,9 @@ func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State {
return MakeGenesisState(db, genDoc)
}
// MakeGenesisState creates state from types.GenesisDoc.
//
// Used in tests.
func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State {
if len(genDoc.Validators) == 0 {
Exit(Fmt("The genesis file has no validators"))
@ -176,5 +192,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State {
Validators: types.NewValidatorSet(validators),
LastValidators: types.NewValidatorSet(nil),
AppHash: genDoc.AppHash,
TxIndexer: &txindexer.Null{}, // we do not need indexer during replay and in tests
}
}

+ 21
- 0
state/tx/indexer.go View File

@ -0,0 +1,21 @@
package tx
import (
txindexer "github.com/tendermint/tendermint/state/tx/indexer"
"github.com/tendermint/tendermint/types"
)
// Indexer defines the methods used to index and look up transaction results.
type Indexer interface {
// Batch analyzes, indexes or stores a batch of transactions.
//
// NOTE: there is deliberately no Index method for a single transaction
// here, because indexing one transaction at a time carries a heavy
// performance penalty. Almost all advanced indexers support batching.
Batch(b *txindexer.Batch) error
// Tx returns the result for the transaction with the given hash, or nil if
// the transaction is not indexed or stored.
Tx(hash string) (*types.TxResult, error)
}

+ 32
- 0
state/tx/indexer/batch.go View File

@ -0,0 +1,32 @@
package indexer
import "github.com/tendermint/tendermint/types"
// A Batch groups together multiple Index operations that should be performed
// at the same time. The Batch structure is NOT thread-safe: only operate on a
// batch from a single thread at a time. Once batch execution has started, the
// batch must not be modified.
type Batch struct {
// Ops maps a transaction hash to the result to be indexed under that hash.
Ops map[string]types.TxResult
}
// NewBatch returns an empty Batch whose Ops map is already allocated, so
// Index can be called on it right away.
func NewBatch() *Batch {
	ops := make(map[string]types.TxResult)
	return &Batch{Ops: ops}
}
// Index records result under hash, replacing any entry previously stored
// for the same hash. It returns ErrorEmptyHash when hash is empty.
func (b *Batch) Index(hash string, result types.TxResult) error {
	if len(hash) == 0 {
		return ErrorEmptyHash
	}

	b.Ops[hash] = result
	return nil
}
// Size reports how many operations the batch currently holds.
func (b *Batch) Size() int {
	count := len(b.Ops)
	return count
}

+ 6
- 0
state/tx/indexer/error.go View File

@ -0,0 +1,6 @@
package indexer
import "errors"
var (
	// ErrorEmptyHash is the error reported when an empty transaction hash
	// is supplied.
	ErrorEmptyHash = errors.New("Transaction hash cannot be empty")
)

+ 55
- 0
state/tx/indexer/kv.go View File

@ -0,0 +1,55 @@
package indexer
import (
"bytes"
"fmt"
db "github.com/tendermint/go-db"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
)
// KV is the simplest possible indexer, backed by a key-value store (levelDB).
// It can only look up a transaction result by the transaction's hash.
type KV struct {
// store holds the encoded TxResult values, keyed by transaction hash.
store db.DB
}
// NewKV builds a KV indexer that reads from and writes to the given store.
func NewKV(store db.DB) *KV {
	kv := new(KV)
	kv.store = store
	return kv
}
// Tx looks up the result stored under hash in the KV storage.
//
// It returns ErrorEmptyHash for an empty hash, (nil, nil) when nothing is
// stored under the hash, and a wrapped error when the stored bytes cannot be
// decoded into a TxResult.
func (indexer *KV) Tx(hash string) (*types.TxResult, error) {
	if len(hash) == 0 {
		return nil, ErrorEmptyHash
	}

	raw := indexer.store.Get([]byte(hash))
	if raw == nil {
		return nil, nil
	}

	var (
		bytesRead int
		readErr   error
	)
	decoded := wire.ReadBinary(&types.TxResult{}, bytes.NewReader(raw), 0, &bytesRead, &readErr)
	result := decoded.(*types.TxResult)
	if readErr != nil {
		return nil, fmt.Errorf("Error reading TxResult: %v", readErr)
	}
	return result, nil
}
// Batch encodes every result in b and writes them all to the KV storage
// through a single store batch, keyed by transaction hash. It always
// returns nil.
func (indexer *KV) Batch(b *Batch) error {
	writer := indexer.store.NewBatch()
	for txHash, txResult := range b.Ops {
		encoded := wire.BinaryBytes(&txResult)
		key := []byte(txHash)
		writer.Set(key, encoded)
	}
	writer.Write()
	return nil
}

+ 61
- 0
state/tx/indexer/kv_test.go View File

@ -0,0 +1,61 @@
package indexer
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/abci/types"
db "github.com/tendermint/go-db"
"github.com/tendermint/tendermint/types"
)
// TestKVIndex checks that a result written through Batch can be read back
// unchanged with Tx, using an in-memory store.
func TestKVIndex(t *testing.T) {
	kv := &KV{store: db.NewMemDB()}

	tx := types.Tx("HELLO WORLD")
	want := &types.TxResult{
		Height:            1,
		Index:             1,
		DeliverTxResponse: abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""},
	}
	key := string(tx.Hash())

	pending := NewBatch()
	pending.Index(key, *want)
	err := kv.Batch(pending)
	require.Nil(t, err)

	got, err := kv.Tx(key)
	require.Nil(t, err)
	assert.Equal(t, want, got)
}
// benchmarkKVIndex measures how long KV.Batch takes to write one batch of
// txsCount transaction results to a leveldb store created in a temporary
// directory. The directory is removed when the benchmark returns.
//
// Errors from building or writing the batch abort the benchmark, so a
// failing write cannot be reported as a valid timing.
func benchmarkKVIndex(txsCount int, b *testing.B) {
	txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}}

	dir, err := ioutil.TempDir("", "tx_indexer_db")
	if err != nil {
		b.Fatal(err)
	}
	defer os.RemoveAll(dir)

	store := db.NewDB("tx_indexer", "leveldb", dir)
	indexer := &KV{store: store}

	// Build the batch once, outside the timed region.
	batch := NewBatch()
	for i := 0; i < txsCount; i++ {
		if err := batch.Index(fmt.Sprintf("hash%v", i), *txResult); err != nil {
			b.Fatal(err)
		}
	}

	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		if err := indexer.Batch(batch); err != nil {
			b.Fatal(err)
		}
	}
}
// BenchmarkKVIndex1 benchmarks writing a batch of 1 transaction result.
func BenchmarkKVIndex1(b *testing.B) {
	benchmarkKVIndex(1, b)
}

// BenchmarkKVIndex500 benchmarks writing a batch of 500 transaction results.
func BenchmarkKVIndex500(b *testing.B) {
	benchmarkKVIndex(500, b)
}

// BenchmarkKVIndex1000 benchmarks writing a batch of 1000 transaction results.
func BenchmarkKVIndex1000(b *testing.B) {
	benchmarkKVIndex(1000, b)
}

// BenchmarkKVIndex2000 benchmarks writing a batch of 2000 transaction results.
func BenchmarkKVIndex2000(b *testing.B) {
	benchmarkKVIndex(2000, b)
}

// BenchmarkKVIndex10000 benchmarks writing a batch of 10000 transaction results.
func BenchmarkKVIndex10000(b *testing.B) {
	benchmarkKVIndex(10000, b)
}

+ 16
- 0
state/tx/indexer/null.go View File

@ -0,0 +1,16 @@
package indexer
import "github.com/tendermint/tendermint/types"
// Null is an indexer that acts as /dev/null: Batch discards its input and
// Tx panics, because nothing is ever stored.
type Null struct{}
// Tx always panics: a null indexer stores nothing, so there is no
// transaction to return.
func (n *Null) Tx(hash string) (*types.TxResult, error) {
	const msg = "You are trying to get the transaction from a null indexer"
	panic(msg)
}
// Batch ignores the given batch and reports success.
func (n *Null) Batch(batch *Batch) error {
	var err error
	return err
}

+ 14
- 0
types/tx_result.go View File

@ -0,0 +1,14 @@
package types
import (
abci "github.com/tendermint/abci/types"
)
// TxResult contains the results of executing a transaction.
//
// One usage is indexing transaction results.
type TxResult struct {
// Height is the height of the block the transaction was executed in.
Height uint64
// Index is the position of the transaction within that block's Txs.
Index uint32
// DeliverTxResponse is the ABCI response returned for the DeliverTx request.
DeliverTxResponse abci.ResponseDeliverTx
}

Loading…
Cancel
Save