Browse Source

start adapting other pkgs to new pubsub

pull/3227/head
Anton Kaliaev 6 years ago
parent
commit
641182e5d3
No known key found for this signature in database GPG Key ID: 7B6881D965918214
14 changed files with 277 additions and 252 deletions
  1. +6
    -5
      consensus/byzantine_test.go
  2. +51
    -51
      consensus/common_test.go
  3. +1
    -1
      consensus/mempool_test.go
  4. +84
    -59
      consensus/reactor_test.go
  5. +8
    -5
      consensus/replay.go
  6. +13
    -33
      consensus/replay_file.go
  7. +5
    -4
      consensus/replay_test.go
  8. +4
    -4
      consensus/state_test.go
  9. +19
    -5
      rpc/core/events.go
  10. +12
    -19
      rpc/core/mempool.go
  11. +5
    -4
      state/execution_test.go
  12. +28
    -19
      state/txindex/indexer_service.go
  13. +3
    -3
      types/event_bus.go
  14. +38
    -40
      types/event_bus_test.go

+ 6
- 5
consensus/byzantine_test.go View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@ -49,7 +50,7 @@ func TestByzantine(t *testing.T) {
switches[i].SetLogger(p2pLogger.With("validator", i))
}
eventChans := make([]chan interface{}, N)
eventSubs := make([]*tmpubsub.Subscription, N)
reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ {
// make first val byzantine
@ -68,8 +69,8 @@ func TestByzantine(t *testing.T) {
eventBus := css[i].eventBus
eventBus.SetLogger(logger.With("module", "events", "validator", i))
eventChans[i] = make(chan interface{}, 1)
err := eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i])
var err error
eventSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states
@ -134,7 +135,7 @@ func TestByzantine(t *testing.T) {
p2p.Connect2Switches(switches, ind1, ind2)
// wait for someone in the big partition (B) to make a block
<-eventChans[ind2]
<-eventSubs[ind2].Out()
t.Log("A block has been committed. Healing partition")
p2p.Connect2Switches(switches, ind0, ind1)
@ -146,7 +147,7 @@ func TestByzantine(t *testing.T) {
wg.Add(2)
for i := 1; i < N-1; i++ {
go func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
wg.Done()
}(i)
}


+ 51
- 51
consensus/common_test.go View File

@ -25,6 +25,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
@ -220,18 +221,17 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo
// genesis
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
voteCh0 := make(chan interface{})
err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote, voteCh0)
voteCh0Sub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote)
if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote))
}
voteCh := make(chan interface{})
go func() {
for v := range voteCh0 {
vote := v.(types.EventDataVote)
for msgAndTags := range voteCh0Sub.Out() {
vote := msgAndTags.Msg.(types.EventDataVote)
// we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
voteCh <- v
voteCh <- msgAndTags.Msg
}
}
}()
@ -311,7 +311,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
//-------------------------------------------------------------------------------
func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
func ensureNoNewEvent(ch <-chan tmpubsub.MsgAndTags, timeout time.Duration,
errorMessage string) {
select {
case <-time.After(timeout):
@ -321,28 +321,28 @@ func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
}
}
func ensureNoNewEventOnChannel(ch <-chan interface{}) {
func ensureNoNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) {
ensureNoNewEvent(
ch,
ensureTimeout,
"We should be stuck waiting, not receiving new event on the channel")
}
func ensureNoNewRoundStep(stepCh <-chan interface{}) {
func ensureNoNewRoundStep(stepCh <-chan tmpubsub.MsgAndTags) {
ensureNoNewEvent(
stepCh,
ensureTimeout,
"We should be stuck waiting, not receiving NewRoundStep event")
}
func ensureNoNewUnlock(unlockCh <-chan interface{}) {
func ensureNoNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags) {
ensureNoNewEvent(
unlockCh,
ensureTimeout,
"We should be stuck waiting, not receiving Unlock event")
}
func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
func ensureNoNewTimeout(stepCh <-chan tmpubsub.MsgAndTags, timeout int64) {
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
ensureNoNewEvent(
stepCh,
@ -351,7 +351,7 @@ func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
}
func ensureNewEvent(
ch <-chan interface{},
ch <-chan tmpubsub.MsgAndTags,
height int64,
round int,
timeout time.Duration,
@ -361,7 +361,7 @@ func ensureNewEvent(
case <-time.After(timeout):
panic(errorMessage)
case ev := <-ch:
rs, ok := ev.(types.EventDataRoundState)
rs, ok := ev.Msg.(types.EventDataRoundState)
if !ok {
panic(
fmt.Sprintf(
@ -378,12 +378,12 @@ func ensureNewEvent(
}
}
func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
func ensureNewRound(roundCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewRound event")
case ev := <-roundCh:
rs, ok := ev.(types.EventDataNewRound)
rs, ok := ev.Msg.(types.EventDataNewRound)
if !ok {
panic(
fmt.Sprintf(
@ -399,18 +399,18 @@ func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
}
}
func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
func ensureNewTimeout(timeoutCh <-chan tmpubsub.MsgAndTags, height int64, round int, timeout int64) {
timeoutDuration := time.Duration(timeout*3) * time.Nanosecond
ensureNewEvent(timeoutCh, height, round, timeoutDuration,
"Timeout expired while waiting for NewTimeout event")
}
func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
func ensureNewProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
rs, ok := ev.Msg.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
@ -426,17 +426,17 @@ func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
}
}
func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
func ensureNewValidBlock(validBlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
ensureNewEvent(validBlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewValidBlock event")
}
func ensureNewBlock(blockCh <-chan interface{}, height int64) {
func ensureNewBlock(blockCh <-chan tmpubsub.MsgAndTags, height int64) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlock event")
case ev := <-blockCh:
block, ok := ev.(types.EventDataNewBlock)
block, ok := ev.Msg.(types.EventDataNewBlock)
if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+
"got %v. wrong subscription channel?",
@ -448,12 +448,12 @@ func ensureNewBlock(blockCh <-chan interface{}, height int64) {
}
}
func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) {
func ensureNewBlockHeader(blockCh <-chan tmpubsub.MsgAndTags, height int64, blockHash cmn.HexBytes) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlockHeader event")
case ev := <-blockCh:
blockHeader, ok := ev.(types.EventDataNewBlockHeader)
blockHeader, ok := ev.Msg.(types.EventDataNewBlockHeader)
if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+
"got %v. wrong subscription channel?",
@ -468,42 +468,17 @@ func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cm
}
}
func ensureNewUnlock(unlockCh <-chan interface{}, height int64, round int) {
func ensureNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
ensureNewEvent(unlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewUnlock event")
}
func ensureVote(voteCh <-chan interface{}, height int64, round int,
voteType types.SignedMsgType) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewVote event")
case v := <-voteCh:
edv, ok := v.(types.EventDataVote)
if !ok {
panic(fmt.Sprintf("expected a *types.Vote, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(v)))
}
vote := edv.Vote
if vote.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
}
if vote.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
}
if vote.Type != voteType {
panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type))
}
}
}
func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) {
func ensureProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int, propId types.BlockID) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
rs, ok := ev.Msg.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
@ -530,7 +505,32 @@ func ensurePrevote(voteCh <-chan interface{}, height int64, round int) {
ensureVote(voteCh, height, round, types.PrevoteType)
}
func ensureNewEventOnChannel(ch <-chan interface{}) {
func ensureVote(voteCh <-chan interface{}, height int64, round int,
voteType types.SignedMsgType) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewVote event")
case v := <-voteCh:
edv, ok := v.(types.EventDataVote)
if !ok {
panic(fmt.Sprintf("expected a *types.Vote, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(v)))
}
vote := edv.Vote
if vote.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
}
if vote.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
}
if vote.Type != voteType {
panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type))
}
}
}
func ensureNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for new activity on the channel")


