Browse Source

pubsub 2.0 (#3227)

* green pubsub tests :OK:

* get rid of clientToQueryMap

* Subscribe and SubscribeUnbuffered

* start adapting other pkgs to new pubsub

* nope

* rename MsgAndTags to Message

* remove TagMap

it does not bring any additional benefits

* bring back EventSubscriber

* fix test

* fix data race in TestStartNextHeightCorrectly

```
Write at 0x00c0001c7418 by goroutine 796:
  github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly()
      /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162

Previous read at 0x00c0001c7418 by goroutine 858:
  github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366
  github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f
  github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e
  github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794

Goroutine 796 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:878 +0x659
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1119 +0xa8
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1117 +0x4ee
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1034 +0x2ee
  main.main()
      _testmain.go:214 +0x332

Goroutine 858 (running) created at:
  github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines()
      /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221
  github.com/tendermint/tendermint/consensus.startTestRound()
      /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63
  github.com/tendermint/tendermint/consensus.TestStateFullRound1()
      /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162
```

* fixes after my own review

* fix formatting

* wait 100ms before kicking a subscriber out

+ a test for indexer_service

* fixes after my second review

* no timeout

* add changelog entries

* fix merge conflicts

* fix typos after Thane's review

Co-Authored-By: melekes <anton.kalyaev@gmail.com>

* reformat code

* rewrite indexer service in the attempt to fix failing test

https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527

* Revert "rewrite indexer service in the attempt to fix failing test"

This reverts commit 0d9107a098.

* another attempt to fix indexer

* fixes after Ethan's review

* use unbuffered channel when indexing transactions

Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716

* add a comment for EventBus#SubscribeUnbuffered

* format code
pull/3348/head
Anton Kaliaev 5 years ago
committed by Ethan Buchman
parent
commit
2137ecc130
31 changed files with 830 additions and 609 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +5
    -5
      consensus/byzantine_test.go
  3. +94
    -113
      consensus/common_test.go
  4. +3
    -3
      consensus/mempool_test.go
  5. +83
    -68
      consensus/reactor_test.go
  6. +7
    -5
      consensus/replay.go
  7. +9
    -33
      consensus/replay_file.go
  8. +5
    -4
      consensus/replay_test.go
  9. +3
    -4
      consensus/state_test.go
  10. +3
    -4
      libs/pubsub/example_test.go
  11. +113
    -156
      libs/pubsub/pubsub.go
  12. +148
    -59
      libs/pubsub/pubsub_test.go
  13. +1
    -3
      libs/pubsub/query/empty.go
  14. +4
    -5
      libs/pubsub/query/empty_test.go
  15. +4
    -6
      libs/pubsub/query/query.go
  16. +2
    -3
      libs/pubsub/query/query_test.go
  17. +89
    -0
      libs/pubsub/subscription.go
  18. +4
    -3
      node/node_test.go
  19. +6
    -18
      rpc/client/helpers.go
  20. +51
    -14
      rpc/client/httpclient.go
  21. +2
    -2
      rpc/client/localclient.go
  22. +19
    -5
      rpc/core/events.go
  23. +12
    -19
      rpc/core/mempool.go
  24. +1
    -0
      rpc/core/types/responses.go
  25. +1
    -0
      rpc/lib/server/handlers.go
  26. +8
    -7
      rpc/lib/types/types.go
  27. +6
    -5
      state/execution_test.go
  28. +22
    -17
      state/txindex/indexer_service.go
  29. +64
    -0
      state/txindex/indexer_service_test.go
  30. +19
    -7
      types/event_bus.go
  31. +40
    -41
      types/event_bus_test.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -7,6 +7,7 @@ Special thanks to external contributors on this release:
### BREAKING CHANGES:
* CLI/RPC/Config
- [httpclient] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
* Apps
@ -21,3 +22,4 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS:
### BUG FIXES:
- [libs/pubsub] \#951, \#1880 use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)

+ 5
- 5
consensus/byzantine_test.go View File

@ -46,7 +46,7 @@ func TestByzantine(t *testing.T) {
switches[i].SetLogger(p2pLogger.With("validator", i))
}
eventChans := make([]chan interface{}, N)
blocksSubs := make([]types.Subscription, N)
reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ {
// make first val byzantine
@ -65,8 +65,8 @@ func TestByzantine(t *testing.T) {
eventBus := css[i].eventBus
eventBus.SetLogger(logger.With("module", "events", "validator", i))
eventChans[i] = make(chan interface{}, 1)
err := eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i])
var err error
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states
@ -131,7 +131,7 @@ func TestByzantine(t *testing.T) {
p2p.Connect2Switches(switches, ind1, ind2)
// wait for someone in the big partition (B) to make a block
<-eventChans[ind2]
<-blocksSubs[ind2].Out()
t.Log("A block has been committed. Healing partition")
p2p.Connect2Switches(switches, ind0, ind1)
@ -143,7 +143,7 @@ func TestByzantine(t *testing.T) {
wg.Add(2)
for i := 1; i < N-1; i++ {
go func(j int) {
<-eventChans[j]
<-blocksSubs[j].Out()
wg.Done()
}(i)
}


+ 94
- 113
consensus/common_test.go View File

@ -7,7 +7,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"sync"
"testing"
@ -25,6 +24,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
@ -227,24 +227,22 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo
cs.mtx.Unlock()
}
// genesis
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
voteCh0 := make(chan interface{})
err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote, voteCh0)
func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.Message {
votesSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote)
if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote))
}
voteCh := make(chan interface{})
ch := make(chan tmpubsub.Message)
go func() {
for v := range voteCh0 {
vote := v.(types.EventDataVote)
for msg := range votesSub.Out() {
vote := msg.Data().(types.EventDataVote)
// we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
voteCh <- v
ch <- msg
}
}
}()
return voteCh
return ch
}
//-------------------------------------------------------------------------------
@ -321,7 +319,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
//-------------------------------------------------------------------------------
func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
func ensureNoNewEvent(ch <-chan tmpubsub.Message, timeout time.Duration,
errorMessage string) {
select {
case <-time.After(timeout):
@ -331,28 +329,28 @@ func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
}
}
func ensureNoNewEventOnChannel(ch <-chan interface{}) {
func ensureNoNewEventOnChannel(ch <-chan tmpubsub.Message) {
ensureNoNewEvent(
ch,
ensureTimeout,
"We should be stuck waiting, not receiving new event on the channel")
}
func ensureNoNewRoundStep(stepCh <-chan interface{}) {
func ensureNoNewRoundStep(stepCh <-chan tmpubsub.Message) {
ensureNoNewEvent(
stepCh,
ensureTimeout,
"We should be stuck waiting, not receiving NewRoundStep event")
}
func ensureNoNewUnlock(unlockCh <-chan interface{}) {
func ensureNoNewUnlock(unlockCh <-chan tmpubsub.Message) {
ensureNoNewEvent(
unlockCh,
ensureTimeout,
"We should be stuck waiting, not receiving Unlock event")
}
func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
func ensureNoNewTimeout(stepCh <-chan tmpubsub.Message, timeout int64) {
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
ensureNoNewEvent(
stepCh,
@ -360,187 +358,170 @@ func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
"We should be stuck waiting, not receiving NewTimeout event")
}
func ensureNewEvent(
ch <-chan interface{},
height int64,
round int,
timeout time.Duration,
errorMessage string) {
func ensureNewEvent(ch <-chan tmpubsub.Message, height int64, round int, timeout time.Duration, errorMessage string) {
select {
case <-time.After(timeout):
panic(errorMessage)
case ev := <-ch:
rs, ok := ev.(types.EventDataRoundState)
case msg := <-ch:
roundStateEvent, ok := msg.Data().(types.EventDataRoundState)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataRoundState, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
panic(fmt.Sprintf("expected a EventDataRoundState, got %T. Wrong subscription channel?",
msg.Data()))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
if roundStateEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, roundStateEvent.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
if roundStateEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, roundStateEvent.Round))
}
// TODO: We could check also for a step at this point!
}
}
func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
func ensureNewRound(roundCh <-chan tmpubsub.Message, height int64, round int) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewRound event")
case ev := <-roundCh:
rs, ok := ev.(types.EventDataNewRound)
case msg := <-roundCh:
newRoundEvent, ok := msg.Data().(types.EventDataNewRound)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataNewRound, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
panic(fmt.Sprintf("expected a EventDataNewRound, got %T. Wrong subscription channel?",
msg.Data()))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
if newRoundEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, newRoundEvent.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
if newRoundEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, newRoundEvent.Round))
}
}
}
func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
func ensureNewTimeout(timeoutCh <-chan tmpubsub.Message, height int64, round int, timeout int64) {
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
ensureNewEvent(timeoutCh, height, round, timeoutDuration,
"Timeout expired while waiting for NewTimeout event")
}
func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
func ensureNewProposal(proposalCh <-chan tmpubsub.Message, height int64, round int) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
case msg := <-proposalCh:
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data()))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
if proposalEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
if proposalEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, proposalEvent.Round))
}
}
}
func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
func ensureNewValidBlock(validBlockCh <-chan tmpubsub.Message, height int64, round int) {
ensureNewEvent(validBlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewValidBlock event")
}
func ensureNewBlock(blockCh <-chan interface{}, height int64) {
func ensureNewBlock(blockCh <-chan tmpubsub.Message, height int64) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlock event")
case ev := <-blockCh:
block, ok := ev.(types.EventDataNewBlock)
case msg := <-blockCh:
blockEvent, ok := msg.Data().(types.EventDataNewBlock)
if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(block)))
panic(fmt.Sprintf("expected a EventDataNewBlock, got %T. Wrong subscription channel?",
msg.Data()))
}
if block.Block.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, block.Block.Height))
if blockEvent.Block.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, blockEvent.Block.Height))
}
}
}
func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) {
func ensureNewBlockHeader(blockCh <-chan tmpubsub.Message, height int64, blockHash cmn.HexBytes) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlockHeader event")
case ev := <-blockCh:
blockHeader, ok := ev.(types.EventDataNewBlockHeader)
case msg := <-blockCh:
blockHeaderEvent, ok := msg.Data().(types.EventDataNewBlockHeader)
if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(blockHeader)))
panic(fmt.Sprintf("expected a EventDataNewBlockHeader, got %T. Wrong subscription channel?",
msg.Data()))
}
if blockHeader.Header.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, blockHeader.Header.Height))
if blockHeaderEvent.Header.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, blockHeaderEvent.Header.Height))
}
if !bytes.Equal(blockHeader.Header.Hash(), blockHash) {
panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeader.Header.Hash()))
if !bytes.Equal(blockHeaderEvent.Header.Hash(), blockHash) {
panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeaderEvent.Header.Hash()))
}
}
}
func ensureNewUnlock(unlockCh <-chan interface{}, height int64, round int) {
func ensureNewUnlock(unlockCh <-chan tmpubsub.Message, height int64, round int) {
ensureNewEvent(unlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewUnlock event")
}
func ensureVote(voteCh <-chan interface{}, height int64, round int,
voteType types.SignedMsgType) {
func ensureProposal(proposalCh <-chan tmpubsub.Message, height int64, round int, propID types.BlockID) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewVote event")
case v := <-voteCh:
edv, ok := v.(types.EventDataVote)
panic("Timeout expired while waiting for NewProposal event")
case msg := <-proposalCh:
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
if !ok {
panic(fmt.Sprintf("expected a *types.Vote, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(v)))
panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
msg.Data()))
}
vote := edv.Vote
if vote.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
if proposalEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height))
}
if vote.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
if proposalEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, proposalEvent.Round))
}
if vote.Type != voteType {
panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type))
if !proposalEvent.BlockID.Equals(propID) {
panic("Proposed block does not match expected block")
}
}
}
func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) {
func ensurePrecommit(voteCh <-chan tmpubsub.Message, height int64, round int) {
ensureVote(voteCh, height, round, types.PrecommitType)
}
func ensurePrevote(voteCh <-chan tmpubsub.Message, height int64, round int) {
ensureVote(voteCh, height, round, types.PrevoteType)
}
func ensureVote(voteCh <-chan tmpubsub.Message, height int64, round int,
voteType types.SignedMsgType) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
panic("Timeout expired while waiting for NewVote event")
case msg := <-voteCh:
voteEvent, ok := msg.Data().(types.EventDataVote)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
panic(fmt.Sprintf("expected a EventDataVote, got %T. Wrong subscription channel?",
msg.Data()))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
vote := voteEvent.Vote
if vote.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
if vote.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
}
if !rs.BlockID.Equals(propId) {
panic("Proposed block does not match expected block")
if vote.Type != voteType {
panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type))
}
}
}
func ensurePrecommit(voteCh <-chan interface{}, height int64, round int) {
ensureVote(voteCh, height, round, types.PrecommitType)
}
func ensurePrevote(voteCh <-chan interface{}, height int64, round int) {
ensureVote(voteCh, height, round, types.PrevoteType)
}
func ensureNewEventOnChannel(ch <-chan interface{}) {
func ensureNewEventOnChannel(ch <-chan tmpubsub.Message) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for new activity on the channel")


+ 3
- 3
consensus/mempool_test.go View File

@ -117,9 +117,9 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
for nTxs := 0; nTxs < NTxs; {
ticker := time.NewTicker(time.Second * 30)
select {
case b := <-newBlockCh:
evt := b.(types.EventDataNewBlock)
nTxs += int(evt.Block.Header.NumTxs)
case msg := <-newBlockCh:
blockEvent := msg.Data().(types.EventDataNewBlock)
nTxs += int(blockEvent.Block.Header.NumTxs)
case <-ticker.C:
panic("Timed out waiting to commit blocks with transactions")
}


+ 83
- 68
consensus/reactor_test.go View File

@ -30,9 +30,13 @@ import (
//----------------------------------------------
// in-process testnets
func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*ConsensusReactor, []chan interface{}, []*types.EventBus) {
func startConsensusNet(t *testing.T, css []*ConsensusState, N int) (
[]*ConsensusReactor,
[]types.Subscription,
[]*types.EventBus,
) {
reactors := make([]*ConsensusReactor, N)
eventChans := make([]chan interface{}, N)
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, N)
for i := 0; i < N; i++ {
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
@ -44,9 +48,9 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
eventBuses[i] = css[i].eventBus
reactors[i].SetEventBus(eventBuses[i])
eventChans[i] = make(chan interface{}, 1)
err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i])
blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
blocksSubs = append(blocksSubs, blocksSub)
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
@ -63,7 +67,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, 0)
}
return reactors, eventChans, eventBuses
return reactors, blocksSubs, eventBuses
}
func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
@ -84,11 +88,11 @@ func TestReactorBasic(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
defer cleanup()
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(j int) {
<-eventChans[j]
<-blocksSubs[j].Out()
}, css)
}
@ -161,20 +165,20 @@ func TestReactorWithEvidence(t *testing.T) {
css[i] = cs
}
reactors, eventChans, eventBuses := startConsensusNet(t, css, nValidators)
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nValidators)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block with no evidence
timeoutWaitGroup(t, nValidators, func(j int) {
blockI := <-eventChans[j]
block := blockI.(types.EventDataNewBlock).Block
msg := <-blocksSubs[j].Out()
block := msg.Data().(types.EventDataNewBlock).Block
assert.True(t, len(block.Evidence.Evidence) == 0)
}, css)
// second block should have evidence
timeoutWaitGroup(t, nValidators, func(j int) {
blockI := <-eventChans[j]
block := blockI.(types.EventDataNewBlock).Block
msg := <-blocksSubs[j].Out()
block := msg.Data().(types.EventDataNewBlock).Block
assert.True(t, len(block.Evidence.Evidence) > 0)
}, css)
}
@ -221,7 +225,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
c.Consensus.CreateEmptyBlocks = false
})
defer cleanup()
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// send a tx
@ -231,7 +235,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
// wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(j int) {
<-eventChans[j]
<-blocksSubs[j].Out()
}, css)
}
@ -240,12 +244,12 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
defer cleanup()
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(j int) {
<-eventChans[j]
<-blocksSubs[j].Out()
}, css)
// Get peer
@ -265,7 +269,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
logger := log.TestingLogger()
css, cleanup := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentKVStore)
defer cleanup()
reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals)
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nVals)
defer stopConsensusNet(logger, reactors, eventBuses)
// map of active validators
@ -277,7 +281,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
// wait till everyone makes block 1
timeoutWaitGroup(t, nVals, func(j int) {
<-eventChans[j]
<-blocksSubs[j].Out()
}, css)
//---------------------------------------------------------------------------
@ -288,10 +292,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
updateValidatorTx := kvstore.MakeValSetChangeTx(val1PubKeyABCI, 25)
previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
@ -300,10 +304,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2)
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
@ -312,10 +316,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 26)
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
@ -329,7 +333,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
defer cleanup()
logger := log.TestingLogger()
reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers)
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nPeers)
defer stopConsensusNet(logger, reactors, eventBuses)
// map of active validators
@ -341,7 +345,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// wait till everyone makes block 1
timeoutWaitGroup(t, nPeers, func(j int) {
<-eventChans[j]
<-blocksSubs[j].Out()
}, css)
//---------------------------------------------------------------------------
@ -354,22 +358,22 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// wait till everyone makes block 2
// ensure the commit includes all validators
// send newValTx to change vals in block 3
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, newValidatorTx1)
// wait till everyone makes block 3.
// it includes the commit for block 2, which is by the original validator set
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, newValidatorTx1)
// wait till everyone makes block 4.
// it includes the commit for block 3, which is by the original validator set
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
// the commits for block 4 should be with the updated validator set
activeVals[string(newValidatorPubKey1.Address())] = struct{}{}
// wait till everyone makes block 5
// it includes the commit for block 4, which should have the updated validator set
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
//---------------------------------------------------------------------------
logger.Info("---------------------------- Testing changing the voting power of one validator")
@ -379,10 +383,10 @@ func TestReactorValidatorSetChanges(t *testing.T) {
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, updateValidatorTx1)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, updateValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower())
@ -399,12 +403,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
//---------------------------------------------------------------------------
logger.Info("---------------------------- Testing removing two validators at once")
@ -412,12 +416,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
delete(activeVals, string(newValidatorPubKey2.Address()))
delete(activeVals, string(newValidatorPubKey3.Address()))
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
}
// Check we can make blocks with skip_timeout_commit=false
@ -430,23 +434,27 @@ func TestReactorWithTimeoutCommit(t *testing.T) {
css[i].config.SkipTimeoutCommit = false
}
reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1)
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N-1)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block
timeoutWaitGroup(t, N-1, func(j int) {
<-eventChans[j]
<-blocksSubs[j].Out()
}, css)
}
func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
func waitForAndValidateBlock(
t *testing.T,
n int,
activeVals map[string]struct{},
blocksSubs []types.Subscription,
css []*ConsensusState,
txs ...[]byte,
) {
timeoutWaitGroup(t, n, func(j int) {
css[j].Logger.Debug("waitForAndValidateBlock")
newBlockI, ok := <-eventChans[j]
if !ok {
return
}
newBlock := newBlockI.(types.EventDataNewBlock).Block
msg := <-blocksSubs[j].Out()
newBlock := msg.Data().(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
@ -457,17 +465,21 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}
}, css)
}
func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
func waitForAndValidateBlockWithTx(
t *testing.T,
n int,
activeVals map[string]struct{},
blocksSubs []types.Subscription,
css []*ConsensusState,
txs ...[]byte,
) {
timeoutWaitGroup(t, n, func(j int) {
ntxs := 0
BLOCK_TX_LOOP:
for {
css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs)
newBlockI, ok := <-eventChans[j]
if !ok {
return
}
newBlock := newBlockI.(types.EventDataNewBlock).Block
msg := <-blocksSubs[j].Out()
newBlock := msg.Data().(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
@ -488,18 +500,21 @@ func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]st
}, css)
}
func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState) {
func waitForBlockWithUpdatedValsAndValidateIt(
t *testing.T,
n int,
updatedVals map[string]struct{},
blocksSubs []types.Subscription,
css []*ConsensusState,
) {
timeoutWaitGroup(t, n, func(j int) {
var newBlock *types.Block
LOOP:
for {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt")
newBlockI, ok := <-eventChans[j]
if !ok {
return
}
newBlock = newBlockI.(types.EventDataNewBlock).Block
msg := <-blocksSubs[j].Out()
newBlock = msg.Data().(types.EventDataNewBlock).Block
if newBlock.LastCommit.Size() == len(updatedVals) {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height)
break LOOP


+ 7
- 5
consensus/replay.go View File

@ -42,7 +42,7 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
@ -54,15 +54,17 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan
cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepCh != nil {
if newStepSub != nil {
select {
case mi := <-newStepCh:
m2 := mi.(types.EventDataRoundState)
case stepMsg := <-newStepSub.Out():
m2 := stepMsg.Data().(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
}
case <-newStepSub.Cancelled():
return fmt.Errorf("Failed to read off newStepSub.Out(). newStepSub was cancelled")
case <-ticker:
return fmt.Errorf("Failed to read off newStepCh")
return fmt.Errorf("Failed to read off newStepSub.Out()")
}
}
case msgInfo:


+ 9
- 33
consensus/replay_file.go View File

@ -51,25 +51,13 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
cs.startForReplay()
// ensure all new step events are regenerated as expected
newStepCh := make(chan interface{}, 1)
ctx := context.Background()
err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if err != nil {
return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
}
defer func() {
// drain newStepCh to make sure we don't block
LOOP:
for {
select {
case <-newStepCh:
default:
break LOOP
}
}
cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()
defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
// just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
@ -94,7 +82,7 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
return err
}
@ -131,7 +119,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.S
}
// go back count steps by resetting the state and running (pb.count - count) steps
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
func (pb *playback) replayReset(count int, newStepSub types.Subscription) error {
pb.cs.Stop()
pb.cs.Wait()
@ -161,7 +149,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
} else if err != nil {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
return err
}
pb.count++
@ -225,27 +213,15 @@ func (pb *playback) replayConsoleLoop() int {
ctx := context.Background()
// ensure all new step events are regenerated as expected
newStepCh := make(chan interface{}, 1)
err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if err != nil {
cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
}
defer func() {
// drain newStepCh to make sure we don't block
LOOP:
for {
select {
case <-newStepCh:
default:
break LOOP
}
}
pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()
defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if len(tokens) == 1 {
if err := pb.replayReset(1, newStepCh); err != nil {
if err := pb.replayReset(1, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err)
}
} else {
@ -255,7 +231,7 @@ func (pb *playback) replayConsoleLoop() int {
} else if i > pb.count {
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
} else {
if err := pb.replayReset(i, newStepCh); err != nil {
if err := pb.replayReset(i, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err)
}
}


+ 5
- 4
consensus/replay_test.go View File

@ -78,13 +78,14 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, consensusReplayConfig *
// in the WAL itself. Assuming the consensus state is running, replay of any
// WAL, including the empty one, should eventually be followed by a new
// block, or else something is wrong.
newBlockCh := make(chan interface{}, 1)
err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh)
newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-newBlockCh:
case <-newBlockSub.Out():
case <-newBlockSub.Cancelled():
t.Fatal("newBlockSub was cancelled")
case <-time.After(120 * time.Second):
t.Fatalf("Timed out waiting for new block (see trace above)")
t.Fatal("Timed out waiting for new block (see trace above)")
}
}


+ 3
- 4
consensus/state_test.go View File

@ -1620,11 +1620,10 @@ func TestStateOutputVoteStats(t *testing.T) {
}
// subscribe subscribes test client to the given query and returns a channel with cap = 1.
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} {
out := make(chan interface{}, 1)
err := eventBus.Subscribe(context.Background(), testSubscriber, q, out)
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Message {
sub, err := eventBus.Subscribe(context.Background(), testSubscriber, q)
if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q))
}
return out
return sub.Out()
}

+ 3
- 4
libs/pubsub/example_test.go View File

@ -19,10 +19,9 @@ func TestExample(t *testing.T) {
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{}, 1)
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"}))
err = s.PublishWithTags(ctx, "Tombstone", map[string]string{"abci.account.name": "John"})
require.NoError(t, err)
assertReceive(t, "Tombstone", ch)
assertReceive(t, "Tombstone", subscription.Out())
}

+ 113
- 156
libs/pubsub/pubsub.go View File

@ -10,41 +10,25 @@
// match, this message will be pushed to all clients, subscribed to that query.
// See query subpackage for our implementation.
//
// Due to the blocking send implementation, a single subscriber can freeze an
// entire server by not reading messages before it unsubscribes. To avoid such
// scenario, subscribers must either:
// Example:
//
// a) make sure they continue to read from the out channel until
// Unsubscribe(All) is called
//
// s.Subscribe(ctx, sub, qry, out)
// go func() {
// for msg := range out {
// // handle msg
// // will exit automatically when out is closed by Unsubscribe(All)
// }
// }()
// s.UnsubscribeAll(ctx, sub)
//
// b) drain the out channel before calling Unsubscribe(All)
// q, err := query.New("account.name='John'")
// if err != nil {
// return err
// }
// ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
// defer cancel()
// subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q)
// if err != nil {
// return err
// }
//
// s.Subscribe(ctx, sub, qry, out)
// defer func() {
// // drain out to make sure we don't block
// LOOP:
// for {
// select {
// case <-out:
// default:
// break LOOP
// }
// }
// s.UnsubscribeAll(ctx, sub)
// }()
// for msg := range out {
// // handle msg
// if err != nil {
// return err
// for {
// select {
// case msg <- subscription.Out():
// // handle msg.Data() and msg.Tags()
// case <-subscription.Cancelled():
// return subscription.Err()
// }
// }
//
@ -77,21 +61,25 @@ var (
ErrAlreadySubscribed = errors.New("already subscribed")
)
type cmd struct {
op operation
query Query
ch chan<- interface{}
clientID string
msg interface{}
tags TagMap
}
// Query defines an interface for a query to be used for subscribing.
type Query interface {
Matches(tags TagMap) bool
Matches(tags map[string]string) bool
String() string
}
type cmd struct {
op operation
// subscribe, unsubscribe
query Query
subscription *Subscription
clientID string
// publish
msg interface{}
tags map[string]string
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without tags, and manages internal state.
type Server struct {
@ -107,37 +95,6 @@ type Server struct {
// Option sets a parameter for the server.
type Option func(*Server)
// TagMap is used to associate tags to a message.
// They can be queried by subscribers to choose messages they will received.
type TagMap interface {
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
Get(key string) (value string, ok bool)
// Len returns the number of tags.
Len() int
}
type tagMap map[string]string
var _ TagMap = (*tagMap)(nil)
// NewTagMap constructs a new immutable tag set from a map.
func NewTagMap(data map[string]string) TagMap {
return tagMap(data)
}
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
func (ts tagMap) Get(key string) (value string, ok bool) {
value, ok = ts[key]
return
}
// Len returns the number of tags.
func (ts tagMap) Len() int {
return len(ts)
}
// NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
@ -174,11 +131,34 @@ func (s *Server) BufferCapacity() int {
return s.cmdsCap
}
// Subscribe creates a subscription for the given client. It accepts a channel
// on which messages matching the given query can be received. An error will be
// returned to the caller if the context is canceled or if subscription already
// exist for pair clientID and query.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
// Subscribe creates a subscription for the given client.
//
// An error will be returned to the caller if the context is canceled or if
// subscription already exist for pair clientID and query.
//
// outCapacity can be used to set a capacity for Subscription#Out channel (1 by
// default). Panics if outCapacity is less than or equal to zero. If you want
// an unbuffered channel, use SubscribeUnbuffered.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, outCapacity ...int) (*Subscription, error) {
outCap := 1
if len(outCapacity) > 0 {
if outCapacity[0] <= 0 {
panic("Negative or zero capacity. Use SubscribeUnbuffered if you want an unbuffered channel")
}
outCap = outCapacity[0]
}
return s.subscribe(ctx, clientID, query, outCap)
}
// SubscribeUnbuffered does the same as Subscribe, except it returns a
// subscription with unbuffered channel. Use with caution as it can freeze the
// server.
func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (*Subscription, error) {
return s.subscribe(ctx, clientID, query, 0)
}
func (s *Server) subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) {
s.mtx.RLock()
clientSubscriptions, ok := s.subscriptions[clientID]
if ok {
@ -186,22 +166,23 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
}
s.mtx.RUnlock()
if ok {
return ErrAlreadySubscribed
return nil, ErrAlreadySubscribed
}
subscription := NewSubscription(outCapacity)
select {
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}:
s.mtx.Lock()
if _, ok = s.subscriptions[clientID]; !ok {
s.subscriptions[clientID] = make(map[string]struct{})
}
s.subscriptions[clientID][query.String()] = struct{}{}
s.mtx.Unlock()
return nil
return subscription, nil
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-s.Quit():
return nil
return nil, nil
}
}
@ -261,13 +242,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
return s.PublishWithTags(ctx, msg, make(map[string]string))
}
// PublishWithTags publishes the given message with the set of tags. The set is
// matched with clients queries. If there is a match, the message is sent to
// the client.
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]string) error {
select {
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
return nil
@ -285,10 +266,8 @@ func (s *Server) OnStop() {
// NOTE: not goroutine safe
type state struct {
// query string -> client -> ch
queryToChanMap map[string]map[string]chan<- interface{}
// client -> query string -> struct{}
clientToQueryMap map[string]map[string]struct{}
// query string -> client -> subscription
subscriptions map[string]map[string]*Subscription
// query string -> queryPlusRefCount
queries map[string]*queryPlusRefCount
}
@ -303,9 +282,8 @@ type queryPlusRefCount struct {
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart() error {
go s.loop(state{
queryToChanMap: make(map[string]map[string]chan<- interface{}),
clientToQueryMap: make(map[string]map[string]struct{}),
queries: make(map[string]*queryPlusRefCount),
subscriptions: make(map[string]map[string]*Subscription),
queries: make(map[string]*queryPlusRefCount),
})
return nil
}
@ -321,75 +299,57 @@ loop:
switch cmd.op {
case unsub:
if cmd.query != nil {
state.remove(cmd.clientID, cmd.query)
state.remove(cmd.clientID, cmd.query.String(), ErrUnsubscribed)
} else {
state.removeAll(cmd.clientID)
state.removeClient(cmd.clientID, ErrUnsubscribed)
}
case shutdown:
for clientID := range state.clientToQueryMap {
state.removeAll(clientID)
}
state.removeAll(nil)
break loop
case sub:
state.add(cmd.clientID, cmd.query, cmd.ch)
state.add(cmd.clientID, cmd.query, cmd.subscription)
case pub:
state.send(cmd.msg, cmd.tags)
}
}
}
func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
func (state *state) add(clientID string, q Query, subscription *Subscription) {
qStr := q.String()
// initialize clientToChannelMap per query if needed
if _, ok := state.queryToChanMap[qStr]; !ok {
state.queryToChanMap[qStr] = make(map[string]chan<- interface{})
// initialize subscription for this client per query if needed
if _, ok := state.subscriptions[qStr]; !ok {
state.subscriptions[qStr] = make(map[string]*Subscription)
}
// create subscription
state.queryToChanMap[qStr][clientID] = ch
state.subscriptions[qStr][clientID] = subscription
// initialize queries if needed
// initialize query if needed
if _, ok := state.queries[qStr]; !ok {
state.queries[qStr] = &queryPlusRefCount{q: q, refCount: 0}
}
// increment reference counter
state.queries[qStr].refCount++
// add client if needed
if _, ok := state.clientToQueryMap[clientID]; !ok {
state.clientToQueryMap[clientID] = make(map[string]struct{})
}
state.clientToQueryMap[clientID][qStr] = struct{}{}
}
func (state *state) remove(clientID string, q Query) {
qStr := q.String()
clientToChannelMap, ok := state.queryToChanMap[qStr]
func (state *state) remove(clientID string, qStr string, reason error) {
clientSubscriptions, ok := state.subscriptions[qStr]
if !ok {
return
}
ch, ok := clientToChannelMap[clientID]
subscription, ok := clientSubscriptions[clientID]
if !ok {
return
}
close(ch)
// remove the query from client map.
// if client is not subscribed to anything else, remove it.
delete(state.clientToQueryMap[clientID], qStr)
if len(state.clientToQueryMap[clientID]) == 0 {
delete(state.clientToQueryMap, clientID)
}
subscription.cancel(reason)
// remove the client from query map.
// remove client from query map.
// if query has no other clients subscribed, remove it.
delete(state.queryToChanMap[qStr], clientID)
if len(state.queryToChanMap[qStr]) == 0 {
delete(state.queryToChanMap, qStr)
delete(state.subscriptions[qStr], clientID)
if len(state.subscriptions[qStr]) == 0 {
delete(state.subscriptions, qStr)
}
// decrease ref counter in queries
@ -400,41 +360,38 @@ func (state *state) remove(clientID string, q Query) {
}
}
func (state *state) removeAll(clientID string) {
queryMap, ok := state.clientToQueryMap[clientID]
if !ok {
return
}
for qStr := range queryMap {
ch := state.queryToChanMap[qStr][clientID]
close(ch)
// remove the client from query map.
// if query has no other clients subscribed, remove it.
delete(state.queryToChanMap[qStr], clientID)
if len(state.queryToChanMap[qStr]) == 0 {
delete(state.queryToChanMap, qStr)
func (state *state) removeClient(clientID string, reason error) {
for qStr, clientSubscriptions := range state.subscriptions {
if _, ok := clientSubscriptions[clientID]; ok {
state.remove(clientID, qStr, reason)
}
}
}
// decrease ref counter in queries
state.queries[qStr].refCount--
// remove the query if nobody else is using it
if state.queries[qStr].refCount == 0 {
delete(state.queries, qStr)
func (state *state) removeAll(reason error) {
for qStr, clientSubscriptions := range state.subscriptions {
for clientID := range clientSubscriptions {
state.remove(clientID, qStr, reason)
}
}
// remove the client.
delete(state.clientToQueryMap, clientID)
}
func (state *state) send(msg interface{}, tags TagMap) {
for qStr, clientToChannelMap := range state.queryToChanMap {
func (state *state) send(msg interface{}, tags map[string]string) {
for qStr, clientSubscriptions := range state.subscriptions {
q := state.queries[qStr].q
if q.Matches(tags) {
for _, ch := range clientToChannelMap {
ch <- msg
for clientID, subscription := range clientSubscriptions {
if cap(subscription.out) == 0 {
// block on unbuffered channel
subscription.out <- Message{msg, tags}
} else {
// don't block on buffered channels
select {
case subscription.out <- Message{msg, tags}:
default:
state.remove(clientID, qStr, ErrOutOfCapacity)
}
}
}
}
}


+ 148
- 59
libs/pubsub/pubsub_test.go View File

@ -27,16 +27,97 @@ func TestSubscribe(t *testing.T) {
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
err = s.Publish(ctx, "Ka-Zar")
require.NoError(t, err)
assertReceive(t, "Ka-Zar", ch)
assertReceive(t, "Ka-Zar", subscription.Out())
err = s.Publish(ctx, "Quicksilver")
published := make(chan struct{})
go func() {
defer close(published)
err := s.Publish(ctx, "Quicksilver")
assert.NoError(t, err)
err = s.Publish(ctx, "Asylum")
assert.NoError(t, err)
}()
select {
case <-published:
assertReceive(t, "Quicksilver", subscription.Out())
assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
case <-time.After(100 * time.Millisecond):
t.Fatal("Expected Publish(Asylum) not to block")
}
}
func TestSubscribeWithCapacity(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
s.Start()
defer s.Stop()
ctx := context.Background()
assert.Panics(t, func() {
s.Subscribe(ctx, clientID, query.Empty{}, -1)
})
assert.Panics(t, func() {
s.Subscribe(ctx, clientID, query.Empty{}, 0)
})
subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1)
require.NoError(t, err)
err = s.Publish(ctx, "Aggamon")
require.NoError(t, err)
assertReceive(t, "Aggamon", subscription.Out())
}
func TestSubscribeUnbuffered(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
s.Start()
defer s.Stop()
ctx := context.Background()
subscription, err := s.SubscribeUnbuffered(ctx, clientID, query.Empty{})
require.NoError(t, err)
published := make(chan struct{})
go func() {
defer close(published)
err := s.Publish(ctx, "Ultron")
assert.NoError(t, err)
err = s.Publish(ctx, "Darkhawk")
assert.NoError(t, err)
}()
select {
case <-published:
t.Fatal("Expected Publish(Darkhawk) to block")
case <-time.After(100 * time.Millisecond):
assertReceive(t, "Ultron", subscription.Out())
assertReceive(t, "Darkhawk", subscription.Out())
}
}
func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
s.Start()
defer s.Stop()
ctx := context.Background()
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
assertReceive(t, "Quicksilver", ch)
err = s.Publish(ctx, "Fat Cobra")
require.NoError(t, err)
err = s.Publish(ctx, "Viper")
require.NoError(t, err)
assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
}
func TestDifferentClients(t *testing.T) {
@ -46,27 +127,24 @@ func TestDifferentClients(t *testing.T) {
defer s.Stop()
ctx := context.Background()
ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
err = s.PublishWithTags(ctx, "Iceman", map[string]string{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Iceman", ch1)
assertReceive(t, "Iceman", subscription1.Out())
ch2 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
err = s.PublishWithTags(ctx, "Ultimo", map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
require.NoError(t, err)
assertReceive(t, "Ultimo", ch1)
assertReceive(t, "Ultimo", ch2)
assertReceive(t, "Ultimo", subscription1.Out())
assertReceive(t, "Ultimo", subscription2.Out())
ch3 := make(chan interface{}, 1)
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"))
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"}))
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]string{"tm.events.type": "NewRoundStep"})
require.NoError(t, err)
assert.Zero(t, len(ch3))
assert.Zero(t, len(subscription3.Out()))
}
func TestClientSubscribesTwice(t *testing.T) {
@ -78,20 +156,19 @@ func TestClientSubscribesTwice(t *testing.T) {
ctx := context.Background()
q := query.MustParse("tm.events.type='NewBlock'")
ch1 := make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, q, ch1)
subscription1, err := s.Subscribe(ctx, clientID, q)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]string{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Goblin Queen", ch1)
assertReceive(t, "Goblin Queen", subscription1.Out())
ch2 := make(chan interface{}, 1)
err = s.Subscribe(ctx, clientID, q, ch2)
subscription2, err := s.Subscribe(ctx, clientID, q)
require.Error(t, err)
require.Nil(t, subscription2)
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
err = s.PublishWithTags(ctx, "Spider-Man", map[string]string{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Spider-Man", ch1)
assertReceive(t, "Spider-Man", subscription1.Out())
}
func TestUnsubscribe(t *testing.T) {
@ -101,18 +178,16 @@ func TestUnsubscribe(t *testing.T) {
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{})
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe")
_, ok := <-ch
assert.False(t, ok)
assertCancelled(t, subscription, pubsub.ErrUnsubscribed)
}
func TestClientUnsubscribesTwice(t *testing.T) {
@ -122,8 +197,7 @@ func TestClientUnsubscribesTwice(t *testing.T) {
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{})
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
_, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
@ -141,18 +215,16 @@ func TestResubscribe(t *testing.T) {
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{})
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
ch = make(chan interface{})
err = s.Subscribe(ctx, clientID, query.Empty{}, ch)
subscription, err = s.Subscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
err = s.Publish(ctx, "Cable")
require.NoError(t, err)
assertReceive(t, "Cable", ch)
assertReceive(t, "Cable", subscription.Out())
}
func TestUnsubscribeAll(t *testing.T) {
@ -162,10 +234,9 @@ func TestUnsubscribeAll(t *testing.T) {
defer s.Stop()
ctx := context.Background()
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1)
subscription1, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err)
err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2)
subscription2, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"))
require.NoError(t, err)
err = s.UnsubscribeAll(ctx, clientID)
@ -173,13 +244,11 @@ func TestUnsubscribeAll(t *testing.T) {
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll")
assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll")
_, ok := <-ch1
assert.False(t, ok)
_, ok = <-ch2
assert.False(t, ok)
assertCancelled(t, subscription1, pubsub.ErrUnsubscribed)
assertCancelled(t, subscription2, pubsub.ErrUnsubscribed)
}
func TestBufferCapacity(t *testing.T) {
@ -217,18 +286,26 @@ func benchmarkNClients(n int, b *testing.B) {
ctx := context.Background()
for i := 0; i < n; i++ {
ch := make(chan interface{})
subscription, err := s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)))
if err != nil {
b.Fatal(err)
}
go func() {
for range ch {
for {
select {
case <-subscription.Out():
continue
case <-subscription.Cancelled():
return
}
}
}()
s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch)
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)}))
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)})
}
}
@ -240,18 +317,26 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
ctx := context.Background()
q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
for i := 0; i < n; i++ {
ch := make(chan interface{})
subscription, err := s.Subscribe(ctx, clientID, q)
if err != nil {
b.Fatal(err)
}
go func() {
for range ch {
for {
select {
case <-subscription.Out():
continue
case <-subscription.Cancelled():
return
}
}
}()
s.Subscribe(ctx, clientID, q, ch)
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"}))
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"})
}
}
@ -259,14 +344,18 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
/// HELPERS
///////////////////////////////////////////////////////////////////////////////
func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) {
func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, msgAndArgs ...interface{}) {
select {
case actual := <-ch:
if actual != nil {
assert.Equal(t, expected, actual, msgAndArgs...)
}
assert.Equal(t, expected, actual.Data(), msgAndArgs...)
case <-time.After(1 * time.Second):
t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
debug.PrintStack()
}
}
func assertCancelled(t *testing.T, subscription *pubsub.Subscription, err error) {
_, ok := <-subscription.Cancelled()
assert.False(t, ok)
assert.Equal(t, err, subscription.Err())
}

+ 1
- 3
libs/pubsub/query/empty.go View File

@ -1,13 +1,11 @@
package query
import "github.com/tendermint/tendermint/libs/pubsub"
// Empty query matches any set of tags.
type Empty struct {
}
// Matches always returns true.
func (Empty) Matches(tags pubsub.TagMap) bool {
func (Empty) Matches(tags map[string]string) bool {
return true
}


+ 4
- 5
libs/pubsub/query/empty_test.go View File

@ -5,14 +5,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{}
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Asher": "Roth"})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66"})))
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66", "Billy": "Blue"})))
assert.True(t, q.Matches(map[string]string{}))
assert.True(t, q.Matches(map[string]string{"Asher": "Roth"}))
assert.True(t, q.Matches(map[string]string{"Route": "66"}))
assert.True(t, q.Matches(map[string]string{"Route": "66", "Billy": "Blue"}))
}

