diff --git a/CHANGELOG.md b/CHANGELOG.md index 1832ff812..c5be08774 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ BREAKING CHANGES: - Better support for injecting randomness - Pass evidence/voteInfo through ABCI - Upgrade consensus for more real-time use of evidence +- New events system using tmlibs/pubsub FEATURES: - Peer reputation management diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 9ac580318..5a0730305 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -49,7 +49,7 @@ type BlockchainReactor struct { requestsCh chan BlockRequest timeoutsCh chan string - evsw types.EventSwitch + eventBus *types.EventBus } // NewBlockchainReactor returns new reactor instance. @@ -271,7 +271,7 @@ FOR_LOOP: // NOTE: we could improve performance if we // didn't make the app commit to disk every block // ... but we would need a way to get the hash without it persisting - err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{}) + err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{}) if err != nil { // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) @@ -299,9 +299,9 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { return nil } -// SetEventSwitch implements events.Eventable -func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { - bcR.evsw = evsw +// SetEventBus sets event bus. +func (bcR *BlockchainReactor) SetEventBus(b *types.EventBus) { + bcR.eventBus = b } //----------------------------------------------------------------------------- diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index c96ccf976..6bd7bdd4a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -1,16 +1,17 @@ package consensus import ( + "context" "sync" "testing" "time" + "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" data "github.com/tendermint/go-wire/data" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/events" ) func init() { @@ -41,18 +42,8 @@ func TestByzantine(t *testing.T) { switches[i].SetLogger(p2pLogger.With("validator", i)) } - reactors := make([]p2p.Reactor, N) - defer func() { - for _, r := range reactors { - if rr, ok := r.(*ByzantineReactor); ok { - rr.reactor.Switch.Stop() - } else { - r.(*ConsensusReactor).Switch.Stop() - } - } - }() eventChans := make([]chan interface{}, N) - eventLogger := logger.With("module", "events") + reactors := make([]p2p.Reactor, N) for i := 0; i < N; i++ { if i == 0 { css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator) @@ -65,17 +56,19 @@ func TestByzantine(t *testing.T) { css[i].doPrevote = func(height, round int) {} } - eventSwitch := events.NewEventSwitch() - eventSwitch.SetLogger(eventLogger.With("validator", i)) - _, err := eventSwitch.Start() - if err != nil { - t.Fatalf("Failed to start switch: %v", err) - } - eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) + eventBus := types.NewEventBus() + eventBus.SetLogger(logger.With("module", "events", "validator", i)) + _, err := eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + + eventChans[i] = make(chan interface{}, 1) + err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i]) + require.NoError(t, err) conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states conR.SetLogger(logger.With("validator", i)) - conR.SetEventSwitch(eventSwitch) + conR.SetEventBus(eventBus) var conRI p2p.Reactor conRI = conR @@ -86,6 +79,16 @@ func TestByzantine(t *testing.T) { reactors[i] = conRI } + defer func() { + for _, r := range reactors { + if rr, ok := r.(*ByzantineReactor); ok { + rr.reactor.Switch.Stop() + } else { + r.(*ConsensusReactor).Switch.Stop() + } + } + }() + p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { // ignore new switch s, we already made ours switches[i].AddReactor("CONSENSUS", reactors[i]) diff --git a/consensus/common.go b/consensus/common.go deleted file mode 100644 index 1e16c4dab..000000000 --- a/consensus/common.go +++ /dev/null @@ -1,35 +0,0 @@ -package consensus - -import ( - "github.com/tendermint/tendermint/types" -) - -// XXX: WARNING: these functions can halt the consensus as firing events is synchronous. -// Make sure to read off the channels, and in the case of subscribeToEventRespond, to write back on it - -// NOTE: if chanCap=0, this blocks on the event being consumed -func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { - // listen for event - ch := make(chan interface{}, chanCap) - types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) { - ch <- data - }) - return ch -} - -// NOTE: this blocks on receiving a response after the event is consumed -func subscribeToEventRespond(evsw types.EventSwitch, receiver, eventID string) chan interface{} { - // listen for event - ch := make(chan interface{}) - types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) { - ch <- data - <-ch - }) - return ch -} - -func discardFromChan(ch chan interface{}, n int) { - for i := 0; i < n; i++ { - <-ch - } -} diff --git a/consensus/common_test.go b/consensus/common_test.go index 9810024d4..50793e651 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -2,6 +2,7 @@ package consensus import ( "bytes" + "context" "fmt" "io/ioutil" "os" @@ -30,6 +31,10 @@ import ( "github.com/go-kit/kit/log/term" ) +const ( + testSubscriber = "test-client" +) + // genesis, chain_id, priv_val var config *cfg.Config // NOTE: must be reset for each _test.go file var ensureTimeout = time.Second * 2 @@ -208,11 +213,14 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo // genesis func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { - voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) + voteCh0 := make(chan interface{}) + err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote, voteCh0) + if err != nil { + panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)) + } voteCh := make(chan interface{}) go func() { - for { - v := <-voteCh0 + for v := range voteCh0 { vote := v.(types.TMEventData).Unwrap().(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Vote.ValidatorAddress) { @@ -231,8 +239,12 @@ func newConsensusState(state *sm.State, pv types.PrivValidator, app abci.Applica } func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { - // Get BlockStore blockDB := dbm.NewMemDB() + return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) +} + +func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState { + // Get BlockStore blockStore := bc.NewBlockStore(blockDB) // one for mempool, one for consensus @@ -252,10 +264,11 @@ func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv typ cs.SetLogger(log.TestingLogger()) cs.SetPrivValidator(pv) - evsw := types.NewEventSwitch() - evsw.SetLogger(log.TestingLogger().With("module", "events")) - cs.SetEventSwitch(evsw) - evsw.Start() + eventBus := types.NewEventBus() + eventBus.SetLogger(log.TestingLogger().With("module", "events")) + eventBus.Start() + cs.SetEventBus(eventBus) + return cs } @@ -267,13 +280,13 @@ func loadPrivValidator(config *cfg.Config) *types.PrivValidatorFS { return privValidator } -func fixedConsensusStateDummy() *ConsensusState { +func fixedConsensusStateDummy(config *cfg.Config, logger log.Logger) *ConsensusState { stateDB := dbm.NewMemDB() state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) - state.SetLogger(log.TestingLogger().With("module", "state")) + state.SetLogger(logger.With("module", "state")) privValidator := loadPrivValidator(config) cs := newConsensusState(state, privValidator, dummy.NewDummyApplication()) - cs.SetLogger(log.TestingLogger()) + cs.SetLogger(logger) return cs } @@ -297,7 +310,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { //------------------------------------------------------------------------------- -func ensureNoNewStep(stepCh chan interface{}) { +func ensureNoNewStep(stepCh <-chan interface{}) { timer := time.NewTimer(ensureTimeout) select { case <-timer.C: @@ -307,7 +320,7 @@ func ensureNoNewStep(stepCh chan interface{}) { } } -func ensureNewStep(stepCh chan interface{}) { +func ensureNewStep(stepCh <-chan interface{}) { timer := time.NewTimer(ensureTimeout) select { case <-timer.C: @@ -362,10 +375,11 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application) []*ConsensusState { genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower)) css := make([]*ConsensusState, nPeers) + logger := consensusLogger() for i := 0; i < nPeers; i++ { db := dbm.NewMemDB() // each state needs its own db state, _ := sm.MakeGenesisState(db, genDoc) - state.SetLogger(log.TestingLogger().With("module", "state")) + state.SetLogger(logger.With("module", "state", "validator", i)) state.Save() thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal @@ -382,7 +396,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF app.InitChain(abci.RequestInitChain{Validators: vals}) css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, app) - css[i].SetLogger(log.TestingLogger()) + css[i].SetLogger(logger.With("validator", i)) css[i].SetTimeoutTicker(tickerFunc()) } return css diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 3a430ef26..a92ab473c 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -7,7 +7,6 @@ import ( abci "github.com/tendermint/abci/types" "github.com/tendermint/tendermint/types" - cmn "github.com/tendermint/tmlibs/common" ) @@ -22,7 +21,7 @@ func TestNoProgressUntilTxsAvailable(t *testing.T) { cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) cs.mempool.EnableTxsAvailable() height, round := cs.Height, cs.Round - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) startTestRound(cs, height, round) ensureNewStep(newBlockCh) // first block gets committed @@ -41,7 +40,7 @@ func TestProgressAfterCreateEmptyBlocksInterval(t *testing.T) { cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) cs.mempool.EnableTxsAvailable() height, round := cs.Height, cs.Round - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) startTestRound(cs, height, round) ensureNewStep(newBlockCh) // first block gets committed @@ -56,9 +55,9 @@ func TestProgressInHigherRound(t *testing.T) { cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) cs.mempool.EnableTxsAvailable() height, round := cs.Height, cs.Round - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1) - timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1) + newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) + newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound) + timeoutCh := subscribe(cs.eventBus, types.EventQueryTimeoutPropose) cs.setProposal = func(proposal *types.Proposal) error { if cs.Height == 2 && cs.Round == 0 { // dont set the proposal in round 0 so we timeout and @@ -92,11 +91,10 @@ func deliverTxsRange(cs *ConsensusState, start, end int) { } func TestTxConcurrentWithCommit(t *testing.T) { - state, privVals := randGenesisState(1, false, 10) cs := newConsensusState(state, privVals[0], NewCounterApplication()) height, round := cs.Height, cs.Round - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) NTxs := 10000 go deliverTxsRange(cs, 0, NTxs) diff --git a/consensus/reactor.go b/consensus/reactor.go index e68499928..88f3e328e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -2,12 +2,14 @@ package consensus import ( "bytes" - "errors" + "context" "fmt" "reflect" "sync" "time" + "github.com/pkg/errors" + wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -34,10 +36,10 @@ type ConsensusReactor struct { p2p.BaseReactor // BaseService + p2p.Switch conS *ConsensusState - evsw types.EventSwitch mtx sync.RWMutex fastSync bool + eventBus *types.EventBus } // NewConsensusReactor returns a new ConsensusReactor with the given consensusState. @@ -55,9 +57,10 @@ func (conR *ConsensusReactor) OnStart() error { conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) conR.BaseReactor.OnStart() - // callbacks for broadcasting new steps and votes to peers - // upon their respective events (ie. uses evsw) - conR.registerEventCallbacks() + err := conR.broadcastNewRoundStepsAndVotes() + if err != nil { + return err + } if !conR.FastSync() { _, err := conR.conS.Start() @@ -65,6 +68,7 @@ func (conR *ConsensusReactor) OnStart() error { return err } } + return nil } @@ -306,10 +310,10 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) } } -// SetEventSwitch implements events.Eventable -func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { - conR.evsw = evsw - conR.conS.SetEventSwitch(evsw) +// SetEventBus sets event bus. +func (conR *ConsensusReactor) SetEventBus(b *types.EventBus) { + conR.eventBus = b + conR.conS.SetEventBus(b) } // FastSync returns whether the consensus reactor is in fast-sync mode. @@ -321,24 +325,47 @@ func (conR *ConsensusReactor) FastSync() bool { //-------------------------------------- -// Listens for new steps and votes, -// broadcasting the result to peers -func (conR *ConsensusReactor) registerEventCallbacks() { +// broadcastNewRoundStepsAndVotes subscribes for new round steps and votes +// using the event bus and broadcasts events to peers upon receiving them. +func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error { + subscriber := "consensus-reactor" + ctx := context.Background() - types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) { - rs := data.Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) - conR.broadcastNewRoundStep(rs) - }) + // new round steps + stepsCh := make(chan interface{}) + err := conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, stepsCh) + if err != nil { + return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryNewRoundStep) + } - types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) { - edv := data.Unwrap().(types.EventDataVote) - conR.broadcastHasVoteMessage(edv.Vote) - }) + // votes + votesCh := make(chan interface{}) + err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryVote, votesCh) + if err != nil { + return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote) + } - types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) { - heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat) - conR.broadcastProposalHeartbeatMessage(heartbeat) - }) + go func() { + for { + select { + case data, ok := <-stepsCh: + if ok { // a receive from a closed channel returns the zero value immediately + edrs := data.(types.TMEventData).Unwrap().(types.EventDataRoundState) + conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState)) + } + case data, ok := <-votesCh: + if ok { + edv := data.(types.TMEventData).Unwrap().(types.EventDataVote) + conR.broadcastHasVoteMessage(edv.Vote) + } + case <-conR.Quit: + conR.eventBus.UnsubscribeAll(ctx, subscriber) + return + } + } + }() + + return nil } func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) { diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index ed8fa87ba..05a422da9 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -1,17 +1,19 @@ package consensus import ( + "context" "fmt" "sync" "testing" "time" "github.com/tendermint/abci/example/dummy" - "github.com/tendermint/tmlibs/events" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" + + "github.com/stretchr/testify/require" ) func init() { @@ -21,27 +23,25 @@ func init() { //---------------------------------------------- // in-process testnets -func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEventRespond bool) ([]*ConsensusReactor, []chan interface{}) { +func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*ConsensusReactor, []chan interface{}, []*types.EventBus) { reactors := make([]*ConsensusReactor, N) eventChans := make([]chan interface{}, N) + eventBuses := make([]*types.EventBus, N) logger := consensusLogger() for i := 0; i < N; i++ { reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states reactors[i].SetLogger(logger.With("validator", i)) - eventSwitch := events.NewEventSwitch() - eventSwitch.SetLogger(logger.With("module", "events", "validator", i)) - _, err := eventSwitch.Start() - if err != nil { - t.Fatalf("Failed to start switch: %v", err) - } + eventBuses[i] = types.NewEventBus() + eventBuses[i].SetLogger(logger.With("module", "events", "validator", i)) + _, err := eventBuses[i].Start() + require.NoError(t, err) - reactors[i].SetEventSwitch(eventSwitch) - if subscribeEventRespond { - eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock()) - } else { - eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) - } + reactors[i].SetEventBus(eventBuses[i]) + + eventChans[i] = make(chan interface{}, 1) + err = eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i]) + require.NoError(t, err) } // make connected switches and start all reactors p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { @@ -56,21 +56,24 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEven s := reactors[i].conS.GetState() reactors[i].SwitchToConsensus(s, 0) } - return reactors, eventChans + return reactors, eventChans, eventBuses } -func stopConsensusNet(reactors []*ConsensusReactor) { +func stopConsensusNet(reactors []*ConsensusReactor, eventBuses []*types.EventBus) { for _, r := range reactors { r.Switch.Stop() } + for _, b := range eventBuses { + b.Stop() + } } // Ensure a testnet makes blocks func TestReactor(t *testing.T) { N := 4 css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) - reactors, eventChans := startConsensusNet(t, css, N, false) - defer stopConsensusNet(reactors) + reactors, eventChans, eventBuses := startConsensusNet(t, css, N) + defer stopConsensusNet(reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { <-eventChans[j] @@ -85,11 +88,14 @@ func TestReactorProposalHeartbeats(t *testing.T) { func(c *cfg.Config) { c.Consensus.CreateEmptyBlocks = false }) - reactors, eventChans := startConsensusNet(t, css, N, false) - defer stopConsensusNet(reactors) + reactors, eventChans, eventBuses := startConsensusNet(t, css, N) + defer stopConsensusNet(reactors, eventBuses) heartbeatChans := make([]chan interface{}, N) + var err error for i := 0; i < N; i++ { - heartbeatChans[i] = subscribeToEvent(css[i].evsw, "tester", types.EventStringProposalHeartbeat(), 1) + heartbeatChans[i] = make(chan interface{}, 1) + err = eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryProposalHeartbeat, heartbeatChans[i]) + require.NoError(t, err) } // wait till everyone sends a proposal heartbeat timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { @@ -113,8 +119,8 @@ func TestReactorProposalHeartbeats(t *testing.T) { func TestVotingPowerChange(t *testing.T) { nVals := 4 css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy) - reactors, eventChans := startConsensusNet(t, css, nVals, true) - defer stopConsensusNet(reactors) + reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals) + defer stopConsensusNet(reactors, eventBuses) // map of active validators activeVals := make(map[string]struct{}) @@ -125,7 +131,6 @@ func TestVotingPowerChange(t *testing.T) { // wait till everyone makes block 1 timeoutWaitGroup(t, nVals, func(wg *sync.WaitGroup, j int) { <-eventChans[j] - eventChans[j] <- struct{}{} wg.Done() }, css) @@ -174,8 +179,9 @@ func TestValidatorSetChanges(t *testing.T) { nPeers := 7 nVals := 4 css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy) - reactors, eventChans := startConsensusNet(t, css, nPeers, true) - defer stopConsensusNet(reactors) + + reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers) + defer stopConsensusNet(reactors, eventBuses) // map of active validators activeVals := make(map[string]struct{}) @@ -186,7 +192,6 @@ func TestValidatorSetChanges(t *testing.T) { // wait till everyone makes block 1 timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) { <-eventChans[j] - eventChans[j] <- struct{}{} wg.Done() }, css) @@ -214,7 +219,7 @@ func TestValidatorSetChanges(t *testing.T) { // wait till everyone makes block 5 // it includes the commit for block 4, which should have the updated validator set - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) //--------------------------------------------------------------------------- t.Log("---------------------------- Testing changing the voting power of one validator") @@ -226,7 +231,7 @@ func TestValidatorSetChanges(t *testing.T) { waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower()) @@ -246,7 +251,7 @@ func TestValidatorSetChanges(t *testing.T) { waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) activeVals[string(newValidatorPubKey2.Address())] = struct{}{} activeVals[string(newValidatorPubKey3.Address())] = struct{}{} - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) //--------------------------------------------------------------------------- t.Log("---------------------------- Testing removing two validators at once") @@ -259,7 +264,7 @@ func TestValidatorSetChanges(t *testing.T) { waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) delete(activeVals, string(newValidatorPubKey2.Address())) delete(activeVals, string(newValidatorPubKey3.Address())) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) } // Check we can make blocks with skip_timeout_commit=false @@ -271,8 +276,8 @@ func TestReactorWithTimeoutCommit(t *testing.T) { css[i].config.SkipTimeoutCommit = false } - reactors, eventChans := startConsensusNet(t, css, N-1, false) - defer stopConsensusNet(reactors) + reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1) + defer stopConsensusNet(reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) { @@ -285,16 +290,40 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{} timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { newBlockI := <-eventChans[j] newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block - t.Logf("[WARN] Got block height=%v validator=%v", newBlock.Height, j) + t.Logf("Got block height=%v validator=%v", newBlock.Height, j) err := validateBlock(newBlock, activeVals) if err != nil { t.Fatal(err) } for _, tx := range txs { - css[j].mempool.CheckTx(tx, nil) + if err = css[j].mempool.CheckTx(tx, nil); err != nil { + t.Fatal(err) + } + } + wg.Done() + }, css) +} + +func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { + timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { + var newBlock *types.Block + LOOP: + for { + newBlockI := <-eventChans[j] + newBlock = newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block + if newBlock.LastCommit.Size() == len(updatedVals) { + t.Logf("Block with new validators height=%v validator=%v", newBlock.Height, j) + break LOOP + } else { + t.Logf("Block with no new validators height=%v validator=%v. Skipping...", newBlock.Height, j) + } + } + + err := validateBlock(newBlock, updatedVals) + if err != nil { + t.Fatal(err) } - eventChans[j] <- struct{}{} wg.Done() }, css) } diff --git a/consensus/replay.go b/consensus/replay.go index d3c5cd5d1..49aa5e7fe 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -91,7 +91,6 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan // replay only those messages since the last block. // timeoutRoutine should run concurrently to read off tickChan func (cs *ConsensusState) catchupReplay(csHeight int) error { - // set replayMode cs.replayMode = true defer func() { cs.replayMode = false }() @@ -104,7 +103,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error { gr.Close() } if found { - return errors.New(cmn.Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight)) + return fmt.Errorf("WAL should not contain #ENDHEIGHT %d.", csHeight) } // Search for last height marker @@ -334,11 +333,10 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} - var eventCache types.Fireable // nil block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) - if err := h.state.ApplyBlock(eventCache, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil { + if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil { return nil, err } diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 24df20fb3..3bdd349e3 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -2,13 +2,15 @@ package consensus import ( "bufio" - "errors" + "context" "fmt" "io" "os" "strconv" "strings" + "github.com/pkg/errors" + bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/proxy" @@ -42,7 +44,14 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { cs.startForReplay() // ensure all new step events are regenerated as expected - newStepCh := subscribeToEvent(cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1) + newStepCh := make(chan interface{}, 1) + + ctx := context.Background() + err := cs.eventBus.Subscribe(ctx, "replay-file", types.EventQueryNewRoundStep, newStepCh) + if err != nil { + return errors.Errorf("failed to subscribe replay-file to %v", types.EventQueryNewRoundStep) + } + defer cs.eventBus.Unsubscribe(ctx, "replay-file", types.EventQueryNewRoundStep) // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0666) @@ -106,12 +115,11 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm. // go back count steps by resetting the state and running (pb.count - count) steps func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { - pb.cs.Stop() pb.cs.Wait() newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) - newCS.SetEventSwitch(pb.cs.evsw) + newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() pb.fp.Close() @@ -196,8 +204,16 @@ func (pb *playback) replayConsoleLoop() int { // NOTE: "back" is not supported in the state machine design, // so we restart and replay up to + ctx := context.Background() // ensure all new step events are regenerated as expected - newStepCh := subscribeToEvent(pb.cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1) + newStepCh := make(chan interface{}, 1) + + err := pb.cs.eventBus.Subscribe(ctx, "replay-file", types.EventQueryNewRoundStep, newStepCh) + if err != nil { + cmn.Exit(fmt.Sprintf("failed to subscribe replay-file to %v", types.EventQueryNewRoundStep)) + } + defer pb.cs.eventBus.Unsubscribe(ctx, "replay-file", types.EventQueryNewRoundStep) + if len(tokens) == 1 { pb.replayReset(1, newStepCh) } else { @@ -270,14 +286,13 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err)) } - // Make event switch - eventSwitch := types.NewEventSwitch() - if _, err := eventSwitch.Start(); err != nil { - cmn.Exit(cmn.Fmt("Failed to start event switch: %v", err)) + eventBus := types.NewEventBus() + if _, err := eventBus.Start(); err != nil { + cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) } consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(), blockStore, types.MockMempool{}) - consensusState.SetEventSwitch(eventSwitch) + consensusState.SetEventBus(eventBus) return consensusState } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 7d882dc1b..a5d3f0883 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -2,19 +2,24 @@ package consensus import ( "bytes" + "context" "errors" "fmt" "io" "io/ioutil" "os" "path" + "runtime" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/tendermint/abci/example/dummy" abci "github.com/tendermint/abci/types" crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" + auto "github.com/tendermint/tmlibs/autofile" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" @@ -25,8 +30,10 @@ import ( "github.com/tendermint/tmlibs/log" ) +var consensusReplayConfig *cfg.Config + func init() { - config = ResetConfig("consensus_replay_test") + consensusReplayConfig = ResetConfig("consensus_replay_test") } // These tests ensure we can always recover from failure at any part of the consensus process. @@ -39,8 +46,7 @@ func init() { // NOTE: Files in this dir are generated by running the `build.sh` therein. // It's a simple way to generate wals for a single block, or multiple blocks, with random transactions, -// and different part sizes. The output is not deterministic, and the stepChanges may need to be adjusted -// after running it (eg. sometimes small_block2 will have 5 block parts, sometimes 6). +// and different part sizes. The output is not deterministic. // It should only have to be re-run if there is some breaking change to the consensus data structures (eg. blocks, votes) // or to the behaviour of the app (eg. computes app hash differently) var data_dir = path.Join(cmn.GoPath(), "src/github.com/tendermint/tendermint/consensus", "test_data") @@ -52,230 +58,209 @@ var data_dir = path.Join(cmn.GoPath(), "src/github.com/tendermint/tendermint/con // and which ones we need the wal for - then we'd also be able to only flush the // wal writer when we need to, instead of with every message. -// the priv validator changes step at these lines for a block with 1 val and 1 part -var baseStepChanges = []int{3, 6, 8} - -// test recovery from each line in each testCase -var testCases = []*testCase{ - newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part) - newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part - newTestCase("small_block2", []int{3, 12, 14}), // small block with txs across 6 smaller block parts -} - -type testCase struct { - name string - log []byte //full cs wal - stepMap map[int]int8 // map lines of log to privval step - - proposeLine int - prevoteLine int - precommitLine int -} - -func newTestCase(name string, stepChanges []int) *testCase { - if len(stepChanges) != 3 { - panic(cmn.Fmt("a full wal has 3 step changes! Got array %v", stepChanges)) - } - return &testCase{ - name: name, - log: readWAL(path.Join(data_dir, name+".cswal")), - stepMap: newMapFromChanges(stepChanges), - - proposeLine: stepChanges[0], - prevoteLine: stepChanges[1], - precommitLine: stepChanges[2], - } -} - -func newMapFromChanges(changes []int) map[int]int8 { - changes = append(changes, changes[2]+1) // so we add the last step change to the map - m := make(map[int]int8) - var count int - for changeNum, nextChange := range changes { - for ; count < nextChange; count++ { - m[count] = int8(changeNum) - } - } - return m -} - -func readWAL(p string) []byte { - b, err := ioutil.ReadFile(p) - if err != nil { - panic(err) - } - return b -} - -func writeWAL(walMsgs []byte) string { - walFile, err := ioutil.TempFile("", "wal") - if err != nil { - panic(fmt.Errorf("failed to create temp WAL file: %v", err)) - } - _, err = walFile.Write(walMsgs) - if err != nil { - panic(fmt.Errorf("failed to write to temp WAL file: %v", err)) - } - if err := walFile.Close(); err != nil { - panic(fmt.Errorf("failed to close temp WAL file: %v", err)) - } - return walFile.Name() -} - -func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) { - after := time.After(time.Second * 10) +func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int, blockDB dbm.DB, stateDB dbm.DB) { + logger := log.TestingLogger() + state, _ := sm.GetState(stateDB, consensusReplayConfig.GenesisFile()) + state.SetLogger(logger.With("module", "state")) + privValidator := loadPrivValidator(consensusReplayConfig) + cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) + cs.SetLogger(logger) + + bytes, _ := ioutil.ReadFile(cs.config.WalFile()) + // fmt.Printf("====== WAL: \n\r%s\n", bytes) + t.Logf("====== WAL: \n\r%s\n", bytes) + + _, err := cs.Start() + require.NoError(t, err) + defer func() { + cs.Stop() + }() + + // This is just a signal that we haven't halted; its not something contained + // in the WAL itself. Assuming the consensus state is running, replay of any + // WAL, including the empty one, should eventually be followed by a new + // block, or else something is wrong. + newBlockCh := make(chan interface{}, 1) + err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh) + require.NoError(t, err) select { case <-newBlockCh: - case <-after: - panic(cmn.Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i)) + case <-time.After(10 * time.Second): + t.Fatalf("Timed out waiting for new block (see trace above)") } } -func runReplayTest(t *testing.T, cs *ConsensusState, walFile string, newBlockCh chan interface{}, - thisCase *testCase, i int) { - - cs.config.SetWalFile(walFile) - started, err := cs.Start() - if err != nil { - t.Fatalf("Cannot start consensus: %v", err) - } - if !started { - t.Error("Consensus did not start") - } - // Wait to make a new block. - // This is just a signal that we haven't halted; its not something contained in the WAL itself. - // Assuming the consensus state is running, replay of any WAL, including the empty one, - // should eventually be followed by a new block, or else something is wrong - waitForBlock(newBlockCh, thisCase, i) - cs.evsw.Stop() - cs.Stop() -LOOP: +func sendTxs(cs *ConsensusState, ctx context.Context) { + i := 0 for { select { - case <-newBlockCh: + case <-ctx.Done(): + return default: - break LOOP + cs.mempool.CheckTx([]byte{byte(i)}, nil) + i++ } } - cs.Wait() } -func toPV(pv types.PrivValidator) *types.PrivValidatorFS { - return pv.(*types.PrivValidatorFS) +// TestWALCrash uses crashing WAL to test we can recover from any WAL failure. +func TestWALCrash(t *testing.T) { + testCases := []struct { + name string + initFn func(*ConsensusState, context.Context) + heightToStop uint64 + }{ + {"empty block", + func(cs *ConsensusState, ctx context.Context) {}, + 1}, + {"block with a smaller part size", + func(cs *ConsensusState, ctx context.Context) { + // XXX: is there a better way to change BlockPartSizeBytes? + params := cs.state.Params + params.BlockPartSizeBytes = 512 + cs.state.Params = params + sendTxs(cs, ctx) + }, + 1}, + {"many non-empty blocks", + sendTxs, + 3}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + crashWALandCheckLiveness(t, tc.initFn, tc.heightToStop) + }) + } } -func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, []byte, string) { - t.Log("-------------------------------------") - t.Logf("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter) +func crashWALandCheckLiveness(t *testing.T, initFn func(*ConsensusState, context.Context), heightToStop uint64) { + walPaniced := make(chan error) + crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop} - lineStep := nLines - if crashAfter { - lineStep -= 1 - } + i := 1 +LOOP: + for { + // fmt.Printf("====== LOOP %d\n", i) + t.Logf("====== LOOP %d\n", i) + + // create consensus state from a clean slate + logger := log.NewNopLogger() + stateDB := dbm.NewMemDB() + state, _ := sm.MakeGenesisStateFromFile(stateDB, consensusReplayConfig.GenesisFile()) + state.SetLogger(logger.With("module", "state")) + privValidator := loadPrivValidator(consensusReplayConfig) + blockDB := dbm.NewMemDB() + cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) + cs.SetLogger(logger) + + // start sending transactions + ctx, cancel := context.WithCancel(context.Background()) + go initFn(cs, ctx) + + // clean up WAL file from the previous iteration + walFile := cs.config.WalFile() + os.Remove(walFile) + + // set crashing WAL + csWal, err := cs.OpenWAL(walFile) + require.NoError(t, err) + crashingWal.next = csWal + // reset the message counter + crashingWal.msgIndex = 1 + cs.wal = crashingWal + + // start consensus state + _, err = cs.Start() + require.NoError(t, err) + + i++ - split := bytes.Split(thisCase.log, walSeparator) - lastMsg := split[nLines] + select { + case err := <-walPaniced: + t.Logf("WAL paniced: %v", err) - // we write those lines up to (not including) one with the signature - b := bytes.Join(split[:nLines], walSeparator) - b = append(b, walSeparator...) - walFile := writeWAL(b) + // make sure we can make blocks after a crash + startNewConsensusStateAndWaitForBlock(t, cs.Height, blockDB, stateDB) - cs := fixedConsensusStateDummy() + // stop consensus state and transactions sender (initFn) + cs.Stop() + cancel() - // set the last step according to when we crashed vs the wal - toPV(cs.privValidator).LastHeight = 1 // first block - toPV(cs.privValidator).LastStep = thisCase.stepMap[lineStep] + // if we reached the required height, exit + if _, ok := err.(ReachedHeightToStopError); ok { + break LOOP + } + case <-time.After(10 * time.Second): + t.Fatal("WAL did not panic for 10 seconds (check the log)") + } + } +} - t.Logf("[WARN] setupReplayTest LastStep=%v", toPV(cs.privValidator).LastStep) +// crashingWAL is a WAL which crashes or rather simulates a crash during Save +// (before and after). It remembers a message for which we last panicked +// (lastPanicedForMsgIndex), so we don't panic for it in subsequent iterations. +type crashingWAL struct { + next WAL + panicCh chan error + heightToStop uint64 - newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + msgIndex int // current message index + lastPanicedForMsgIndex int // last message for which we panicked +} - return cs, newBlockCh, lastMsg, walFile +// WALWriteError indicates a WAL crash. +type WALWriteError struct { + msg string } -func readTimedWALMessage(t *testing.T, rawMsg []byte) TimedWALMessage { - b := bytes.NewBuffer(rawMsg) - // because rawMsg does not contain a separator and WALDecoder#Decode expects it - _, err := b.Write(walSeparator) - if err != nil { - t.Fatal(err) - } - dec := NewWALDecoder(b) - msg, err := dec.Decode() - if err != nil { - t.Fatalf("Error reading json data: %v", err) - } - return *msg +func (e WALWriteError) Error() string { + return e.msg } -//----------------------------------------------- -// Test the log at every iteration, and set the privVal last step -// as if the log was written after signing, before the crash - -func TestWALCrashAfterWrite(t *testing.T) { - for _, thisCase := range testCases { - splitSize := bytes.Count(thisCase.log, walSeparator) - for i := 0; i < splitSize-1; i++ { - t.Run(fmt.Sprintf("%s:%d", thisCase.name, i), func(t *testing.T) { - cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true) - cs.config.TimeoutPropose = 100 - runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1) - // cleanup - os.Remove(walFile) - }) - } - } +// ReachedHeightToStopError indicates we've reached the required consensus +// height and may exit. +type ReachedHeightToStopError struct { + height uint64 } -//----------------------------------------------- -// Test the log as if we crashed after signing but before writing. -// This relies on privValidator.LastSignature being set - -func TestWALCrashBeforeWritePropose(t *testing.T) { - for _, thisCase := range testCases { - lineNum := thisCase.proposeLine - t.Run(fmt.Sprintf("%s:%d", thisCase.name, lineNum), func(t *testing.T) { - // setup replay test where last message is a proposal - cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false) - cs.config.TimeoutPropose = 100 - msg := readTimedWALMessage(t, proposalMsg) - proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) - // Set LastSig - toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) - toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature - runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum) - // cleanup - os.Remove(walFile) - }) - } +func (e ReachedHeightToStopError) Error() string { + return fmt.Sprintf("reached height to stop %d", e.height) } -func TestWALCrashBeforeWritePrevote(t *testing.T) { - for _, thisCase := range testCases { - testReplayCrashBeforeWriteVote(t, thisCase, thisCase.prevoteLine, types.EventStringCompleteProposal()) +// Save simulate WAL's crashing by sending an error to the panicCh and then +// exiting the cs.receiveRoutine. +func (w *crashingWAL) Save(m WALMessage) { + if endMsg, ok := m.(EndHeightMessage); ok { + if endMsg.Height == w.heightToStop { + w.panicCh <- ReachedHeightToStopError{endMsg.Height} + runtime.Goexit() + } else { + w.next.Save(m) + } + return } -} -func TestWALCrashBeforeWritePrecommit(t *testing.T) { - for _, thisCase := range testCases { - testReplayCrashBeforeWriteVote(t, thisCase, thisCase.precommitLine, types.EventStringPolka()) + if w.msgIndex > w.lastPanicedForMsgIndex { + w.lastPanicedForMsgIndex = w.msgIndex + _, file, line, _ := runtime.Caller(1) + w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)} + runtime.Goexit() + } else { + w.msgIndex++ + w.next.Save(m) } } -func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) { - // setup replay test where last message is a vote - cs, newBlockCh, voteMsg, walFile := setupReplayTest(t, thisCase, lineNum, false) - types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) { - msg := readTimedWALMessage(t, voteMsg) - vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) - // Set LastSig - toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) - toPV(cs.privValidator).LastSignature = vote.Vote.Signature - }) - runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum) +func (w *crashingWAL) Group() *auto.Group { return w.next.Group() } +func (w *crashingWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) { + return w.next.SearchForEndHeight(height) } +func (w *crashingWAL) Start() (bool, error) { return w.next.Start() } +func (w *crashingWAL) Stop() bool { return w.next.Stop() } +func (w *crashingWAL) Wait() { w.next.Wait() } + //------------------------------------------------------------------------------------------ // Handshake Tests @@ -320,6 +305,21 @@ func TestHandshakeReplayNone(t *testing.T) { } } +func writeWAL(walMsgs []byte) string { + walFile, err := ioutil.TempFile("", "wal") + if err != nil { + panic(fmt.Errorf("failed to create temp WAL file: %v", err)) + } + _, err = walFile.Write(walMsgs) + if err != nil { + panic(fmt.Errorf("failed to write to temp WAL file: %v", err)) + } + if err := walFile.Close(); err != nil { + panic(fmt.Errorf("failed to close temp WAL file: %v", err)) + } + return walFile.Name() +} + // Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { config := ResetConfig("proxy_test_") @@ -397,7 +397,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { testPartSize := st.Params.BlockPartSizeBytes - err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool) + err := st.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool) if err != nil { panic(err) } @@ -477,7 +477,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B //-------------------------- // utils for making blocks -func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { +func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) { // Search for height marker gr, found, err := wal.SearchForEndHeight(0) if err != nil { diff --git a/consensus/state.go b/consensus/state.go index e5b7641f0..15a036930 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -91,13 +91,13 @@ type ConsensusState struct { internalMsgQueue chan msgInfo timeoutTicker TimeoutTicker - // we use PubSub to trigger msg broadcasts in the reactor, + // we use eventBus to trigger msg broadcasts in the reactor, // and to notify external subscribers, eg. through a websocket - evsw types.EventSwitch + eventBus *types.EventBus // a Write-Ahead Log ensures we can recover from any kind of crash // and helps us avoid signing conflicting votes - wal *WAL + wal WAL replayMode bool // so we don't log signing errors during replay doWALCatchup bool // determines if we even try to do the catchup @@ -125,6 +125,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon timeoutTicker: NewTimeoutTicker(), done: make(chan struct{}), doWALCatchup: true, + wal: nilWAL{}, } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -148,9 +149,9 @@ func (cs *ConsensusState) SetLogger(l log.Logger) { cs.timeoutTicker.SetLogger(l) } -// SetEventSwitch implements events.Eventable -func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) { - cs.evsw = evsw +// SetEventBus sets event bus. +func (cs *ConsensusState) SetEventBus(b *types.EventBus) { + cs.eventBus = b } // String returns a string. @@ -212,11 +213,16 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit { // OnStart implements cmn.Service. // It loads the latest state via the WAL, and starts the timeout and receive routines. func (cs *ConsensusState) OnStart() error { - - walFile := cs.config.WalFile() - if err := cs.OpenWAL(walFile); err != nil { - cs.Logger.Error("Error loading ConsensusState wal", "err", err.Error()) - return err + // we may set the WAL in testing before calling Start, + // so only OpenWAL if its still the nilWAL + if _, ok := cs.wal.(nilWAL); ok { + walFile := cs.config.WalFile() + wal, err := cs.OpenWAL(walFile) + if err != nil { + cs.Logger.Error("Error loading ConsensusState wal", "err", err.Error()) + return err + } + cs.wal = wal } // we need the timeoutRoutine for replay so @@ -260,7 +266,7 @@ func (cs *ConsensusState) OnStop() { cs.timeoutTicker.Stop() // Make BaseService.Wait() wait until cs.wal.Wait() - if cs.wal != nil && cs.IsRunning() { + if cs.IsRunning() { cs.wal.Wait() } } @@ -273,25 +279,22 @@ func (cs *ConsensusState) Wait() { } // OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability -func (cs *ConsensusState) OpenWAL(walFile string) (err error) { - err = cmn.EnsureDir(filepath.Dir(walFile), 0700) +func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) { + err := cmn.EnsureDir(filepath.Dir(walFile), 0700) if err != nil { cs.Logger.Error("Error ensuring ConsensusState wal dir", "err", err.Error()) - return err + return nil, err } - cs.mtx.Lock() - defer cs.mtx.Unlock() wal, err := NewWAL(walFile, cs.config.WalLight) if err != nil { - return err + return nil, err } wal.SetLogger(cs.Logger.With("wal", walFile)) if _, err := wal.Start(); err != nil { - return err + return nil, err } - cs.wal = wal - return nil + return wal, nil } //------------------------------------------------------------ @@ -480,9 +483,9 @@ func (cs *ConsensusState) newStep() { rs := cs.RoundStateEvent() cs.wal.Save(rs) cs.nSteps += 1 - // newStep is called by updateToStep in NewConsensusState before the evsw is set! - if cs.evsw != nil { - types.FireEventNewRoundStep(cs.evsw, rs) + // newStep is called by updateToStep in NewConsensusState before the eventBus is set! + if cs.eventBus != nil { + cs.eventBus.PublishEventNewRoundStep(rs) } } @@ -536,9 +539,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // priv_val tracks LastSig // close wal now that we're done writing to it - if cs.wal != nil { - cs.wal.Stop() - } + cs.wal.Stop() close(cs.done) return @@ -607,13 +608,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { case cstypes.RoundStepNewRound: cs.enterPropose(ti.Height, 0) case cstypes.RoundStepPropose: - types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent()) cs.enterPrevote(ti.Height, ti.Round) case cstypes.RoundStepPrevoteWait: - types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()) cs.enterPrecommit(ti.Height, ti.Round) case cstypes.RoundStepPrecommitWait: - types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()) cs.enterNewRound(ti.Height, ti.Round+1) default: panic(cmn.Fmt("Invalid timeout step: %v", ti.Step)) @@ -673,7 +674,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { } cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping - types.FireEventNewRound(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventNewRound(cs.RoundStateEvent()) // Wait for txs to be available in the mempool // before we enterPropose in round 0. If the last block changed the app hash, @@ -726,8 +727,7 @@ func (cs *ConsensusState) proposalHeartbeat(height, round int) { ValidatorIndex: valIndex, } cs.privValidator.SignHeartbeat(chainID, heartbeat) - heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat} - types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent) + cs.eventBus.PublishEventProposalHeartbeat(types.EventDataProposalHeartbeat{heartbeat}) counter += 1 time.Sleep(proposalHeartbeatIntervalSeconds * time.Second) } @@ -885,7 +885,7 @@ func (cs *ConsensusState) enterPrevote(height int, round int) { // fire event for how we got here if cs.isProposalComplete() { - types.FireEventCompleteProposal(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventCompleteProposal(cs.RoundStateEvent()) } else { // we received +2/3 prevotes for a future round // TODO: catchup event? @@ -987,7 +987,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { } // At this point +2/3 prevoted for a particular block or nil - types.FireEventPolka(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventPolka(cs.RoundStateEvent()) // the latest POLRound should be this round polRound, _ := cs.Votes.POLInfo() @@ -1004,7 +1004,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil - types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()) } cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return @@ -1016,7 +1016,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { if cs.LockedBlock.HashesTo(blockID.Hash) { cs.Logger.Info("enterPrecommit: +2/3 prevoted locked block. Relocking") cs.LockedRound = round - types.FireEventRelock(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventRelock(cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, blockID.Hash, blockID.PartsHeader) return } @@ -1031,7 +1031,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.LockedRound = round cs.LockedBlock = cs.ProposalBlock cs.LockedBlockParts = cs.ProposalBlockParts - types.FireEventLock(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventLock(cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, blockID.Hash, blockID.PartsHeader) return } @@ -1047,7 +1047,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.ProposalBlock = nil cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartsHeader) } - types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) } @@ -1191,21 +1191,19 @@ func (cs *ConsensusState) finalizeCommit(height int) { // WAL replay for blocks with an #ENDHEIGHT // As is, ConsensusState should not be started again // until we successfully call ApplyBlock (ie. here or in Handshake after restart) - if cs.wal != nil { - cs.wal.Save(EndHeightMessage{uint64(height)}) - } + cs.wal.Save(EndHeightMessage{uint64(height)}) fail.Fail() // XXX // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() - eventCache := types.NewEventCache(cs.evsw) + txEventBuffer := types.NewTxEventBuffer(cs.eventBus, block.NumTxs) // Execute and commit the block, update and save the state, and update the mempool. // All calls to the proxyAppConn come here. // NOTE: the block.AppHash wont reflect these txs until the next block - err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + err := stateCopy.ApplyBlock(txEventBuffer, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) if err != nil { cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err) return @@ -1220,9 +1218,12 @@ func (cs *ConsensusState) finalizeCommit(height int) { // * Fire before persisting state, in ApplyBlock // * Fire on start up if we haven't written any new WAL msgs // Both options mean we may fire more than once. Is that fine ? - types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) - types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) - eventCache.Flush() + cs.eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) + cs.eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + err = txEventBuffer.Flush() + if err != nil { + cs.Logger.Error("Failed to flush event buffer", "err", err) + } fail.Fail() // XXX @@ -1357,7 +1358,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, added, err = cs.LastCommit.AddVote(vote) if added { cs.Logger.Info(cmn.Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) - types.FireEventVote(cs.evsw, types.EventDataVote{vote}) + cs.eventBus.PublishEventVote(types.EventDataVote{vote}) // if we can skip timeoutCommit and have all the votes now, if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() { @@ -1375,7 +1376,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, height := cs.Height added, err = cs.Votes.AddVote(vote, peerKey) if added { - types.FireEventVote(cs.evsw, types.EventDataVote{vote}) + cs.eventBus.PublishEventVote(types.EventDataVote{vote}) switch vote.Type { case types.VoteTypePrevote: @@ -1393,7 +1394,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil - types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) + cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()) } } if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { diff --git a/consensus/state_test.go b/consensus/state_test.go index 060e37d4e..290eb0260 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -2,6 +2,7 @@ package consensus import ( "bytes" + "context" "fmt" "testing" "time" @@ -9,6 +10,7 @@ import ( cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" + tmpubsub "github.com/tendermint/tmlibs/pubsub" ) func init() { @@ -56,8 +58,8 @@ func TestProposerSelection0(t *testing.T) { cs1, vss := randConsensusState(4) height, round := cs1.Height, cs1.Round - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) startTestRound(cs1, height, round) @@ -89,7 +91,7 @@ func TestProposerSelection0(t *testing.T) { func TestProposerSelection2(t *testing.T) { cs1, vss := randConsensusState(4) // test needs more work for more than 3 validators - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) // this time we jump in at round 2 incrementRound(vss[1:]...) @@ -121,7 +123,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1) + timeoutCh := subscribe(cs.eventBus, types.EventQueryTimeoutPropose) startTestRound(cs, height, round) @@ -146,8 +148,8 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1) - proposalCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1) + timeoutCh := subscribe(cs.eventBus, types.EventQueryTimeoutPropose) + proposalCh := subscribe(cs.eventBus, types.EventQueryCompleteProposal) cs.enterNewRound(height, round) cs.startRoutines(3) @@ -183,8 +185,8 @@ func TestBadProposal(t *testing.T) { partSize := cs1.state.Params.BlockPartSizeBytes - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) - voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + voteCh := subscribe(cs1.eventBus, types.EventQueryVote) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, vs2) @@ -238,9 +240,9 @@ func TestFullRound1(t *testing.T) { cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 0) - propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1) - newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1) + voteCh := subscribe(cs.eventBus, types.EventQueryVote) + propCh := subscribe(cs.eventBus, types.EventQueryCompleteProposal) + newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound) startTestRound(cs, height, round) @@ -251,8 +253,6 @@ func TestFullRound1(t *testing.T) { propBlockHash := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote - // NOTE: voteChan cap of 0 ensures we can complete this - // before consensus can move to the next height (and cause a race condition) validatePrevote(t, cs, round, vss[0], propBlockHash) <-voteCh // wait for precommit @@ -268,7 +268,7 @@ func TestFullRoundNil(t *testing.T) { cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) + voteCh := subscribe(cs.eventBus, types.EventQueryVote) cs.enterPrevote(height, round) cs.startRoutines(4) @@ -287,8 +287,8 @@ func TestFullRound2(t *testing.T) { vs2 := vss[1] height, round := cs1.Height, cs1.Round - voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) - newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1) + voteCh := subscribe(cs1.eventBus, types.EventQueryVote) + newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock) // start round and wait for propose and prevote startTestRound(cs1, height, round) @@ -330,11 +330,11 @@ func TestLockNoPOL(t *testing.T) { partSize := cs1.state.Params.BlockPartSizeBytes - timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) - voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + voteCh := subscribe(cs1.eventBus, types.EventQueryVote) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) /* Round1 (cs1, B) // B B // B B2 @@ -496,12 +496,12 @@ func TestLockPOLRelock(t *testing.T) { partSize := cs1.state.Params.BlockPartSizeBytes - timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) - voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) - newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlockHeader(), 1) + timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + voteCh := subscribe(cs1.eventBus, types.EventQueryVote) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlockHeader) // everything done from perspective of cs1 @@ -609,11 +609,11 @@ func TestLockPOLUnlock(t *testing.T) { partSize := cs1.state.Params.BlockPartSizeBytes - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) - timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) - unlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringUnlock(), 1) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + unlockCh := subscribe(cs1.eventBus, types.EventQueryUnlock) voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // everything done from perspective of cs1 @@ -704,10 +704,10 @@ func TestLockPOLSafety1(t *testing.T) { partSize := cs1.state.Params.BlockPartSizeBytes - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) - timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // start round and wait for propose and prevote @@ -802,7 +802,7 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) - newStepCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRoundStep(), 1) + newStepCh := subscribe(cs1.eventBus, types.EventQueryNewRoundStep) // add prevotes from the earlier round addVotes(cs1, prevotes...) @@ -825,11 +825,11 @@ func TestLockPOLSafety2(t *testing.T) { partSize := cs1.state.Params.BlockPartSizeBytes - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) - timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) - unlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringUnlock(), 1) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + unlockCh := subscribe(cs1.eventBus, types.EventQueryUnlock) voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // the block for R0: gets polkad but we miss it @@ -919,9 +919,9 @@ func TestSlashingPrevotes(t *testing.T) { vs2 := vss[1] - proposalCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringCompleteProposal() , 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringTimeoutWait() , 1) - newRoundCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringNewRound() , 1) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // start round and wait for propose and prevote @@ -954,9 +954,9 @@ func TestSlashingPrecommits(t *testing.T) { vs2 := vss[1] - proposalCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringCompleteProposal() , 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringTimeoutWait() , 1) - newRoundCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringNewRound() , 1) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // start round and wait for propose and prevote @@ -1000,10 +1000,10 @@ func TestHalt1(t *testing.T) { partSize := cs1.state.Params.BlockPartSizeBytes - proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) - timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) - newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) - newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1) + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock) voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // start round and wait for propose and prevote @@ -1057,3 +1057,20 @@ func TestHalt1(t *testing.T) { panic("expected height to increment") } } + +// subscribe subscribes test client to the given query and returns a channel with cap = 1. +func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} { + out := make(chan interface{}, 1) + err := eventBus.Subscribe(context.Background(), testSubscriber, q, out) + if err != nil { + panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q)) + } + return out +} + +// discardFromChan reads n values from the channel. +func discardFromChan(ch <-chan interface{}, n int) { + for i := 0; i < n; i++ { + <-ch + } +} diff --git a/consensus/wal.go b/consensus/wal.go index 80f4b809e..5ae02e4e7 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -45,11 +45,22 @@ var _ = wire.RegisterInterface( //-------------------------------------------------------- // Simple write-ahead logger +// WAL is an interface for any write-ahead logger. +type WAL interface { + Save(WALMessage) + Group() *auto.Group + SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) + + Start() (bool, error) + Stop() bool + Wait() +} + // Write ahead logger writes msgs to disk before they are processed. // Can be used for crash-recovery and deterministic replay // TODO: currently the wal is overwritten during replay catchup // give it a mode so it's either reading or appending - must read to end to start appending again -type WAL struct { +type baseWAL struct { cmn.BaseService group *auto.Group @@ -58,21 +69,25 @@ type WAL struct { enc *WALEncoder } -func NewWAL(walFile string, light bool) (*WAL, error) { +func NewWAL(walFile string, light bool) (*baseWAL, error) { group, err := auto.OpenGroup(walFile) if err != nil { return nil, err } - wal := &WAL{ + wal := &baseWAL{ group: group, light: light, enc: NewWALEncoder(group), } - wal.BaseService = *cmn.NewBaseService(nil, "WAL", wal) + wal.BaseService = *cmn.NewBaseService(nil, "baseWAL", wal) return wal, nil } -func (wal *WAL) OnStart() error { +func (wal *baseWAL) Group() *auto.Group { + return wal.group +} + +func (wal *baseWAL) OnStart() error { size, err := wal.group.Head.Size() if err != nil { return err @@ -83,13 +98,13 @@ func (wal *WAL) OnStart() error { return err } -func (wal *WAL) OnStop() { +func (wal *baseWAL) OnStop() { wal.BaseService.OnStop() wal.group.Stop() } // called in newStep and for each pass in receiveRoutine -func (wal *WAL) Save(msg WALMessage) { +func (wal *baseWAL) Save(msg WALMessage) { if wal == nil { return } @@ -119,7 +134,7 @@ func (wal *WAL) Save(msg WALMessage) { // Group reader will be nil if found equals false. // // CONTRACT: caller must close group reader. -func (wal *WAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) { +func (wal *baseWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) { var msg *TimedWALMessage // NOTE: starting from the last file in the group because we're usually @@ -277,3 +292,14 @@ func readSeparator(r io.Reader) error { } return nil } + +type nilWAL struct{} + +func (nilWAL) Save(m WALMessage) {} +func (nilWAL) Group() *auto.Group { return nil } +func (nilWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) { + return nil, false, nil +} +func (nilWAL) Start() (bool, error) { return true, nil } +func (nilWAL) Stop() bool { return true } +func (nilWAL) Wait() {} diff --git a/glide.lock b/glide.lock index cd105b3c1..13127b076 100644 --- a/glide.lock +++ b/glide.lock @@ -134,6 +134,8 @@ imports: - flowrate - log - merkle + - pubsub + - pubsub/query - test - name: golang.org/x/crypto version: 2509b142fb2b797aa7587dad548f113b2c0f20ce diff --git a/glide.yaml b/glide.yaml index 4c1f7e21f..a305f0b79 100644 --- a/glide.yaml +++ b/glide.yaml @@ -45,6 +45,7 @@ import: - flowrate - log - merkle + - pubsub - package: golang.org/x/crypto subpackages: - nacl/box diff --git a/mempool/reactor.go b/mempool/reactor.go index 87bac5d92..6a8765200 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -28,7 +28,6 @@ type MempoolReactor struct { p2p.BaseReactor config *cfg.MempoolConfig Mempool *Mempool - evsw types.EventSwitch } // NewMempoolReactor returns a new MempoolReactor with the given config and mempool. @@ -150,11 +149,6 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { } } -// SetEventSwitch implements events.Eventable. -func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) { - memR.evsw = evsw -} - //----------------------------------------------------------------------------- // Messages diff --git a/node/node.go b/node/node.go index 7bb714496..d5548415a 100644 --- a/node/node.go +++ b/node/node.go @@ -99,7 +99,7 @@ type Node struct { addrBook *p2p.AddrBook // known peers // services - evsw types.EventSwitch // pub/sub for services + eventBus *types.EventBus // pub/sub for services blockStore *bc.BlockStore // store the blockchain to disk bcReactor *bc.BlockchainReactor // for fast-syncing mempoolReactor *mempl.MempoolReactor // for gossipping transactions @@ -187,13 +187,6 @@ func NewNode(config *cfg.Config, // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() - // Make event switch - eventSwitch := types.NewEventSwitch() - eventSwitch.SetLogger(logger.With("module", "types")) - if _, err := eventSwitch.Start(); err != nil { - return nil, fmt.Errorf("Failed to start switch: %v", err) - } - // Decide whether to fast-sync or not // We don't fast-sync when the only validator is us. fastSync := config.FastSync @@ -280,14 +273,16 @@ func NewNode(config *cfg.Config, }) } - // add the event switch to all services - // they should all satisfy events.Eventable - SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor) + eventBus := types.NewEventBus() + eventBus.SetLogger(logger.With("module", "events")) + + // services which will be publishing and/or subscribing for messages (events) + bcReactor.SetEventBus(eventBus) + consensusReactor.SetEventBus(eventBus) // run the profile server profileHost := config.ProfListenAddress if profileHost != "" { - go func() { logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil)) }() @@ -302,7 +297,6 @@ func NewNode(config *cfg.Config, sw: sw, addrBook: addrBook, - evsw: eventSwitch, blockStore: blockStore, bcReactor: bcReactor, mempoolReactor: mempoolReactor, @@ -310,6 +304,7 @@ func NewNode(config *cfg.Config, consensusReactor: consensusReactor, proxyApp: proxyApp, txIndexer: txIndexer, + eventBus: eventBus, } node.BaseService = *cmn.NewBaseService(logger, "Node", node) return node, nil @@ -317,6 +312,11 @@ func NewNode(config *cfg.Config, // OnStart starts the Node. It implements cmn.Service. func (n *Node) OnStart() error { + _, err := n.eventBus.Start() + if err != nil { + return err + } + // Run the RPC server first // so we can eg. receive txs for the first block if n.config.RPC.ListenAddress != "" { @@ -335,7 +335,7 @@ func (n *Node) OnStart() error { // Start the switch n.sw.SetNodeInfo(n.makeNodeInfo()) n.sw.SetNodePrivKey(n.privKey) - _, err := n.sw.Start() + _, err = n.sw.Start() if err != nil { return err } @@ -366,6 +366,8 @@ func (n *Node) OnStop() { n.Logger.Error("Error closing listener", "listener", l, "err", err) } } + + n.eventBus.Stop() } // RunForever waits for an interupt signal and stops the node. @@ -376,13 +378,6 @@ func (n *Node) RunForever() { }) } -// SetEventSwitch adds the event switch to reactors, mempool, etc. -func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) { - for _, e := range eventables { - e.SetEventSwitch(evsw) - } -} - // AddListener adds a listener to accept inbound peer connections. // It should be called before starting the Node. // The first listener is the primary listener (in NodeInfo) @@ -393,7 +388,6 @@ func (n *Node) AddListener(l p2p.Listener) { // ConfigureRPC sets all variables in rpccore so they will serve // rpc calls from this node func (n *Node) ConfigureRPC() { - rpccore.SetEventSwitch(n.evsw) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) rpccore.SetMempool(n.mempoolReactor.Mempool) @@ -404,6 +398,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetProxyAppQuery(n.proxyApp.Query()) rpccore.SetTxIndexer(n.txIndexer) rpccore.SetConsensusReactor(n.consensusReactor) + rpccore.SetEventBus(n.eventBus) rpccore.SetLogger(n.Logger.With("module", "rpc")) } @@ -420,7 +415,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { for i, listenAddr := range listenAddrs { mux := http.NewServeMux() rpcLogger := n.Logger.With("module", "rpc-server") - wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw) + wm := rpcserver.NewWebsocketManager(rpccore.Routes) wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) @@ -469,9 +464,9 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } -// EventSwitch returns the Node's EventSwitch. -func (n *Node) EventSwitch() types.EventSwitch { - return n.evsw +// EventBus returns the Node's EventBus. +func (n *Node) EventBus() *types.EventBus { + return n.eventBus } // PrivValidator returns the Node's PrivValidator. diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index a10021821..e5f5aba76 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -31,7 +31,7 @@ func TestHeaderEvents(t *testing.T) { defer c.Stop() } - evtTyp := types.EventStringNewBlockHeader() + evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) require.Nil(err, "%d: %+v", i, err) _, ok := evt.Unwrap().(types.EventDataNewBlockHeader) @@ -54,20 +54,20 @@ func TestBlockEvents(t *testing.T) { // listen for a new block; ensure height increases by 1 var firstBlockHeight int - for i := 0; i < 3; i++ { - evtTyp := types.EventStringNewBlock() + for j := 0; j < 3; j++ { + evtTyp := types.EventNewBlock evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) - require.Nil(err, "%d: %+v", i, err) + require.Nil(err, "%d: %+v", j, err) blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock) - require.True(ok, "%d: %#v", i, evt) + require.True(ok, "%d: %#v", j, evt) block := blockEvent.Block - if i == 0 { + if j == 0 { firstBlockHeight = block.Header.Height continue } - require.Equal(block.Header.Height, firstBlockHeight+i) + require.Equal(block.Header.Height, firstBlockHeight+j) } } } @@ -86,7 +86,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { // make the tx _, _, tx := MakeTxKV() - evtTyp := types.EventStringTx(types.Tx(tx)) + evtTyp := types.EventTx // send async txres, err := c.BroadcastTxAsync(tx) @@ -119,9 +119,9 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { // make the tx _, _, tx := MakeTxKV() - evtTyp := types.EventStringTx(types.Tx(tx)) + evtTyp := types.EventTx - // send async + // send sync txres, err := c.BroadcastTxSync(tx) require.Nil(err, "%+v", err) require.True(txres.Code.IsOK()) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index bc26ea57f..c2f06c005 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -1,12 +1,12 @@ package client import ( + "context" + "fmt" "time" "github.com/pkg/errors" "github.com/tendermint/tendermint/types" - cmn "github.com/tendermint/tmlibs/common" - events "github.com/tendermint/tmlibs/events" ) // Waiter is informed of current height, decided whether to quit early @@ -56,33 +56,25 @@ func WaitForHeight(c StatusClient, h int, waiter Waiter) error { // when the timeout duration has expired. // // This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(evsw types.EventSwitch, - evtTyp string, timeout time.Duration) (types.TMEventData, error) { - listener := cmn.RandStr(12) - - evts, quit := make(chan events.EventData, 10), make(chan bool, 1) - // start timeout count-down - go func() { - time.Sleep(timeout) - quit <- true - }() +func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + evts := make(chan interface{}, 1) // register for the next event of this type - evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) { - evts <- data - }) + query := fmt.Sprintf("%s='%s'", types.EventTypeKey, evtTyp) + err := c.Subscribe(ctx, query, evts) + if err != nil { + return types.TMEventData{}, errors.Wrap(err, "failed to subscribe") + } + // make sure to unregister after the test is over - defer evsw.RemoveListenerForEvent(evtTyp, listener) - // defer evsw.RemoveListener(listener) // this also works + defer c.Unsubscribe(ctx, query) select { - case <-quit: - return types.TMEventData{}, errors.New("timed out waiting for event") case evt := <-evts: - tmevt, ok := evt.(types.TMEventData) - if ok { - return tmevt, nil - } - return types.TMEventData{}, errors.Errorf("Got unexpected event type: %#v", evt) + return evt.(types.TMEventData), nil + case <-ctx.Done(): + return types.TMEventData{}, errors.New("timed out waiting for event") } } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index e63fcd4ba..82fdded41 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "github.com/pkg/errors" @@ -11,7 +12,7 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" - events "github.com/tendermint/tmlibs/events" + cmn "github.com/tendermint/tmlibs/common" ) /* @@ -40,10 +41,9 @@ func NewHTTP(remote, wsEndpoint string) *HTTP { } var ( - _ Client = (*HTTP)(nil) - _ NetworkClient = (*HTTP)(nil) - _ types.EventSwitch = (*HTTP)(nil) - _ types.EventSwitch = (*WSEvents)(nil) + _ Client = (*HTTP)(nil) + _ NetworkClient = (*HTTP)(nil) + _ EventsClient = (*HTTP)(nil) ) func (c *HTTP) Status() (*ctypes.ResultStatus, error) { @@ -186,128 +186,114 @@ func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) { /** websocket event stuff here... **/ type WSEvents struct { - types.EventSwitch + cmn.BaseService remote string endpoint string ws *rpcclient.WSClient + subscriptions map[string]chan<- interface{} + mtx sync.RWMutex + // used for signaling the goroutine that feeds ws -> EventSwitch quit chan bool done chan bool - - // used to maintain counts of actively listened events - // so we can properly subscribe/unsubscribe - // FIXME: thread-safety??? - // FIXME: reuse code from tmlibs/events??? - evtCount map[string]int // count how many time each event is subscribed - listeners map[string][]string // keep track of which events each listener is listening to } func newWSEvents(remote, endpoint string) *WSEvents { - return &WSEvents{ - EventSwitch: types.NewEventSwitch(), - endpoint: endpoint, - remote: remote, - quit: make(chan bool, 1), - done: make(chan bool, 1), - evtCount: map[string]int{}, - listeners: map[string][]string{}, + wsEvents := &WSEvents{ + endpoint: endpoint, + remote: remote, + quit: make(chan bool, 1), + done: make(chan bool, 1), + subscriptions: make(map[string]chan<- interface{}), } + + wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) + return wsEvents } // Start is the only way I could think the extend OnStart from // events.eventSwitch. If only it wasn't private... // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start func (w *WSEvents) Start() (bool, error) { - st, err := w.EventSwitch.Start() - // if we did start, then OnStart here... - if st && err == nil { - ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { - w.redoSubscriptions() - })) - _, err = ws.Start() - if err == nil { - w.ws = ws - go w.eventListener() - } + ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { + w.redoSubscriptions() + })) + started, err := ws.Start() + if err == nil { + w.ws = ws + go w.eventListener() } - return st, errors.Wrap(err, "StartWSEvent") + return started, errors.Wrap(err, "StartWSEvent") } // Stop wraps the BaseService/eventSwitch actions as Start does func (w *WSEvents) Stop() bool { - stop := w.EventSwitch.Stop() - if stop { - // send a message to quit to stop the eventListener - w.quit <- true - <-w.done - w.ws.Stop() - w.ws = nil - } - return stop + // send a message to quit to stop the eventListener + w.quit <- true + <-w.done + w.ws.Stop() + w.ws = nil + return true } -/** TODO: more intelligent subscriptions! **/ -func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) { - // no one listening -> subscribe - if w.evtCount[event] == 0 { - w.subscribe(event) +func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { + w.mtx.RLock() + if _, ok := w.subscriptions[query]; ok { + return errors.New("already subscribed") } - // if this listener was already listening to this event, return early - for _, s := range w.listeners[listenerID] { - if event == s { - return - } + w.mtx.RUnlock() + + err := w.ws.Subscribe(ctx, query) + if err != nil { + return errors.Wrap(err, "failed to subscribe") } - // otherwise, add this event to this listener - w.evtCount[event] += 1 - w.listeners[listenerID] = append(w.listeners[listenerID], event) - w.EventSwitch.AddListenerForEvent(listenerID, event, cb) + + w.mtx.Lock() + w.subscriptions[query] = out + w.mtx.Unlock() + + return nil } -func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) { - // if this listener is listening already, splice it out - found := false - l := w.listeners[listenerID] - for i, s := range l { - if event == s { - found = true - w.listeners[listenerID] = append(l[:i], l[i+1:]...) - break - } - } - // if the listener wasn't already listening to the event, exit early - if !found { - return +func (w *WSEvents) Unsubscribe(ctx context.Context, query string) error { + err := w.ws.Unsubscribe(ctx, query) + if err != nil { + return err } - // now we can update the subscriptions - w.evtCount[event] -= 1 - if w.evtCount[event] == 0 { - w.unsubscribe(event) + w.mtx.Lock() + defer w.mtx.Unlock() + ch, ok := w.subscriptions[query] + if ok { + close(ch) + delete(w.subscriptions, query) } - w.EventSwitch.RemoveListenerForEvent(event, listenerID) + + return nil } -func (w *WSEvents) RemoveListener(listenerID string) { - // remove all counts for this listener - for _, s := range w.listeners[listenerID] { - w.evtCount[s] -= 1 - if w.evtCount[s] == 0 { - w.unsubscribe(s) - } +func (w *WSEvents) UnsubscribeAll(ctx context.Context) error { + err := w.ws.UnsubscribeAll(ctx) + if err != nil { + return err } - w.listeners[listenerID] = nil - // then let the switch do it's magic - w.EventSwitch.RemoveListener(listenerID) + w.mtx.Lock() + defer w.mtx.Unlock() + for _, ch := range w.subscriptions { + close(ch) + } + w.subscriptions = make(map[string]chan<- interface{}) + return nil } -// After being reconnected, it is necessary to redo subscription -// to server otherwise no data will be automatically received +// After being reconnected, it is necessary to redo subscription to server +// otherwise no data will be automatically received. func (w *WSEvents) redoSubscriptions() { - for event, _ := range w.evtCount { - w.subscribe(event) + for query, out := range w.subscriptions { + // NOTE: no timeout for reconnect + w.Subscribe(context.Background(), query, out) } } @@ -350,23 +336,10 @@ func (w *WSEvents) parseEvent(data []byte) (err error) { // TODO: ? return nil } - // looks good! let's fire this baby! - w.EventSwitch.FireEvent(result.Name, result.Data) - return nil -} - -// no way of exposing these failures, so we panic. -// is this right? or silently ignore??? -func (w *WSEvents) subscribe(event string) { - err := w.ws.Subscribe(context.TODO(), event) - if err != nil { - panic(err) - } -} - -func (w *WSEvents) unsubscribe(event string) { - err := w.ws.Unsubscribe(context.TODO(), event) - if err != nil { - panic(err) + w.mtx.RLock() + if ch, ok := w.subscriptions[result.Query]; ok { + ch <- result.Data } + w.mtx.RUnlock() + return nil } diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 10689a561..443ea89d2 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -20,9 +20,12 @@ implementation. package client import ( + "context" + data "github.com/tendermint/go-wire/data" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" ) // ABCIClient groups together the functionality that principally @@ -64,14 +67,12 @@ type StatusClient interface { // if you want to listen for events, test if it also // implements events.EventSwitch type Client interface { + cmn.Service ABCIClient SignClient HistoryClient StatusClient - - // this Client is reactive, you can subscribe to any TMEventData - // type, given the proper string. see tendermint/types/events.go - types.EventSwitch + EventsClient } // NetworkClient is general info about the network state. May not @@ -83,3 +84,11 @@ type NetworkClient interface { NetInfo() (*ctypes.ResultNetInfo, error) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) } + +// EventsClient is reactive, you can subscribe to any message, given the proper +// string. see tendermint/types/events.go +type EventsClient interface { + Subscribe(ctx context.Context, query string, out chan<- interface{}) error + Unsubscribe(ctx context.Context, query string) error + UnsubscribeAll(ctx context.Context) error +} diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index c6adfc5fb..1fea2afb9 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -1,22 +1,27 @@ package client import ( + "context" + + "github.com/pkg/errors" + data "github.com/tendermint/go-wire/data" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" + tmquery "github.com/tendermint/tmlibs/pubsub/query" ) /* Local is a Client implementation that directly executes the rpc -functions on a given node, without going through HTTP or GRPC +functions on a given node, without going through HTTP or GRPC. This implementation is useful for: * Running tests against a node in-process without the overhead of going through an http server -* Communication between an ABCI app and tendermin core when they +* Communication between an ABCI app and Tendermint core when they are compiled in process. For real clients, you probably want to use client.HTTP. For more @@ -24,7 +29,9 @@ powerful control during testing, you probably want the "client/mock" package. */ type Local struct { node *nm.Node - types.EventSwitch + + *types.EventBus + subscriptions map[string]*tmquery.Query } // NewLocal configures a client that calls the Node directly. @@ -33,24 +40,26 @@ type Local struct { // you can only have one node per process. So make sure test cases // don't run in parallel, or try to simulate an entire network in // one process... -func NewLocal(node *nm.Node) Local { +func NewLocal(node *nm.Node) *Local { node.ConfigureRPC() - return Local{ - node: node, - EventSwitch: node.EventSwitch(), + return &Local{ + node: node, + EventBus: node.EventBus(), + subscriptions: make(map[string]*tmquery.Query), } } var ( - _ Client = Local{} + _ Client = (*Local)(nil) _ NetworkClient = Local{} + _ EventsClient = (*Local)(nil) ) -func (c Local) Status() (*ctypes.ResultStatus, error) { +func (Local) Status() (*ctypes.ResultStatus, error) { return core.Status() } -func (c Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) { +func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) { return core.ABCIInfo() } @@ -62,50 +71,82 @@ func (c Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQuery return core.ABCIQuery(path, data, opts.Height, opts.Trusted) } -func (c Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { return core.BroadcastTxCommit(tx) } -func (c Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { return core.BroadcastTxAsync(tx) } -func (c Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { return core.BroadcastTxSync(tx) } -func (c Local) NetInfo() (*ctypes.ResultNetInfo, error) { +func (Local) NetInfo() (*ctypes.ResultNetInfo, error) { return core.NetInfo() } -func (c Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { +func (Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { return core.DumpConsensusState() } -func (c Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { +func (Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { return core.UnsafeDialSeeds(seeds) } -func (c Local) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) { +func (Local) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) { return core.BlockchainInfo(minHeight, maxHeight) } -func (c Local) Genesis() (*ctypes.ResultGenesis, error) { +func (Local) Genesis() (*ctypes.ResultGenesis, error) { return core.Genesis() } -func (c Local) Block(height *int) (*ctypes.ResultBlock, error) { +func (Local) Block(height *int) (*ctypes.ResultBlock, error) { return core.Block(height) } -func (c Local) Commit(height *int) (*ctypes.ResultCommit, error) { +func (Local) Commit(height *int) (*ctypes.ResultCommit, error) { return core.Commit(height) } -func (c Local) Validators(height *int) (*ctypes.ResultValidators, error) { +func (Local) Validators(height *int) (*ctypes.ResultValidators, error) { return core.Validators(height) } -func (c Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { +func (Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { return core.Tx(hash, prove) } + +func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { + q, err := tmquery.New(query) + if err != nil { + return errors.Wrap(err, "failed to subscribe") + } + if err = c.EventBus.Subscribe(ctx, "rpclocalclient", q, out); err != nil { + return errors.Wrap(err, "failed to subscribe") + } + c.subscriptions[query] = q + return nil +} + +func (c *Local) Unsubscribe(ctx context.Context, query string) error { + q, ok := c.subscriptions[query] + if !ok { + return errors.New("subscription not found") + } + if err := c.EventBus.Unsubscribe(ctx, "rpclocalclient", q); err != nil { + return errors.Wrap(err, "failed to unsubscribe") + } + delete(c.subscriptions, query) + return nil +} + +func (c *Local) UnsubscribeAll(ctx context.Context) error { + if err := c.EventBus.UnsubscribeAll(ctx, "rpclocalclient"); err != nil { + return errors.Wrap(err, "failed to unsubscribe") + } + c.subscriptions = make(map[string]*tmquery.Query) + return nil +} diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index b59734749..7fc452068 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -21,6 +21,7 @@ import ( "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" ) // Client wraps arbitrary implementations of the various interfaces. @@ -33,8 +34,8 @@ type Client struct { client.SignClient client.HistoryClient client.StatusClient - // create a mock with types.NewEventSwitch() - types.EventSwitch + client.EventsClient + cmn.Service } var _ client.Client = Client{} diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index d329a1200..f2626f84e 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -18,7 +18,7 @@ func getHTTPClient() *client.HTTP { return client.NewHTTP(rpcAddr, "/websocket") } -func getLocalClient() client.Local { +func getLocalClient() *client.Local { return client.NewLocal(node) } diff --git a/rpc/core/events.go b/rpc/core/events.go index 00fd9a08c..e9d544413 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -1,9 +1,15 @@ package core import ( + "context" + "time" + + "github.com/pkg/errors" + ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" - "github.com/tendermint/tendermint/types" + tmtypes "github.com/tendermint/tendermint/types" + tmquery "github.com/tendermint/tmlibs/pubsub/query" ) // Subscribe for events via WebSocket. @@ -33,14 +39,32 @@ import ( // | event | string | "" | true | Event name | // // -func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { - logger.Info("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) { - // NOTE: EventSwitch callbacks must be nonblocking - // NOTE: RPCResponses of subscribed events have id suffix "#event" - tmResult := &ctypes.ResultEvent{event, msg} - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Request.ID+"#event", tmResult)) - }) +func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { + addr := wsCtx.GetRemoteAddr() + + logger.Info("Subscribe to query", "remote", addr, "query", query) + q, err := tmquery.New(query) + if err != nil { + return nil, errors.Wrap(err, "failed to parse a query") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + ch := make(chan interface{}) + err = eventBus.Subscribe(ctx, addr, q, ch) + if err != nil { + return nil, errors.Wrap(err, "failed to subscribe") + } + + wsCtx.AddSubscription(query, q) + + go func() { + for event := range ch { + tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)} + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Request.ID+"#event", tmResult)) + } + }() + return &ctypes.ResultSubscribe{}, nil } @@ -71,8 +95,21 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri // | event | string | "" | true | Event name | // // -func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { - logger.Info("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().RemoveListenerForEvent(event, wsCtx.GetRemoteAddr()) +func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) { + addr := wsCtx.GetRemoteAddr() + logger.Info("Unsubscribe from query", "remote", addr, "query", query) + q, ok := wsCtx.DeleteSubscription(query) + if !ok { + return nil, errors.New("subscription not found") + } + eventBus.Unsubscribe(context.Background(), addr, q.(*tmquery.Query)) + return &ctypes.ResultUnsubscribe{}, nil +} + +func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { + addr := wsCtx.GetRemoteAddr() + logger.Info("Unsubscribe from all", "remote", addr) + eventBus.UnsubscribeAll(context.Background(), addr) + wsCtx.DeleteAllSubscriptions() return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 94fc0efce..649f701b4 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -1,9 +1,12 @@ package core import ( + "context" "fmt" "time" + "github.com/pkg/errors" + abci "github.com/tendermint/abci/types" data "github.com/tendermint/go-wire/data" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -147,20 +150,26 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - // subscribe to tx being committed in block - deliverTxResCh := make(chan types.EventDataTx, 1) - types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) { - deliverTxResCh <- data.Unwrap().(types.EventDataTx) - }) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + deliverTxResCh := make(chan interface{}) + q := types.EventQueryTx(tx) + err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) + if err != nil { + err = errors.Wrap(err, "failed to subscribe to tx") + logger.Error("Error broadcasting transaction", "err", err) + return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + } + defer eventBus.Unsubscribe(context.Background(), "mempool", q) // broadcast the tx and register checktx callback checkTxResCh := make(chan *abci.Response, 1) - err := mempool.CheckTx(tx, func(res *abci.Response) { + err = mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res }) if err != nil { - logger.Error("err", "err", err) + logger.Error("Error broadcasting transaction", "err", err) return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } checkTxRes := <-checkTxResCh @@ -179,7 +188,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // TODO: configurable? timer := time.NewTimer(60 * 2 * time.Second) select { - case deliverTxRes := <-deliverTxResCh: + case deliverTxResMsg := <-deliverTxResCh: + deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx) // The tx was included in a block. deliverTxR := &abci.ResponseDeliverTx{ Code: deliverTxRes.Code, diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 20141cb91..bee59e1c0 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -36,7 +36,6 @@ type P2P interface { var ( // external, thread safe interfaces - eventSwitch types.EventSwitch proxyAppQuery proxy.AppConnQuery // interfaces defined in types and above @@ -51,14 +50,11 @@ var ( addrBook *p2p.AddrBook txIndexer txindex.TxIndexer consensusReactor *consensus.ConsensusReactor + eventBus *types.EventBus logger log.Logger ) -func SetEventSwitch(evsw types.EventSwitch) { - eventSwitch = evsw -} - func SetBlockStore(bs types.BlockStore) { blockStore = bs } @@ -102,3 +98,7 @@ func SetConsensusReactor(conR *consensus.ConsensusReactor) { func SetLogger(l log.Logger) { logger = l } + +func SetEventBus(b *types.EventBus) { + eventBus = b +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index b1dbd3785..a4328f1d2 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -7,8 +7,9 @@ import ( // TODO: better system than "unsafe" prefix var Routes = map[string]*rpc.RPCFunc{ // subscribe/unsubscribe are reserved for websocket events. - "subscribe": rpc.NewWSRPCFunc(Subscribe, "event"), - "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "event"), + "subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), + "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"), + "unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""), // info API "status": rpc.NewRPCFunc(Status, ""), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 874e351d3..8aa904fe5 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -140,6 +140,6 @@ type ResultSubscribe struct{} type ResultUnsubscribe struct{} type ResultEvent struct { - Name string `json:"name"` - Data types.TMEventData `json:"data"` + Query string `json:"query"` + Data types.TMEventData `json:"data"` } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 6e9242907..bfe2272e4 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -449,17 +449,17 @@ func (c *WSClient) readRoutine() { /////////////////////////////////////////////////////////////////////////////// // Predefined methods -// Subscribe to an event. Note the server must have a "subscribe" route +// Subscribe to a query. Note the server must have a "subscribe" route // defined. -func (c *WSClient) Subscribe(ctx context.Context, eventType string) error { - params := map[string]interface{}{"event": eventType} +func (c *WSClient) Subscribe(ctx context.Context, query string) error { + params := map[string]interface{}{"query": query} return c.Call(ctx, "subscribe", params) } -// Unsubscribe from an event. Note the server must have a "unsubscribe" route +// Unsubscribe from a query. Note the server must have a "unsubscribe" route // defined. -func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error { - params := map[string]interface{}{"event": eventType} +func (c *WSClient) Unsubscribe(ctx context.Context, query string) error { + params := map[string]interface{}{"query": query} return c.Call(ctx, "unsubscribe", params) } diff --git a/rpc/lib/doc.go b/rpc/lib/doc.go index 0ea4e5c65..2bc438593 100644 --- a/rpc/lib/doc.go +++ b/rpc/lib/doc.go @@ -77,7 +77,7 @@ Now start the server: ``` mux := http.NewServeMux() rpcserver.RegisterRPCFuncs(mux, Routes) -wm := rpcserver.NewWebsocketManager(Routes, nil) +wm := rpcserver.NewWebsocketManager(Routes) mux.HandleFunc("/websocket", wm.WebsocketHandler) logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) go func() { diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 2ec3014d5..aa7319026 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -114,7 +114,7 @@ func setup() { tcpLogger := logger.With("socket", "tcp") mux := http.NewServeMux() server.RegisterRPCFuncs(mux, Routes, tcpLogger) - wm := server.NewWebsocketManager(Routes, nil, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) + wm := server.NewWebsocketManager(Routes, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { @@ -127,7 +127,7 @@ func setup() { unixLogger := logger.With("socket", "unix") mux2 := http.NewServeMux() server.RegisterRPCFuncs(mux2, Routes, unixLogger) - wm = server.NewWebsocketManager(Routes, nil) + wm = server.NewWebsocketManager(Routes) wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) go func() { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 3a3c48f02..283be1823 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -18,7 +18,6 @@ import ( types "github.com/tendermint/tendermint/rpc/lib/types" cmn "github.com/tendermint/tmlibs/common" - events "github.com/tendermint/tmlibs/events" "github.com/tendermint/tmlibs/log" ) @@ -361,7 +360,8 @@ type wsConnection struct { writeChan chan types.RPCResponse funcMap map[string]*RPCFunc - evsw events.EventSwitch + + subscriptions map[string]interface{} // write channel capacity writeChanCapacity int @@ -381,12 +381,12 @@ type wsConnection struct { // ping period and pong wait time. // NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect. // see https://github.com/gorilla/websocket/issues/97 -func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection { +func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, options ...func(*wsConnection)) *wsConnection { wsc := &wsConnection{ remoteAddr: baseConn.RemoteAddr().String(), baseConn: baseConn, funcMap: funcMap, - evsw: evsw, + subscriptions: make(map[string]interface{}), writeWait: defaultWSWriteWait, writeChanCapacity: defaultWSWriteChanCapacity, readWait: defaultWSReadWait, @@ -445,9 +445,6 @@ func (wsc *wsConnection) OnStart() error { // OnStop unsubscribes from all events. func (wsc *wsConnection) OnStop() { - if wsc.evsw != nil { - wsc.evsw.RemoveListener(wsc.remoteAddr) - } // Both read and write loops close the websocket connection when they exit their loops. // The writeChan is never closed, to allow WriteRPCResponse() to fail. } @@ -458,12 +455,6 @@ func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } -// GetEventSwitch returns the event switch. -// It implements WSRPCConnection -func (wsc *wsConnection) GetEventSwitch() events.EventSwitch { - return wsc.evsw -} - // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { @@ -487,6 +478,23 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { } } +func (wsc *wsConnection) AddSubscription(query string, data interface{}) { + wsc.subscriptions[query] = data +} + +func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) { + data, ok := wsc.subscriptions[query] + if ok { + delete(wsc.subscriptions, query) + return data, true + } + return nil, false +} + +func (wsc *wsConnection) DeleteAllSubscriptions() { + wsc.subscriptions = make(map[string]interface{}) +} + // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { defer func() { @@ -644,17 +652,16 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error type WebsocketManager struct { websocket.Upgrader funcMap map[string]*RPCFunc - evsw events.EventSwitch logger log.Logger wsConnOptions []func(*wsConnection) } -// NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch, -// and connects to the server with the given connection options. -func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager { +// NewWebsocketManager returns a new WebsocketManager that routes according to +// the given funcMap and connects to the server with the given connection +// options. +func NewWebsocketManager(funcMap map[string]*RPCFunc, wsConnOptions ...func(*wsConnection)) *WebsocketManager { return &WebsocketManager{ funcMap: funcMap, - evsw: evsw, Upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // TODO ??? @@ -681,7 +688,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ } // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...) + con := NewWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...) con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) wm.logger.Info("New websocket connection", "remote", con.remoteAddr) con.Start() // Blocking diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 86f9264dd..5a3c9171c 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/pkg/errors" - events "github.com/tendermint/tmlibs/events" ) //---------------------------------------- @@ -135,9 +134,12 @@ func RPCServerError(id string, err error) RPCResponse { // *wsConnection implements this interface. type WSRPCConnection interface { GetRemoteAddr() string - GetEventSwitch() events.EventSwitch WriteRPCResponse(resp RPCResponse) TryWriteRPCResponse(resp RPCResponse) bool + + AddSubscription(string, interface{}) + DeleteSubscription(string) (interface{}, bool) + DeleteAllSubscriptions() } // websocket-only RPCFuncs take this as the first parameter. diff --git a/state/execution.go b/state/execution.go index b917bfbe8..76205d0fd 100644 --- a/state/execution.go +++ b/state/execution.go @@ -20,14 +20,14 @@ import ( // ValExecBlock executes the block, but does NOT mutate State. // + validates the block // + executes block.Txs on the proxyAppConn -func (s *State) ValExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { +func (s *State) ValExecBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { // Validate the block. if err := s.validateBlock(block); err != nil { return nil, ErrInvalidBlock(err) } // Execute the block txs - abciResponses, err := execBlockOnProxyApp(eventCache, proxyAppConn, block, s.logger) + abciResponses, err := execBlockOnProxyApp(txEventPublisher, proxyAppConn, block, s.logger) if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. @@ -40,7 +40,7 @@ func (s *State) ValExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppCo // Executes block's transactions on proxyAppConn. // Returns a list of transaction results and updates to the validator set // TODO: Generate a bitmap or otherwise store tx validity in state. -func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, logger log.Logger) (*ABCIResponses, error) { +func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block, logger log.Logger) (*ABCIResponses, error) { var validTxs, invalidTxs = 0, 0 txIndex := 0 @@ -77,7 +77,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo Log: txResult.Log, Error: txError, } - types.FireEventTx(eventCache, event) + txEventPublisher.PublishEventTx(event) } } proxyAppConn.SetResponseCallback(proxyCb) @@ -213,10 +213,10 @@ func (s *State) validateBlock(block *types.Block) error { // ApplyBlock validates the block against the state, executes it against the app, // commits it, and saves the block and state. It's the only function that needs to be called // from outside this package to process and commit an entire block. -func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, +func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error { - abciResponses, err := s.ValExecBlock(eventCache, proxyAppConn, block) + abciResponses, err := s.ValExecBlock(txEventPublisher, proxyAppConn, block) if err != nil { return fmt.Errorf("Exec failed for application: %v", err) } @@ -295,8 +295,7 @@ func (s *State) indexTxs(abciResponses *ABCIResponses) { // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // It returns the application root hash (result of abci.Commit). func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) { - var eventCache types.Fireable // nil - _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block, logger) + _, err := execBlockOnProxyApp(types.NopEventBus{}, appConnConsensus, block, logger) if err != nil { logger.Error("Error executing block on proxy app", "height", block.Height, "err", err) return nil, err diff --git a/state/execution_test.go b/state/execution_test.go index 425f59ed2..8fcdcf1c5 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -37,7 +37,7 @@ func TestApplyBlock(t *testing.T) { // make block block := makeBlock(1, state) - err = state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{}) + err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{}) require.Nil(t, err) assert.Equal(t, nTxsPerBlock, indexer.Indexed) // test indexing works diff --git a/types/event_buffer.go b/types/event_buffer.go new file mode 100644 index 000000000..84f855372 --- /dev/null +++ b/types/event_buffer.go @@ -0,0 +1,46 @@ +package types + +// Interface assertions +var _ TxEventPublisher = (*TxEventBuffer)(nil) + +// TxEventBuffer is a buffer of events, which uses a slice to temporary store +// events. +type TxEventBuffer struct { + next TxEventPublisher + capacity int + events []EventDataTx +} + +// NewTxEventBuffer accepts an EventBus and returns a new buffer with the given +// capacity. +func NewTxEventBuffer(next TxEventPublisher, capacity int) *TxEventBuffer { + return &TxEventBuffer{ + next: next, + capacity: capacity, + events: make([]EventDataTx, 0, capacity), + } +} + +// Len returns the number of events cached. +func (b TxEventBuffer) Len() int { + return len(b.events) +} + +// PublishEventTx buffers an event to be fired upon finality. +func (b *TxEventBuffer) PublishEventTx(e EventDataTx) error { + b.events = append(b.events, e) + return nil +} + +// Flush publishes events by running next.PublishWithTags on all cached events. +// Blocks. Clears cached events. +func (b *TxEventBuffer) Flush() error { + for _, e := range b.events { + err := b.next.PublishEventTx(e) + if err != nil { + return err + } + } + b.events = make([]EventDataTx, 0, b.capacity) + return nil +} diff --git a/types/event_buffer_test.go b/types/event_buffer_test.go new file mode 100644 index 000000000..74ae9da29 --- /dev/null +++ b/types/event_buffer_test.go @@ -0,0 +1,21 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type eventBusMock struct{} + +func (eventBusMock) PublishEventTx(e EventDataTx) error { + return nil +} + +func TestEventBuffer(t *testing.T) { + b := NewTxEventBuffer(eventBusMock{}, 1) + b.PublishEventTx(EventDataTx{}) + assert.Equal(t, 1, b.Len()) + b.Flush() + assert.Equal(t, 0, b.Len()) +} diff --git a/types/event_bus.go b/types/event_bus.go new file mode 100644 index 000000000..762f1af61 --- /dev/null +++ b/types/event_bus.go @@ -0,0 +1,133 @@ +package types + +import ( + "context" + "fmt" + + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" + tmpubsub "github.com/tendermint/tmlibs/pubsub" +) + +const defaultCapacity = 1000 + +// EventBus is a common bus for all events going through the system. All calls +// are proxied to underlying pubsub server. All events must be published using +// EventBus to ensure correct data types. +type EventBus struct { + cmn.BaseService + pubsub *tmpubsub.Server +} + +// NewEventBus returns a new event bus. +func NewEventBus() *EventBus { + return NewEventBusWithBufferCapacity(defaultCapacity) +} + +// NewEventBusWithBufferCapacity returns a new event bus with the given buffer capacity. +func NewEventBusWithBufferCapacity(cap int) *EventBus { + // capacity could be exposed later if needed + pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(cap)) + b := &EventBus{pubsub: pubsub} + b.BaseService = *cmn.NewBaseService(nil, "EventBus", b) + return b +} + +func (b *EventBus) SetLogger(l log.Logger) { + b.BaseService.SetLogger(l) + b.pubsub.SetLogger(l.With("module", "pubsub")) +} + +func (b *EventBus) OnStart() error { + return b.pubsub.OnStart() +} + +func (b *EventBus) OnStop() { + b.pubsub.OnStop() +} + +func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { + return b.pubsub.Subscribe(ctx, subscriber, query, out) +} + +func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { + return b.pubsub.Unsubscribe(ctx, subscriber, query) +} + +func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error { + return b.pubsub.UnsubscribeAll(ctx, subscriber) +} + +func (b *EventBus) publish(eventType string, eventData TMEventData) error { + if b.pubsub != nil { + // no explicit deadline for publishing events + ctx := context.Background() + b.pubsub.PublishWithTags(ctx, eventData, map[string]interface{}{EventTypeKey: eventType}) + } + return nil +} + +//--- block, tx, and vote events + +func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error { + return b.publish(EventNewBlock, TMEventData{block}) +} + +func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { + return b.publish(EventNewBlockHeader, TMEventData{header}) +} + +func (b *EventBus) PublishEventVote(vote EventDataVote) error { + return b.publish(EventVote, TMEventData{vote}) +} + +func (b *EventBus) PublishEventTx(tx EventDataTx) error { + if b.pubsub != nil { + // no explicit deadline for publishing events + ctx := context.Background() + b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())}) + } + return nil +} + +//--- EventDataRoundState events + +func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { + return b.publish(EventNewRoundStep, TMEventData{rs}) +} + +func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { + return b.publish(EventTimeoutPropose, TMEventData{rs}) +} + +func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { + return b.publish(EventTimeoutWait, TMEventData{rs}) +} + +func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error { + return b.publish(EventNewRound, TMEventData{rs}) +} + +func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { + return b.publish(EventCompleteProposal, TMEventData{rs}) +} + +func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error { + return b.publish(EventPolka, TMEventData{rs}) +} + +func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error { + return b.publish(EventUnlock, TMEventData{rs}) +} + +func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error { + return b.publish(EventRelock, TMEventData{rs}) +} + +func (b *EventBus) PublishEventLock(rs EventDataRoundState) error { + return b.publish(EventLock, TMEventData{rs}) +} + +func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error { + return b.publish(EventProposalHeartbeat, TMEventData{ph}) +} diff --git a/types/event_bus_test.go b/types/event_bus_test.go new file mode 100644 index 000000000..4c10fc219 --- /dev/null +++ b/types/event_bus_test.go @@ -0,0 +1,122 @@ +package types + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + tmpubsub "github.com/tendermint/tmlibs/pubsub" +) + +func BenchmarkEventBus(b *testing.B) { + benchmarks := []struct { + name string + numClients int + randQueries bool + randEvents bool + }{ + {"10Clients1Query1Event", 10, false, false}, + {"100Clients", 100, false, false}, + {"1000Clients", 1000, false, false}, + + {"10ClientsRandQueries1Event", 10, true, false}, + {"100Clients", 100, true, false}, + {"1000Clients", 1000, true, false}, + + {"10ClientsRandQueriesRandEvents", 10, true, true}, + {"100Clients", 100, true, true}, + {"1000Clients", 1000, true, true}, + + {"10Clients1QueryRandEvents", 10, false, true}, + {"100Clients", 100, false, true}, + {"1000Clients", 1000, false, true}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + benchmarkEventBus(bm.numClients, bm.randQueries, bm.randEvents, b) + }) + } +} + +func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *testing.B) { + // for random* functions + rand.Seed(time.Now().Unix()) + + eventBus := NewEventBusWithBufferCapacity(0) // set buffer capacity to 0 so we are not testing cache + eventBus.Start() + defer eventBus.Stop() + + ctx := context.Background() + q := EventQueryNewBlock + + for i := 0; i < numClients; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + if randQueries { + q = randQuery() + } + eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q, ch) + } + + eventType := EventNewBlock + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if randEvents { + eventType = randEvent() + } + + eventBus.publish(eventType, TMEventData{"Gamora"}) + } +} + +var events = []string{EventBond, + EventUnbond, + EventRebond, + EventDupeout, + EventFork, + EventNewBlock, + EventNewBlockHeader, + EventNewRound, + EventNewRoundStep, + EventTimeoutPropose, + EventCompleteProposal, + EventPolka, + EventUnlock, + EventLock, + EventRelock, + EventTimeoutWait, + EventVote} + +func randEvent() string { + return events[rand.Intn(len(events))] +} + +var queries = []tmpubsub.Query{EventQueryBond, + EventQueryUnbond, + EventQueryRebond, + EventQueryDupeout, + EventQueryFork, + EventQueryNewBlock, + EventQueryNewBlockHeader, + EventQueryNewRound, + EventQueryNewRoundStep, + EventQueryTimeoutPropose, + EventQueryCompleteProposal, + EventQueryPolka, + EventQueryUnlock, + EventQueryLock, + EventQueryRelock, + EventQueryTimeoutWait, + EventQueryVote} + +func randQuery() tmpubsub.Query { + return queries[rand.Intn(len(queries))] +} diff --git a/types/events.go b/types/events.go index 79e17fe0f..57851af4a 100644 --- a/types/events.go +++ b/types/events.go @@ -1,39 +1,40 @@ package types import ( - // for registering TMEventData as events.EventData + "fmt" + abci "github.com/tendermint/abci/types" "github.com/tendermint/go-wire/data" - cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/events" + tmpubsub "github.com/tendermint/tmlibs/pubsub" + tmquery "github.com/tendermint/tmlibs/pubsub/query" +) + +// Reserved event types +const ( + EventBond = "Bond" + EventCompleteProposal = "CompleteProposal" + EventDupeout = "Dupeout" + EventFork = "Fork" + EventLock = "Lock" + EventNewBlock = "NewBlock" + EventNewBlockHeader = "NewBlockHeader" + EventNewRound = "NewRound" + EventNewRoundStep = "NewRoundStep" + EventPolka = "Polka" + EventRebond = "Rebond" + EventRelock = "Relock" + EventTimeoutPropose = "TimeoutPropose" + EventTimeoutWait = "TimeoutWait" + EventTx = "Tx" + EventUnbond = "Unbond" + EventUnlock = "Unlock" + EventVote = "Vote" + EventProposalHeartbeat = "ProposalHeartbeat" ) -// Functions to generate eventId strings - -// Reserved -func EventStringBond() string { return "Bond" } -func EventStringUnbond() string { return "Unbond" } -func EventStringRebond() string { return "Rebond" } -func EventStringDupeout() string { return "Dupeout" } -func EventStringFork() string { return "Fork" } -func EventStringTx(tx Tx) string { return cmn.Fmt("Tx:%X", tx.Hash()) } - -func EventStringNewBlock() string { return "NewBlock" } -func EventStringNewBlockHeader() string { return "NewBlockHeader" } -func EventStringNewRound() string { return "NewRound" } -func EventStringNewRoundStep() string { return "NewRoundStep" } -func EventStringTimeoutPropose() string { return "TimeoutPropose" } -func EventStringCompleteProposal() string { return "CompleteProposal" } -func EventStringPolka() string { return "Polka" } -func EventStringUnlock() string { return "Unlock" } -func EventStringLock() string { return "Lock" } -func EventStringRelock() string { return "Relock" } -func EventStringTimeoutWait() string { return "TimeoutWait" } -func EventStringVote() string { return "Vote" } - -func EventStringProposalHeartbeat() string { return "ProposalHeartbeat" } - -//---------------------------------------- +/////////////////////////////////////////////////////////////////////////////// +// ENCODING / DECODING +/////////////////////////////////////////////////////////////////////////////// var ( EventDataNameNewBlock = "new_block" @@ -45,11 +46,9 @@ var ( EventDataNameProposalHeartbeat = "proposer_heartbeat" ) -//---------------------------------------- - // implements events.EventData type TMEventDataInner interface { - events.EventData + // empty interface } type TMEventData struct { @@ -140,112 +139,54 @@ type EventDataVote struct { Vote *Vote } -func (_ EventDataNewBlock) AssertIsTMEventData() {} -func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} -func (_ EventDataTx) AssertIsTMEventData() {} -func (_ EventDataRoundState) AssertIsTMEventData() {} -func (_ EventDataVote) AssertIsTMEventData() {} - +func (_ EventDataNewBlock) AssertIsTMEventData() {} +func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} +func (_ EventDataTx) AssertIsTMEventData() {} +func (_ EventDataRoundState) AssertIsTMEventData() {} +func (_ EventDataVote) AssertIsTMEventData() {} func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {} -//---------------------------------------- -// Wrappers for type safety - -type Fireable interface { - events.Fireable -} - -type Eventable interface { - SetEventSwitch(EventSwitch) -} - -type EventSwitch interface { - events.EventSwitch -} - -type EventCache interface { - Fireable - Flush() -} - -func NewEventSwitch() EventSwitch { - return events.NewEventSwitch() -} - -func NewEventCache(evsw EventSwitch) EventCache { - return events.NewEventCache(evsw) -} - -// All events should be based on this FireEvent to ensure they are TMEventData -func fireEvent(fireable events.Fireable, event string, data TMEventData) { - if fireable != nil { - fireable.FireEvent(event, data) - } -} - -func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) { - evsw.AddListenerForEvent(id, event, func(data events.EventData) { - cb(data.(TMEventData)) - }) - -} - -//--- block, tx, and vote events +/////////////////////////////////////////////////////////////////////////////// +// PUBSUB +/////////////////////////////////////////////////////////////////////////////// -func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) { - fireEvent(fireable, EventStringNewBlock(), TMEventData{block}) -} - -func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) { - fireEvent(fireable, EventStringNewBlockHeader(), TMEventData{header}) -} - -func FireEventVote(fireable events.Fireable, vote EventDataVote) { - fireEvent(fireable, EventStringVote(), TMEventData{vote}) -} - -func FireEventTx(fireable events.Fireable, tx EventDataTx) { - fireEvent(fireable, EventStringTx(tx.Tx), TMEventData{tx}) -} - -//--- EventDataRoundState events - -func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringNewRoundStep(), TMEventData{rs}) -} - -func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringTimeoutPropose(), TMEventData{rs}) -} - -func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringTimeoutWait(), TMEventData{rs}) -} - -func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringNewRound(), TMEventData{rs}) -} - -func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringCompleteProposal(), TMEventData{rs}) -} - -func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringPolka(), TMEventData{rs}) -} +const ( + // EventTypeKey is a reserved key, used to specify event type in tags. + EventTypeKey = "tm.events.type" + // TxHashKey is a reserved key, used to specify transaction's hash. + // see EventBus#PublishEventTx + TxHashKey = "tx.hash" +) -func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringUnlock(), TMEventData{rs}) -} +var ( + EventQueryBond = queryForEvent(EventBond) + EventQueryUnbond = queryForEvent(EventUnbond) + EventQueryRebond = queryForEvent(EventRebond) + EventQueryDupeout = queryForEvent(EventDupeout) + EventQueryFork = queryForEvent(EventFork) + EventQueryNewBlock = queryForEvent(EventNewBlock) + EventQueryNewBlockHeader = queryForEvent(EventNewBlockHeader) + EventQueryNewRound = queryForEvent(EventNewRound) + EventQueryNewRoundStep = queryForEvent(EventNewRoundStep) + EventQueryTimeoutPropose = queryForEvent(EventTimeoutPropose) + EventQueryCompleteProposal = queryForEvent(EventCompleteProposal) + EventQueryPolka = queryForEvent(EventPolka) + EventQueryUnlock = queryForEvent(EventUnlock) + EventQueryLock = queryForEvent(EventLock) + EventQueryRelock = queryForEvent(EventRelock) + EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) + EventQueryVote = queryForEvent(EventVote) + EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) +) -func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringRelock(), TMEventData{rs}) +func EventQueryTx(tx Tx) tmpubsub.Query { + return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) } -func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringLock(), TMEventData{rs}) +func queryForEvent(eventType string) tmpubsub.Query { + return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) } -func FireEventProposalHeartbeat(fireable events.Fireable, rs EventDataProposalHeartbeat) { - fireEvent(fireable, EventStringProposalHeartbeat(), TMEventData{rs}) +type TxEventPublisher interface { + PublishEventTx(EventDataTx) error } diff --git a/types/nop_event_bus.go b/types/nop_event_bus.go new file mode 100644 index 000000000..06b70987d --- /dev/null +++ b/types/nop_event_bus.go @@ -0,0 +1,77 @@ +package types + +import ( + "context" + + tmpubsub "github.com/tendermint/tmlibs/pubsub" +) + +type NopEventBus struct{} + +func (NopEventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { + return nil +} + +func (NopEventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { + return nil +} + +func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error { + return nil +} + +//--- block, tx, and vote events + +func (NopEventBus) PublishEventNewBlock(block EventDataNewBlock) error { + return nil +} + +func (NopEventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { + return nil +} + +func (NopEventBus) PublishEventVote(vote EventDataVote) error { + return nil +} + +func (NopEventBus) PublishEventTx(tx EventDataTx) error { + return nil +} + +//--- EventDataRoundState events + +func (NopEventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventNewRound(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventPolka(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventUnlock(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventRelock(rs EventDataRoundState) error { + return nil +} + +func (NopEventBus) PublishEventLock(rs EventDataRoundState) error { + return nil +}