+ 1
- 1
consensus/mempool_test.go View File

@ -118,7 +118,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
ticker := time.NewTicker(time.Second * 30)
select {
case b := <-newBlockCh:
evt := b.(types.EventDataNewBlock)
evt := b.Msg.(types.EventDataNewBlock)
nTxs += int(evt.Block.Header.NumTxs)
case <-ticker.C:
panic("Timed out waiting to commit blocks with transactions")


+ 84
- 59
consensus/reactor_test.go View File

@ -21,6 +21,7 @@ import (
cfg "github.com/tendermint/tendermint/config"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
@ -34,9 +35,14 @@ func init() {
//----------------------------------------------
// in-process testnets
func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*ConsensusReactor, []chan interface{}, []*types.EventBus) {
func startConsensusNet(t *testing.T, css []*ConsensusState, N int) (
[]*ConsensusReactor,
[]*tmpubsub.Subscription,
[]*types.EventBus,
) {
var err error
reactors := make([]*ConsensusReactor, N)
eventChans := make([]chan interface{}, N)
eventSubs := make([]*tmpubsub.Subscription, N)
eventBuses := make([]*types.EventBus, N)
for i := 0; i < N; i++ {
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
@ -48,8 +54,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
eventBuses[i] = css[i].eventBus
reactors[i].SetEventBus(eventBuses[i])
eventChans[i] = make(chan interface{}, 1)
err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i])
eventSubs[i], err = eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
}
// make connected switches and start all reactors
@ -67,7 +72,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, 0)
}
return reactors, eventChans, eventBuses
return reactors, eventSubs, eventBuses
}
func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
@ -87,11 +92,11 @@ func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuse
func TestReactorBasic(t *testing.T) {
N := 4
css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
reactors, eventSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
}, css)
}
@ -163,20 +168,20 @@ func TestReactorWithEvidence(t *testing.T) {
css[i] = cs
}
reactors, eventChans, eventBuses := startConsensusNet(t, css, nValidators)
reactors, eventSubs, eventBuses := startConsensusNet(t, css, nValidators)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block with no evidence
timeoutWaitGroup(t, nValidators, func(j int) {
blockI := <-eventChans[j]
block := blockI.(types.EventDataNewBlock).Block
blockI := <-eventSubs[j].Out()
block := blockI.Msg.(types.EventDataNewBlock).Block
assert.True(t, len(block.Evidence.Evidence) == 0)
}, css)
// second block should have evidence
timeoutWaitGroup(t, nValidators, func(j int) {
blockI := <-eventChans[j]
block := blockI.(types.EventDataNewBlock).Block
blockI := <-eventSubs[j].Out()
block := blockI.Msg.(types.EventDataNewBlock).Block
assert.True(t, len(block.Evidence.Evidence) > 0)
}, css)
}
@ -221,7 +226,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
func(c *cfg.Config) {
c.Consensus.CreateEmptyBlocks = false
})
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
reactors, eventSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// send a tx
@ -231,7 +236,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
// wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
}, css)
}
@ -239,12 +244,12 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
N := 4
css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
reactors, eventSubs, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
}, css)
// Get peer
@ -263,7 +268,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
nVals := 4
logger := log.TestingLogger()
css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentKVStore)
reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals)
reactors, eventSubs, eventBuses := startConsensusNet(t, css, nVals)
defer stopConsensusNet(logger, reactors, eventBuses)
// map of active validators
@ -275,7 +280,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
// wait till everyone makes block 1
timeoutWaitGroup(t, nVals, func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
}, css)
//---------------------------------------------------------------------------
@ -286,10 +291,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
updateValidatorTx := kvstore.MakeValSetChangeTx(val1PubKeyABCI, 25)
previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css)
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
@ -298,10 +303,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2)
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css)
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
@ -310,10 +315,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 26)
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css)
waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css)
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
@ -327,7 +332,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
logger := log.TestingLogger()
reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers)
reactors, eventSubs, eventBuses := startConsensusNet(t, css, nPeers)
defer stopConsensusNet(logger, reactors, eventBuses)
// map of active validators
@ -339,7 +344,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// wait till everyone makes block 1
timeoutWaitGroup(t, nPeers, func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
}, css)
//---------------------------------------------------------------------------
@ -352,22 +357,22 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// wait till everyone makes block 2
// ensure the commit includes all validators
// send newValTx to change vals in block 3
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, newValidatorTx1)
// wait till everyone makes block 3.
// it includes the commit for block 2, which is by the original validator set
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, newValidatorTx1)
// wait till everyone makes block 4.
// it includes the commit for block 3, which is by the original validator set
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css)
// the commits for block 4 should be with the updated validator set
activeVals[string(newValidatorPubKey1.Address())] = struct{}{}
// wait till everyone makes block 5
// it includes the commit for block 4, which should have the updated validator set
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css)
//---------------------------------------------------------------------------
logger.Info("---------------------------- Testing changing the voting power of one validator")
@ -377,10 +382,10 @@ func TestReactorValidatorSetChanges(t *testing.T) {
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, updateValidatorTx1)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, updateValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css)
if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower())
@ -397,12 +402,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css)
activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css)
//---------------------------------------------------------------------------
logger.Info("---------------------------- Testing removing two validators at once")
@ -410,12 +415,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css)
delete(activeVals, string(newValidatorPubKey2.Address()))
delete(activeVals, string(newValidatorPubKey3.Address()))
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css)
}
// Check we can make blocks with skip_timeout_commit=false
@ -427,23 +432,30 @@ func TestReactorWithTimeoutCommit(t *testing.T) {
css[i].config.SkipTimeoutCommit = false
}
reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1)
reactors, eventSubs, eventBuses := startConsensusNet(t, css, N-1)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block
timeoutWaitGroup(t, N-1, func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
}, css)
}
func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
func waitForAndValidateBlock(
t *testing.T,
n int,
activeVals map[string]struct{},
eventSubs []*tmpubsub.Subscription,
css []*ConsensusState,
txs ...[]byte,
) {
timeoutWaitGroup(t, n, func(j int) {
css[j].Logger.Debug("waitForAndValidateBlock")
newBlockI, ok := <-eventChans[j]
newBlockI, ok := <-eventSubs[j].Out()
if !ok {
return
}
newBlock := newBlockI.(types.EventDataNewBlock).Block
newBlock := newBlockI.Msg.(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
@ -454,17 +466,24 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}
}, css)
}
func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
func waitForAndValidateBlockWithTx(
t *testing.T,
n int,
activeVals map[string]struct{},
eventSubs []*tmpubsub.Subscription,
css []*ConsensusState,
txs ...[]byte,
) {
timeoutWaitGroup(t, n, func(j int) {
ntxs := 0
BLOCK_TX_LOOP:
for {
css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs)
newBlockI, ok := <-eventChans[j]
newBlockI, ok := <-eventSubs[j].Out()
if !ok {
return
}
newBlock := newBlockI.(types.EventDataNewBlock).Block
newBlock := newBlockI.Msg.(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
@ -485,18 +504,24 @@ func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]st
}, css)
}
func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState) {
func waitForBlockWithUpdatedValsAndValidateIt(
t *testing.T,
n int,
updatedVals map[string]struct{},
eventSubs []*tmpubsub.Subscription,
css []*ConsensusState,
) {
timeoutWaitGroup(t, n, func(j int) {
var newBlock *types.Block
LOOP:
for {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt")
newBlockI, ok := <-eventChans[j]
newBlockI, ok := <-eventSubs[j].Out()
if !ok {
return
}
newBlock = newBlockI.(types.EventDataNewBlock).Block
newBlock = newBlockI.Msg.(types.EventDataNewBlock).Block
if newBlock.LastCommit.Size() == len(updatedVals) {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height)
break LOOP


+ 8
- 5
consensus/replay.go View File

@ -17,6 +17,7 @@ import (
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -42,7 +43,7 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub *tmpubsub.Subscription) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
@ -54,15 +55,17 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan
cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepCh != nil {
if newStepSub != nil {
select {
case mi := <-newStepCh:
m2 := mi.(types.EventDataRoundState)
case mi := <-newStepSub.Out():
m2 := mi.Msg.(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
}
case <-newStepSub.Cancelled():
return fmt.Errorf("Failed to read off newStepSub.Out(). newStepSub was cancelled")
case <-ticker:
return fmt.Errorf("Failed to read off newStepCh")
return fmt.Errorf("Failed to read off newStepSub.Out()")
}
}
case msgInfo:


+ 13
- 33
consensus/replay_file.go View File

@ -16,6 +16,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -51,25 +52,13 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
cs.startForReplay()
// ensure all new step events are regenerated as expected
newStepCh := make(chan interface{}, 1)
ctx := context.Background()
err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if err != nil {
return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
}
defer func() {
// drain newStepCh to make sure we don't block
LOOP:
for {
select {
case <-newStepCh:
default:
break LOOP
}
}
cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()
defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
// just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
@ -94,7 +83,8 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
// TODO: pass a subscription
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
return err
}
@ -132,7 +122,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.S
}
// go back count steps by resetting the state and running (pb.count - count) steps
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
func (pb *playback) replayReset(count int, newStepSub *tmpubsub.Subscription) error {
pb.cs.Stop()
pb.cs.Wait()
@ -162,7 +152,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
} else if err != nil {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
return err
}
pb.count++
@ -226,27 +216,16 @@ func (pb *playback) replayConsoleLoop() int {
ctx := context.Background()
// ensure all new step events are regenerated as expected
newStepCh := make(chan interface{}, 1)
err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if err != nil {
cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
}
defer func() {
// drain newStepCh to make sure we don't block
LOOP:
for {
select {
case <-newStepCh:
default:
break LOOP
}
}
pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()
defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if len(tokens) == 1 {
if err := pb.replayReset(1, newStepCh); err != nil {
// TODO: pass a subscription
if err := pb.replayReset(1, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err)
}
} else {
@ -256,7 +235,8 @@ func (pb *playback) replayConsoleLoop() int {
} else if i > pb.count {
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
} else {
if err := pb.replayReset(i, newStepCh); err != nil {
// TODO: pass a subscription
if err := pb.replayReset(i, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err)
}
}


+ 5
- 4
consensus/replay_test.go View File

@ -70,13 +70,14 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64,
// in the WAL itself. Assuming the consensus state is running, replay of any
// WAL, including the empty one, should eventually be followed by a new
// block, or else something is wrong.
newBlockCh := make(chan interface{}, 1)
err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh)
newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-newBlockCh:
case <-newBlockSub.Out():
case <-newBlockSub.Cancelled():
t.Fatal("newBlockSub was cancelled")
case <-time.After(60 * time.Second):
t.Fatalf("Timed out waiting for new block (see trace above)")
t.Fatal("Timed out waiting for new block (see trace above)")
}
}


+ 4
- 4
consensus/state_test.go View File

@ -1565,11 +1565,11 @@ func TestStateOutputVoteStats(t *testing.T) {
}
// subscribe subscribes test client to the given query and returns a channel with cap = 1.
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} {
out := make(chan interface{}, 1)
err := eventBus.Subscribe(context.Background(), testSubscriber, q, out)
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.MsgAndTags {
sub, err := eventBus.Subscribe(context.Background(), testSubscriber, q)
if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q))
}
return out
// XXX: return a subscription?
return sub.Out()
}

+ 19
- 5
rpc/core/events.go View File

@ -101,16 +101,30 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
ch := make(chan interface{})
err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch)
sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q)
if err != nil {
return nil, err
}
go func() {
for event := range ch {
tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)}
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), tmResult))
for {
select {
case event := <-sub.Out():
tmResult := &ctypes.ResultEvent{query, event.Msg.(tmtypes.TMEventData)}
wsCtx.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
wsCtx.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)),
tmResult,
))
case <-sub.Cancelled():
wsCtx.TryWriteRPCResponse(
rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
fmt.Sprintf("%v#event", wsCtx.Request.ID)),
fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()),
))
return
}
}
}()