+ 4
- 6
libs/pubsub/query/query.go View File

@ -14,8 +14,6 @@ import (
"strconv"
"strings"
"time"
"github.com/tendermint/tendermint/libs/pubsub"
)
// Query holds the query string and the query parser.
@ -154,8 +152,8 @@ func (q *Query) Conditions() []Condition {
//
// For example, query "name=John" matches tags = {"name": "John"}. More
// examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(tags pubsub.TagMap) bool {
if tags.Len() == 0 {
func (q *Query) Matches(tags map[string]string) bool {
if len(tags) == 0 {
return false
}
@ -240,9 +238,9 @@ func (q *Query) Matches(tags pubsub.TagMap) bool {
// value from it to the operand using the operator.
//
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool {
func match(tag string, op Operator, operand reflect.Value, tags map[string]string) bool {
// look up the tag from the query in tags
value, ok := tags.Get(tag)
value, ok := tags[tag]
if !ok {
return false
}


+ 2
- 3
libs/pubsub/query/query_test.go View File

@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
@ -53,9 +52,9 @@ func TestMatches(t *testing.T) {
}
if tc.matches {
assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags)
assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
} else {
assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags)
assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
}
}
}


+ 89
- 0
libs/pubsub/subscription.go View File

@ -0,0 +1,89 @@
package pubsub
import (
"errors"
"sync"
)
var (
// ErrUnsubscribed is returned by Err when a client unsubscribes.
ErrUnsubscribed = errors.New("client unsubscribed")
// ErrOutOfCapacity is returned by Err when a client is not pulling messages
// fast enough. Note the client's subscription will be terminated.
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
)
// A Subscription represents a client subscription for a particular query and
// consists of three things:
// 1) channel onto which messages and tags are published
// 2) channel which is closed if a client is too slow or choose to unsubscribe
// 3) err indicating the reason for (2)
type Subscription struct {
out chan Message
cancelled chan struct{}
mtx sync.RWMutex
err error
}
// NewSubscription returns a new subscription with the given outCapacity.
func NewSubscription(outCapacity int) *Subscription {
return &Subscription{
out: make(chan Message, outCapacity),
cancelled: make(chan struct{}),
}
}
// Out returns a channel onto which messages and tags are published.
// Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from
// receiving a nil message.
func (s *Subscription) Out() <-chan Message {
return s.out
}
// Cancelled returns a channel that's closed when the subscription is
// terminated and supposed to be used in a select statement.
func (s *Subscription) Cancelled() <-chan struct{} {
return s.cancelled
}
// Err returns nil if the channel returned by Cancelled is not yet closed.
// If the channel is closed, Err returns a non-nil error explaining why:
// - ErrUnsubscribed if the subscriber choose to unsubscribe,
// - ErrOutOfCapacity if the subscriber is not pulling messages fast enough
// and the channel returned by Out became full,
// After Err returns a non-nil error, successive calls to Err return the same
// error.
func (s *Subscription) Err() error {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.err
}
func (s *Subscription) cancel(err error) {
s.mtx.Lock()
s.err = err
s.mtx.Unlock()
close(s.cancelled)
}
// Message glues data and tags together.
type Message struct {
data interface{}
tags map[string]string
}
func NewMessage(data interface{}, tags map[string]string) Message {
return Message{data, tags}
}
// Data returns an original data published.
func (msg Message) Data() interface{} {
return msg.data
}
// Tags returns tags, which matched the client's query.
func (msg Message) Tags() map[string]string {
return msg.tags
}

+ 4
- 3
node/node_test.go View File

@ -42,11 +42,12 @@ func TestNodeStartStop(t *testing.T) {
t.Logf("Started node %v", n.sw.NodeInfo())
// wait for the node to produce a block
blockCh := make(chan interface{})
err = n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock, blockCh)
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-blockCh:
case <-blocksSub.Out():
case <-blocksSub.Cancelled():
t.Fatal("blocksSub was cancelled")
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for the node to produce a block")
}


+ 6
- 18
rpc/client/helpers.go View File

@ -59,32 +59,20 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
evts := make(chan interface{}, 1)
// register for the next event of this type
query := types.QueryForEvent(evtTyp)
err := c.Subscribe(ctx, subscriber, query, evts)
sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp))
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe")
}
// make sure to unregister after the test is over
defer func() {
// drain evts to make sure we don't block
LOOP:
for {
select {
case <-evts:
default:
break LOOP
}
}
c.UnsubscribeAll(ctx, subscriber)
}()
defer c.UnsubscribeAll(ctx, subscriber)
select {
case evt := <-evts:
return evt.(types.TMEventData), nil
case msg := <-sub.Out():
return msg.Data().(types.TMEventData), nil
case <-sub.Cancelled():
return nil, errors.New("subscription was cancelled")
case <-ctx.Done():
return nil, errors.New("timed out waiting for event")
}


