Browse Source

batch index txs

pull/1550/head
Anton Kaliaev 7 years ago
parent
commit
58e3246ffc
No known key found for this signature in database GPG Key ID: 7B6881D965918214
5 changed files with 39 additions and 20 deletions
  1. +1
    -1
      config/config.go
  2. +4
    -10
      state/execution.go
  3. +1
    -1
      state/txindex/indexer.go
  4. +31
    -6
      state/txindex/indexer_service.go
  5. +2
    -2
      state/txindex/kv/kv_test.go

+ 1
- 1
config/config.go View File

@ -515,7 +515,7 @@ type TxIndexConfig struct {
// DefaultTxIndexConfig returns a default configuration for the transaction indexer.
func DefaultTxIndexConfig() *TxIndexConfig {
return &TxIndexConfig{
Indexer: "null",
Indexer: "kv",
IndexTags: "",
IndexAllTags: false,
}


+ 4
- 10
state/execution.go View File

@ -341,23 +341,17 @@ func updateState(s State, blockID types.BlockID, header *types.Header,
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
// NOTE: do we still need this buffer ?
txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs))
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
for i, tx := range block.Data.Txs {
txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{
eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height,
Index: uint32(i),
Tx: tx,
Result: *(abciResponses.DeliverTx[i]),
}})
}
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
err := txEventBuffer.Flush()
if err != nil {
logger.Error("Failed to flush event buffer", "err", err)
}
}
//----------------------------------------------------------------------------------------------------


+ 1
- 1
state/txindex/indexer.go View File

@ -34,7 +34,7 @@ type Batch struct {
}
// NewBatch creates a new Batch.
func NewBatch(n int) *Batch {
func NewBatch(n int64) *Batch {
return &Batch{
Ops: make([]*types.TxResult, n),
}


+ 31
- 6
state/txindex/indexer_service.go View File

@ -11,6 +11,8 @@ const (
subscriber = "IndexerService"
)
// IndexerService connects event bus and transaction indexer together in order
// to index transactions coming from event bus.
type IndexerService struct {
cmn.BaseService
@ -18,6 +20,7 @@ type IndexerService struct {
eventBus *types.EventBus
}
// NewIndexerService returns a new service instance.
func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService {
is := &IndexerService{idr: idr, eventBus: eventBus}
is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is)
@ -27,15 +30,37 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService
// OnStart implements cmn.Service by subscribing for all transactions
// and indexing them by tags.
func (is *IndexerService) OnStart() error {
ch := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil {
blockHeadersCh := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil {
return err
}
txsCh := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil {
return err
}
go func() {
for event := range ch {
// TODO: may be not perfomant to write one event at a time
txResult := event.(types.EventDataTx).TxResult
is.idr.Index(&txResult)
var numTxs, got int64
var batch *Batch
for {
select {
case e := <-blockHeadersCh:
numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs
batch = NewBatch(numTxs)
case e := <-txsCh:
if batch == nil {
panic("Expected pubsub to send block header first, but got tx event")
}
txResult := e.(types.EventDataTx).TxResult
batch.Add(&txResult)
got++
if numTxs == got {
is.idr.AddBatch(batch)
batch = nil
got = 0
}
}
}
}()
return nil


+ 2
- 2
state/txindex/kv/kv_test.go View File

@ -190,7 +190,7 @@ func txResultWithTags(tags []cmn.KVPair) *types.TxResult {
}
}
func benchmarkTxIndex(txsCount int, b *testing.B) {
func benchmarkTxIndex(txsCount int64, b *testing.B) {
tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{
Height: 1,
@ -215,7 +215,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) {
indexer := NewTxIndex(store)
batch := txindex.NewBatch(txsCount)
for i := 0; i < txsCount; i++ {
for i := int64(0); i < txsCount; i++ {
if err := batch.Add(txResult); err != nil {
b.Fatal(err)
}


Loading…
Cancel
Save