+ 12
- 19
rpc/core/mempool.go View File

@ -169,26 +169,14 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// Subscribe to tx being committed in block.
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
deliverTxResCh := make(chan interface{}, 1)
q := types.EventQueryTxFor(tx)
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
deliverTxSub, err := eventBus.Subscribe(ctx, "mempool", q)
if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx")
logger.Error("Error on broadcast_tx_commit", "err", err)
return nil, err
}
defer func() {
// drain deliverTxResCh to make sure we don't block
LOOP:
for {
select {
case <-deliverTxResCh:
default:
break LOOP
}
}
eventBus.Unsubscribe(context.Background(), "mempool", q)
}()
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
// Broadcast tx and wait for CheckTx result
checkTxResCh := make(chan *abci.Response, 1)
@ -213,17 +201,22 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// TODO: configurable?
var deliverTxTimeout = rpcserver.WriteTimeout / 2
select {
case deliverTxResMsg, ok := <-deliverTxResCh: // The tx was included in a block.
if !ok {
return nil, errors.New("Error on broadcastTxCommit: expected DeliverTxResult, got nil. Did the Tendermint stop?")
}
deliverTxRes := deliverTxResMsg.(types.EventDataTx)
case deliverTxResMsg := <-deliverTxSub.Out(): // The tx was included in a block.
deliverTxRes := deliverTxResMsg.Msg.(types.EventDataTx)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: deliverTxRes.Result,
Hash: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-deliverTxSub.Cancelled():
err = errors.New("deliverTxSub was cancelled. Did the Tendermint stop?")
logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
case <-time.After(deliverTxTimeout):
err = errors.New("Timed out waiting for tx to be included in a block")
logger.Error("Error on broadcastTxCommit", "err", err)


+ 5
- 4
state/execution_test.go View File

@ -317,8 +317,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
defer eventBus.Stop()
blockExec.SetEventBus(eventBus)
updatesCh := make(chan interface{}, 1)
err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh)
updatesSub, err := eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates)
require.NoError(t, err)
block := makeBlock(state, 1)
@ -342,13 +341,15 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
// test we threw an event
select {
case e := <-updatesCh:
event, ok := e.(types.EventDataValidatorSetUpdates)
case e := <-updatesSub.Out():
event, ok := e.Msg.(types.EventDataValidatorSetUpdates)
require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", e)
if assert.NotEmpty(t, event.ValidatorUpdates) {
assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey)
assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower)
}
case <-updatesSub.Cancelled():
t.Fatal("updatesSub was cancelled.")
case <-time.After(1 * time.Second):
t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.")
}