+ 51
- 14
rpc/client/httpclient.go View File

@ -249,6 +249,28 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) {
/** websocket event stuff here... **/
type subscription struct {
out chan tmpubsub.Message
cancelled chan struct{}
mtx sync.RWMutex
err error
}
func (s *subscription) Out() <-chan tmpubsub.Message {
return s.out
}
func (s *subscription) Cancelled() <-chan struct{} {
return s.cancelled
}
func (s *subscription) Err() error {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.err
}
type WSEvents struct {
cmn.BaseService
cdc *amino.Codec
@ -256,8 +278,9 @@ type WSEvents struct {
endpoint string
ws *rpcclient.WSClient
mtx sync.RWMutex
subscriptions map[string]chan<- interface{}
mtx sync.RWMutex
// query -> subscription
subscriptions map[string]*subscription
}
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
@ -265,7 +288,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
cdc: cdc,
endpoint: endpoint,
remote: remote,
subscriptions: make(map[string]chan<- interface{}),
subscriptions: make(map[string]*subscription),
}
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
@ -295,21 +318,29 @@ func (w *WSEvents) OnStop() {
}
}
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
q := query.String()
err := w.ws.Subscribe(ctx, q)
if err != nil {
return err
return nil, err
}
outCap := 1
if len(outCapacity) > 0 && outCapacity[0] >= 0 {
outCap = outCapacity[0]
}
w.mtx.Lock()
// subscriber param is ignored because Tendermint will override it with
// remote IP anyway.
w.subscriptions[q] = out
w.subscriptions[q] = &subscription{
out: make(chan tmpubsub.Message, outCap),
cancelled: make(chan struct{}),
}
w.mtx.Unlock()
return nil
return w.subscriptions[q], nil
}
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
@ -321,9 +352,12 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmp
}
w.mtx.Lock()
ch, ok := w.subscriptions[q]
sub, ok := w.subscriptions[q]
if ok {
close(ch)
close(sub.cancelled)
sub.mtx.Lock()
sub.err = errors.New("unsubscribed")
sub.mtx.Unlock()
delete(w.subscriptions, q)
}
w.mtx.Unlock()
@ -338,10 +372,13 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
}
w.mtx.Lock()
for _, ch := range w.subscriptions {
close(ch)
for _, sub := range w.subscriptions {
close(sub.cancelled)
sub.mtx.Lock()
sub.err = errors.New("unsubscribed")
sub.mtx.Unlock()
}
w.subscriptions = make(map[string]chan<- interface{})
w.subscriptions = make(map[string]*subscription)
w.mtx.Unlock()
return nil
@ -381,8 +418,8 @@ func (w *WSEvents) eventListener() {
// NOTE: writing also happens inside mutex so we can't close a channel in
// Unsubscribe/UnsubscribeAll.
w.mtx.RLock()
if ch, ok := w.subscriptions[result.Query]; ok {
ch <- result.Data
if sub, ok := w.subscriptions[result.Query]; ok {
sub.out <- tmpubsub.NewMessage(result.Data, result.Tags)
}
w.mtx.RUnlock()
case <-w.Quit():


+ 2
- 2
rpc/client/localclient.go View File

@ -140,8 +140,8 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu
return core.TxSearch(query, prove, page, perPage)
}
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
return c.EventBus.Subscribe(ctx, subscriber, query, out)
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
return c.EventBus.Subscribe(ctx, subscriber, query, outCapacity...)
}
func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {


+ 19
- 5
rpc/core/events.go View File

@ -101,16 +101,30 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
ch := make(chan interface{})
err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch)
sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q)
if err != nil {
return nil, err
}
go func() {
for event := range ch {
tmResult := &ctypes.ResultEvent{Query: query, Data: event.(tmtypes.TMEventData)}
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), tmResult))
for {
select {
case msg := <-sub.Out():
resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}
wsCtx.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
wsCtx.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)),
resultEvent,
))
case <-sub.Cancelled():
wsCtx.TryWriteRPCResponse(
rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
fmt.Sprintf("%v#event", wsCtx.Request.ID)),
fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()),
))
return
}
}
}()


