From 58e3246ffc7d49ce76312278882a4e84ab417311 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 11 May 2018 12:09:41 +0400 Subject: [PATCH] batch index txs --- config/config.go | 2 +- state/execution.go | 14 ++++-------- state/txindex/indexer.go | 2 +- state/txindex/indexer_service.go | 37 ++++++++++++++++++++++++++------ state/txindex/kv/kv_test.go | 4 ++-- 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index b5bd87cee..b76f5ed19 100644 --- a/config/config.go +++ b/config/config.go @@ -515,7 +515,7 @@ type TxIndexConfig struct { // DefaultTxIndexConfig returns a default configuration for the transaction indexer. func DefaultTxIndexConfig() *TxIndexConfig { return &TxIndexConfig{ - Indexer: "null", + Indexer: "kv", IndexTags: "", IndexAllTags: false, } diff --git a/state/execution.go b/state/execution.go index 0ce5e44f1..3fe35e2fa 100644 --- a/state/execution.go +++ b/state/execution.go @@ -341,23 +341,17 @@ func updateState(s State, blockID types.BlockID, header *types.Header, // Fire TxEvent for every tx. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { - // NOTE: do we still need this buffer ? - txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs)) + eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + for i, tx := range block.Data.Txs { - txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{ + eventBus.PublishEventTx(types.EventDataTx{types.TxResult{ Height: block.Height, Index: uint32(i), Tx: tx, Result: *(abciResponses.DeliverTx[i]), }}) } - - eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) - eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) - err := txEventBuffer.Flush() - if err != nil { - logger.Error("Failed to flush event buffer", "err", err) - } } //---------------------------------------------------------------------------------------------------- diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index bd51fbb29..e23840f14 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -34,7 +34,7 @@ type Batch struct { } // NewBatch creates a new Batch. -func NewBatch(n int) *Batch { +func NewBatch(n int64) *Batch { return &Batch{ Ops: make([]*types.TxResult, n), } diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index f5420f631..dd12bdf9d 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -11,6 +11,8 @@ const ( subscriber = "IndexerService" ) +// IndexerService connects event bus and transaction indexer together in order +// to index transactions coming from event bus. type IndexerService struct { cmn.BaseService @@ -18,6 +20,7 @@ type IndexerService struct { eventBus *types.EventBus } +// NewIndexerService returns a new service instance. func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService { is := &IndexerService{idr: idr, eventBus: eventBus} is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is) @@ -27,15 +30,37 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService // OnStart implements cmn.Service by subscribing for all transactions // and indexing them by tags. func (is *IndexerService) OnStart() error { - ch := make(chan interface{}) - if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil { + blockHeadersCh := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil { return err } + + txsCh := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil { + return err + } + go func() { - for event := range ch { - // TODO: may be not perfomant to write one event at a time - txResult := event.(types.EventDataTx).TxResult - is.idr.Index(&txResult) + var numTxs, got int64 + var batch *Batch + for { + select { + case e := <-blockHeadersCh: + numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs + batch = NewBatch(numTxs) + case e := <-txsCh: + if batch == nil { + panic("Expected pubsub to send block header first, but got tx event") + } + txResult := e.(types.EventDataTx).TxResult + batch.Add(&txResult) + got++ + if numTxs == got { + is.idr.AddBatch(batch) + batch = nil + got = 0 + } + } } }() return nil diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 74a2dd7cb..a8537219d 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -190,7 +190,7 @@ func txResultWithTags(tags []cmn.KVPair) *types.TxResult { } } -func benchmarkTxIndex(txsCount int, b *testing.B) { +func benchmarkTxIndex(txsCount int64, b *testing.B) { tx := types.Tx("HELLO WORLD") txResult := &types.TxResult{ Height: 1, @@ -215,7 +215,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { indexer := NewTxIndex(store) batch := txindex.NewBatch(txsCount) - for i := 0; i < txsCount; i++ { + for i := int64(0); i < txsCount; i++ { if err := batch.Add(txResult); err != nil { b.Fatal(err) }