+ 28
- 19
state/txindex/indexer_service.go View File

@ -31,35 +31,44 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService
// OnStart implements cmn.Service by subscribing for all transactions
// and indexing them by tags.
func (is *IndexerService) OnStart() error {
blockHeadersCh := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil {
blockHeadersSub, err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader)
if err != nil {
return err
}
txsCh := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil {
txsSub, err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx)
if err != nil {
return err
}
go func() {
for {
e, ok := <-blockHeadersCh
if !ok {
return
}
header := e.(types.EventDataNewBlockHeader).Header
batch := NewBatch(header.NumTxs)
for i := int64(0); i < header.NumTxs; i++ {
e, ok := <-txsCh
if !ok {
is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i)
return
select {
case msgAndTags := <-blockHeadersSub.Out():
header := msgAndTags.Msg.(types.EventDataNewBlockHeader).Header
batch := NewBatch(header.NumTxs)
for i := int64(0); i < header.NumTxs; i++ {
select {
case msgAndTags := <-txsSub.Out():
txResult := msgAndTags.Msg.(types.EventDataTx).TxResult
batch.Add(&txResult)
case <-txsSub.Cancelled():
is.Logger.Error("Failed to index a block. txsSub was cancelled. Did the Tendermint stop?",
"err", txsSub.Err(),
"height", header.Height,
"numTxs", header.NumTxs,
"numProcessed", i,
)
return
}
}
txResult := e.(types.EventDataTx).TxResult
batch.Add(&txResult)
is.idr.AddBatch(batch)
is.Logger.Info("Indexed block", "height", header.Height)
case <-blockHeadersSub.Cancelled():
is.Logger.Error("Failed to index a block. blockHeadersSub was cancelled. Did the Tendermint stop?",
"reason", blockHeadersSub.Err())
return
}
is.idr.AddBatch(batch)
is.Logger.Info("Indexed block", "height", header.Height)
}
}()
return nil


+ 3
- 3
types/event_bus.go View File

@ -12,7 +12,7 @@ import (
const defaultCapacity = 0
type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (*tmpubsub.Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
@ -52,8 +52,8 @@ func (b *EventBus) OnStop() {
b.pubsub.Stop()
}
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
return b.pubsub.Subscribe(ctx, subscriber, query, out)
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (*tmpubsub.Subscription, error) {
return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
}
func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {


+ 38
- 40
types/event_bus_test.go View File

@ -24,23 +24,20 @@ func TestEventBusPublishEventTx(t *testing.T) {
tx := Tx("foo")
result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
txEventsCh := make(chan interface{})
// PublishEventTx adds all these 3 tags, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash())
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataTx)
assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index)
assert.Equal(t, tx, edt.Tx)
assert.Equal(t, result, edt.Result)
close(done)
}
e := <-txEventsSub.Out()
edt := e.Msg.(EventDataTx)
assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index)
assert.Equal(t, tx, edt.Tx)
assert.Equal(t, result, edt.Result)
close(done)
}()
err = eventBus.PublishEventTx(EventDataTx{TxResult{
@ -68,22 +65,19 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
txEventsCh := make(chan interface{})
// PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
e := <-txEventsSub.Out()
edt := e.Msg.(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}()
err = eventBus.PublishEventNewBlock(EventDataNewBlock{
@ -110,22 +104,19 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
txEventsCh := make(chan interface{})
// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
e := <-txEventsSub.Out()
edt := e.Msg.(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}()
err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{
@ -148,18 +139,18 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
defer eventBus.Stop()
eventsCh := make(chan interface{})
err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh)
eventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{})
require.NoError(t, err)
const numEventsExpected = 14
done := make(chan struct{})
go func() {
numEvents := 0
for range eventsCh {
for range eventsSub.Out() {
numEvents++
if numEvents >= numEventsExpected {
close(done)
return
}
}
}()
@ -243,15 +234,22 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
q := EventQueryNewBlock
for i := 0; i < numClients; i++ {
ch := make(chan interface{})
go func() {
for range ch {
}
}()
if randQueries {
q = randQuery()
}
eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q, ch)
sub, err := eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q)
if err != nil {
b.Fatal(err)
}
go func() {
for {
select {
case <-sub.Out():
case <-sub.Cancelled():
return
}
}
}()
}
eventType := EventNewBlock


Loading…
Cancel
Save