+ 12
- 19
rpc/core/mempool.go View File

@ -169,26 +169,14 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// Subscribe to tx being committed in block.
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
deliverTxResCh := make(chan interface{}, 1)
q := types.EventQueryTxFor(tx)
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
deliverTxSub, err := eventBus.Subscribe(ctx, "mempool", q)
if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx")
logger.Error("Error on broadcast_tx_commit", "err", err)
return nil, err
}
defer func() {
// drain deliverTxResCh to make sure we don't block
LOOP:
for {
select {
case <-deliverTxResCh:
default:
break LOOP
}
}
eventBus.Unsubscribe(context.Background(), "mempool", q)
}()
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
// Broadcast tx and wait for CheckTx result
checkTxResCh := make(chan *abci.Response, 1)
@ -213,17 +201,22 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// TODO: configurable?
var deliverTxTimeout = rpcserver.WriteTimeout / 2
select {
case deliverTxResMsg, ok := <-deliverTxResCh: // The tx was included in a block.
if !ok {
return nil, errors.New("Error on broadcastTxCommit: expected DeliverTxResult, got nil. Did the Tendermint stop?")
}
deliverTxRes := deliverTxResMsg.(types.EventDataTx)
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
deliverTxRes := msg.Data().(types.EventDataTx)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: deliverTxRes.Result,
Hash: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-deliverTxSub.Cancelled():
err = errors.New("deliverTxSub was cancelled. Did the Tendermint stop?")
logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
case <-time.After(deliverTxTimeout):
err = errors.New("Timed out waiting for tx to be included in a block")
logger.Error("Error on broadcastTxCommit", "err", err)


+ 1
- 0
rpc/core/types/responses.go View File

@ -205,4 +205,5 @@ type (
type ResultEvent struct {
Query string `json:"query"`
Data types.TMEventData `json:"data"`
Tags map[string]string `json:"tags"`
}

+ 1
- 0
rpc/lib/server/handlers.go View File

@ -526,6 +526,7 @@ func (wsc *wsConnection) OnStart() error {
func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.eventSub != nil {
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr)
}


+ 8
- 7
rpc/lib/types/types.go View File

@ -12,6 +12,7 @@ import (
amino "github.com/tendermint/go-amino"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmtypes "github.com/tendermint/tendermint/types"
)
// a wrapper to emulate a sum type: jsonrpcid = string | int
@ -244,19 +245,19 @@ type WSRPCConnection interface {
Codec() *amino.Codec
}
// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber
type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
// websocket-only RPCFuncs take this as the first parameter.
type WSRPCContext struct {
Request RPCRequest
WSRPCConnection
}
// EventSubscriber mirrors tendermint/tendermint/types.EventBusSubscriber
type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
//----------------------------------------
// SOCKETS
//


+ 6
- 5
state/execution_test.go View File

@ -317,8 +317,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
defer eventBus.Stop()
blockExec.SetEventBus(eventBus)
updatesCh := make(chan interface{}, 1)
err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh)
updatesSub, err := eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates)
require.NoError(t, err)
block := makeBlock(state, 1)
@ -342,13 +341,15 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
// test we threw an event
select {
case e := <-updatesCh:
event, ok := e.(types.EventDataValidatorSetUpdates)
require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", e)
case msg := <-updatesSub.Out():
event, ok := msg.Data().(types.EventDataValidatorSetUpdates)
require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", msg.Data())
if assert.NotEmpty(t, event.ValidatorUpdates) {
assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey)
assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower)
}
case <-updatesSub.Cancelled():
t.Fatalf("updatesSub was cancelled (reason: %v)", updatesSub.Err())
case <-time.After(1 * time.Second):
t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.")
}


