|
|
@ -7,9 +7,58 @@ import ( |
|
|
|
"testing" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/stretchr/testify/assert" |
|
|
|
"github.com/stretchr/testify/require" |
|
|
|
|
|
|
|
abci "github.com/tendermint/abci/types" |
|
|
|
cmn "github.com/tendermint/tmlibs/common" |
|
|
|
tmpubsub "github.com/tendermint/tmlibs/pubsub" |
|
|
|
tmquery "github.com/tendermint/tmlibs/pubsub/query" |
|
|
|
) |
|
|
|
|
|
|
|
func TestEventBusPublishEventTx(t *testing.T) { |
|
|
|
eventBus := NewEventBus() |
|
|
|
err := eventBus.Start() |
|
|
|
require.NoError(t, err) |
|
|
|
defer eventBus.Stop() |
|
|
|
|
|
|
|
tx := Tx("foo") |
|
|
|
result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{}, Fee: cmn.KI64Pair{Key: []uint8{}, Value: 0}} |
|
|
|
|
|
|
|
txEventsCh := make(chan interface{}) |
|
|
|
|
|
|
|
// PublishEventTx adds all these 3 tags, so the query below should work
|
|
|
|
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X'", tx.Hash()) |
|
|
|
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) |
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
done := make(chan struct{}) |
|
|
|
go func() { |
|
|
|
for e := range txEventsCh { |
|
|
|
edt := e.(TMEventData).Unwrap().(EventDataTx) |
|
|
|
assert.Equal(t, int64(1), edt.Height) |
|
|
|
assert.Equal(t, uint32(0), edt.Index) |
|
|
|
assert.Equal(t, tx, edt.Tx) |
|
|
|
assert.Equal(t, result, edt.Result) |
|
|
|
close(done) |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
err = eventBus.PublishEventTx(EventDataTx{TxResult{ |
|
|
|
Height: 1, |
|
|
|
Index: 0, |
|
|
|
Tx: tx, |
|
|
|
Result: result, |
|
|
|
}}) |
|
|
|
assert.NoError(t, err) |
|
|
|
|
|
|
|
select { |
|
|
|
case <-done: |
|
|
|
case <-time.After(1 * time.Second): |
|
|
|
t.Fatal("did not receive a transaction after 1 sec.") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func BenchmarkEventBus(b *testing.B) { |
|
|
|
benchmarks := []struct { |
|
|
|
name string |
|
|
|