Browse Source

wait 100ms before kicking a subscriber out

+ a test for indexer_service
pull/3227/head
Anton Kaliaev 6 years ago
parent
commit
165bb2abfe
No known key found for this signature in database GPG Key ID: 7B6881D965918214
4 changed files with 82 additions and 10 deletions
  1. +2
    -1
      libs/pubsub/pubsub.go
  2. +15
    -7
      state/txindex/indexer_service.go
  3. +64
    -0
      state/txindex/indexer_service_test.go
  4. +1
    -2
      types/event_bus_test.go

+ 2
- 1
libs/pubsub/pubsub.go View File

@ -38,6 +38,7 @@ import (
"context"
"errors"
"sync"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
)
@ -394,7 +395,7 @@ func (state *state) send(msg interface{}, tags map[string]string) {
// don't block on buffered channels
select {
case subscription.out <- Message{msg, tags}:
default:
case <-time.After(100 * time.Millisecond):
state.remove(clientID, qStr, ErrOutOfCapacity)
}
}


+ 15
- 7
state/txindex/indexer_service.go View File

@ -51,22 +51,30 @@ func (is *IndexerService) OnStart() error {
select {
case msg2 := <-txsSub.Out():
txResult := msg2.Data().(types.EventDataTx).TxResult
batch.Add(&txResult)
if err = batch.Add(&txResult); err != nil {
is.Logger.Error("Can't add tx to batch",
"height", header.Height,
"index", txResult.Index,
"err", err)
}
case <-txsSub.Cancelled():
is.Logger.Error("Failed to index a block. txsSub was cancelled. Did the Tendermint stop?",
"err", txsSub.Err(),
is.Logger.Error("Failed to index block. txsSub was cancelled. Did the Tendermint stop?",
"height", header.Height,
"numTxs", header.NumTxs,
"numProcessed", i,
"err", txsSub.Err(),
)
return
}
}
is.idr.AddBatch(batch)
is.Logger.Info("Indexed block", "height", header.Height)
if err = is.idr.AddBatch(batch); err != nil {
is.Logger.Error("Failed to index block", "height", header.Height, "err", err)
} else {
is.Logger.Info("Indexed block", "height", header.Height)
}
case <-blockHeadersSub.Cancelled():
is.Logger.Error("Failed to index a block. blockHeadersSub was cancelled. Did the Tendermint stop?",
"reason", blockHeadersSub.Err())
is.Logger.Error("blockHeadersSub was cancelled. Did the Tendermint stop?",
"err", blockHeadersSub.Err())
return
}
}


+ 64
- 0
state/txindex/indexer_service_test.go View File

@ -0,0 +1,64 @@
package txindex_test
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/types"
)
func TestIndexerServiceIndexesBlocks(t *testing.T) {
// event bus
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()
// tx indexer
store := db.NewMemDB()
txIndexer := kv.NewTxIndex(store, kv.IndexAllTags())
service := txindex.NewIndexerService(txIndexer, eventBus)
service.SetLogger(log.TestingLogger())
err = service.Start()
require.NoError(t, err)
defer service.Stop()
// publish block with txs
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1, NumTxs: 2},
})
txResult1 := &types.TxResult{
Height: 1,
Index: uint32(0),
Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0},
}
eventBus.PublishEventTx(types.EventDataTx{*txResult1})
txResult2 := &types.TxResult{
Height: 1,
Index: uint32(1),
Tx: types.Tx("bar"),
Result: abci.ResponseDeliverTx{Code: 0},
}
eventBus.PublishEventTx(types.EventDataTx{*txResult2})
time.Sleep(100 * time.Millisecond)
// check the result
res, err := txIndexer.Get(types.Tx("foo").Hash())
assert.NoError(t, err)
assert.Equal(t, txResult1, res)
res, err = txIndexer.Get(types.Tx("bar").Hash())
assert.NoError(t, err)
assert.Equal(t, txResult2, res)
}

+ 1
- 2
types/event_bus_test.go View File

@ -139,8 +139,7 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
defer eventBus.Stop()
// FIXME: the test fails without a buffer
sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, 14)
sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{})
require.NoError(t, err)
const numEventsExpected = 14


Loading…
Cancel
Save