+ 22
- 17
state/txindex/indexer_service.go View File

@ -31,35 +31,40 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService
// OnStart implements cmn.Service by subscribing for all transactions
// and indexing them by tags.
func (is *IndexerService) OnStart() error {
blockHeadersCh := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil {
// Use SubscribeUnbuffered here to ensure both subscriptions does not get
// cancelled due to not pulling messages fast enough. Cause this might
// sometimes happen when there are no other subscribers.
blockHeadersSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryNewBlockHeader)
if err != nil {
return err
}
txsCh := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil {
txsSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryTx)
if err != nil {
return err
}
go func() {
for {
e, ok := <-blockHeadersCh
if !ok {
return
}
header := e.(types.EventDataNewBlockHeader).Header
msg := <-blockHeadersSub.Out()
header := msg.Data().(types.EventDataNewBlockHeader).Header
batch := NewBatch(header.NumTxs)
for i := int64(0); i < header.NumTxs; i++ {
e, ok := <-txsCh
if !ok {
is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i)
return
msg2 := <-txsSub.Out()
txResult := msg2.Data().(types.EventDataTx).TxResult
if err = batch.Add(&txResult); err != nil {
is.Logger.Error("Can't add tx to batch",
"height", header.Height,
"index", txResult.Index,
"err", err)
}
txResult := e.(types.EventDataTx).TxResult
batch.Add(&txResult)
}
is.idr.AddBatch(batch)
is.Logger.Info("Indexed block", "height", header.Height)
if err = is.idr.AddBatch(batch); err != nil {
is.Logger.Error("Failed to index block", "height", header.Height, "err", err)
} else {
is.Logger.Info("Indexed block", "height", header.Height)
}
}
}()
return nil


+ 64
- 0
state/txindex/indexer_service_test.go View File

@ -0,0 +1,64 @@
package txindex_test
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/types"
)
func TestIndexerServiceIndexesBlocks(t *testing.T) {
// event bus
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()
// tx indexer
store := db.NewMemDB()
txIndexer := kv.NewTxIndex(store, kv.IndexAllTags())
service := txindex.NewIndexerService(txIndexer, eventBus)
service.SetLogger(log.TestingLogger())
err = service.Start()
require.NoError(t, err)
defer service.Stop()
// publish block with txs
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1, NumTxs: 2},
})
txResult1 := &types.TxResult{
Height: 1,
Index: uint32(0),
Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0},
}
eventBus.PublishEventTx(types.EventDataTx{*txResult1})
txResult2 := &types.TxResult{
Height: 1,
Index: uint32(1),
Tx: types.Tx("bar"),
Result: abci.ResponseDeliverTx{Code: 0},
}
eventBus.PublishEventTx(types.EventDataTx{*txResult2})
time.Sleep(100 * time.Millisecond)
// check the result
res, err := txIndexer.Get(types.Tx("foo").Hash())
assert.NoError(t, err)
assert.Equal(t, txResult1, res)
res, err = txIndexer.Get(types.Tx("bar").Hash())
assert.NoError(t, err)
assert.Equal(t, txResult2, res)
}

