diff --git a/node/node.go b/node/node.go index fdc466695..1bd382eb8 100644 --- a/node/node.go +++ b/node/node.go @@ -343,6 +343,7 @@ func NewNode(config *cfg.Config, } indexerService := txindex.NewIndexerService(txIndexer, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) // run the profile server profileHost := config.ProfListenAddress diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index edcb362e6..93e6269e8 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -41,32 +41,24 @@ func (is *IndexerService) OnStart() error { } go func() { - var numTxs, got int64 - var batch *Batch for { - select { - case e, ok := <-blockHeadersCh: - if !ok { - return - } - numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs - batch = NewBatch(numTxs) - case e, ok := <-txsCh: + e, ok := <-blockHeadersCh + if !ok { + return + } + header := e.(types.EventDataNewBlockHeader).Header + batch := NewBatch(header.NumTxs) + for i := int64(0); i < header.NumTxs; i++ { + e, ok := <-txsCh if !ok { + is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i) return } - if batch == nil { - panic("Expected pubsub to send block header first, but got tx event") - } txResult := e.(types.EventDataTx).TxResult batch.Add(&txResult) - got++ - if numTxs == got { - is.idr.AddBatch(batch) - batch = nil - got = 0 - } } + is.idr.AddBatch(batch) + is.Logger.Info("Indexed block", "height", header.Height) } }() return nil