+ 19
- 7
types/event_bus.go View File

@ -12,11 +12,17 @@ import (
const defaultCapacity = 0
type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
type Subscription interface {
Out() <-chan tmpubsub.Message
Cancelled() <-chan struct{}
Err() error
}
// EventBus is a common bus for all events going through the system. All calls
// are proxied to underlying pubsub server. All events must be published using
// EventBus to ensure correct data types.
@ -52,8 +58,14 @@ func (b *EventBus) OnStop() {
b.pubsub.Stop()
}
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
return b.pubsub.Subscribe(ctx, subscriber, query, out)
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) {
return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
}
// This method can be used for a local consensus explorer and synchronous
// testing. Do not use for for public facing / untrusted subscriptions!
func (b *EventBus) SubscribeUnbuffered(ctx context.Context, subscriber string, query tmpubsub.Query) (Subscription, error) {
return b.pubsub.SubscribeUnbuffered(ctx, subscriber, query)
}
func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
@ -67,7 +79,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
// no explicit deadline for publishing events
ctx := context.Background()
b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]string{EventTypeKey: eventType}))
b.pubsub.PublishWithTags(ctx, eventData, map[string]string{EventTypeKey: eventType})
return nil
}
@ -95,7 +107,7 @@ func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}
@ -111,7 +123,7 @@ func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) erro
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}
@ -142,7 +154,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}


+ 40
- 41
types/event_bus_test.go View File

@ -24,23 +24,20 @@ func TestEventBusPublishEventTx(t *testing.T) {
tx := Tx("foo")
result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
txEventsCh := make(chan interface{})
// PublishEventTx adds all these 3 tags, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash())
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataTx)
assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index)
assert.Equal(t, tx, edt.Tx)
assert.Equal(t, result, edt.Result)
close(done)
}
msg := <-txsSub.Out()
edt := msg.Data().(EventDataTx)
assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index)
assert.Equal(t, tx, edt.Tx)
assert.Equal(t, result, edt.Result)
close(done)
}()
err = eventBus.PublishEventTx(EventDataTx{TxResult{
@ -68,22 +65,19 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
txEventsCh := make(chan interface{})
// PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
msg := <-blocksSub.Out()
edt := msg.Data().(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}()
err = eventBus.PublishEventNewBlock(EventDataNewBlock{
@ -110,22 +104,19 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
txEventsCh := make(chan interface{})
// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
headersSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
msg := <-headersSub.Out()
edt := msg.Data().(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}()
err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{
@ -148,18 +139,19 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
defer eventBus.Stop()
eventsCh := make(chan interface{})
err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh)
const numEventsExpected = 14
sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, numEventsExpected)
require.NoError(t, err)
const numEventsExpected = 14
done := make(chan struct{})
go func() {
numEvents := 0
for range eventsCh {
for range sub.Out() {
numEvents++
if numEvents >= numEventsExpected {
close(done)
return
}
}
}()
@ -243,15 +235,22 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
q := EventQueryNewBlock
for i := 0; i < numClients; i++ {
ch := make(chan interface{})
go func() {
for range ch {
}
}()
if randQueries {
q = randQuery()
}
eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q, ch)
sub, err := eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q)
if err != nil {
b.Fatal(err)
}
go func() {
for {
select {
case <-sub.Out():
case <-sub.Cancelled():
return
}
}
}()
}
eventType := EventNewBlock


Loading…
Cancel
Save