diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go
index b0f22e54f..18a6310c3 100644
--- a/internal/consensus/common_test.go
+++ b/internal/consensus/common_test.go
@@ -885,8 +885,11 @@ func randConsensusNetWithPeers(
         app := appFunc(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("%s_%d", testName, i)))
         vals := types.TM2PB.ValidatorUpdates(state.Validators)
-        if _, ok := app.(*kvstore.PersistentKVStoreApplication); ok {
-            // simulate handshake, receive app version. If don't do this, replay test will fail
+        switch app.(type) {
+        // simulate handshake, receive app version. If don't do this, replay test will fail
+        case *kvstore.PersistentKVStoreApplication:
+            state.Version.Consensus.App = kvstore.ProtocolVersion
+        case *kvstore.Application:
             state.Version.Consensus.App = kvstore.ProtocolVersion
         }
         app.InitChain(abci.RequestInitChain{Validators: vals})
@@ -973,10 +976,6 @@ func newEpehemeralKVStore(_ log.Logger, _ string) abci.Application {
     return kvstore.NewApplication()
 }
 
-func newPersistentKVStore(logger log.Logger, dbDir string) abci.Application {
-    return kvstore.NewPersistentKVStoreApplication(logger, dbDir)
-}
-
 func signDataIsEqual(v1 *types.Vote, v2 *tmproto.Vote) bool {
     if v1 == nil || v2 == nil {
         return false
diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go
index 822e8c627..52e547bf6 100644
--- a/internal/consensus/reactor.go
+++ b/internal/consensus/reactor.go
@@ -348,6 +348,8 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote)
 // internal pubsub defined in the consensus state to broadcast them to peers
 // upon receiving.
 func (r *Reactor) subscribeToBroadcastEvents() {
+    onStopCh := r.state.getOnStopCh()
+
     err := r.state.evsw.AddListenerForEvent(
         listenerIDConsensus,
         types.EventNewRoundStepValue,
@@ -356,7 +358,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
                 return err
             }
             select {
-            case r.state.onStopCh <- data.(*cstypes.RoundState):
+            case onStopCh <- data.(*cstypes.RoundState):
                 return nil
             case <-ctx.Done():
                 return ctx.Err()
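Note on the reactor hunk above: the listener callback now sends on a channel handle captured once via the new getOnStopCh getter (added to state.go later in this patch) instead of reading r.state.onStopCh on every event. The following is a minimal, self-contained Go sketch of that capture-then-use pattern; the state and roundState types here are illustrative stand-ins, not the Tendermint types.

    package main

    import (
        "context"
        "fmt"
        "sync"
    )

    // roundState is a hypothetical stand-in for cstypes.RoundState.
    type roundState struct{ height int64 }

    // state mirrors the relevant shape of the consensus state: a channel
    // guarded by a mutex and exposed through a getter.
    type state struct {
        mtx      sync.RWMutex
        onStopCh chan *roundState
    }

    func (s *state) getOnStopCh() chan *roundState {
        s.mtx.RLock()
        defer s.mtx.RUnlock()
        return s.onStopCh
    }

    func main() {
        s := &state{onStopCh: make(chan *roundState, 1)}

        // Capture the channel once, outside the callback, as the reactor now does.
        onStopCh := s.getOnStopCh()
        callback := func(ctx context.Context, rs *roundState) error {
            select {
            case onStopCh <- rs:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        }

        if err := callback(context.Background(), &roundState{height: 1}); err != nil {
            fmt.Println("callback error:", err)
        }
        fmt.Println("delivered round state at height", (<-s.onStopCh).height)
    }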
diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go
index c735e9977..66e890b83 100644
--- a/internal/consensus/replay_test.go
+++ b/internal/consensus/replay_test.go
@@ -8,7 +8,6 @@ import (
     "io"
     "math/rand"
     "os"
-    "path/filepath"
     "runtime"
     "testing"
     "time"
@@ -338,13 +337,13 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
         nPeers,
         "replay_test",
         newMockTickerFunc(true),
-        newPersistentKVStore)
+        newEpehemeralKVStore)
     sim.Config = cfg
 
+    defer func() { t.Cleanup(cleanup) }()
+
     var err error
     sim.GenesisState, err = sm.MakeGenesisState(genDoc)
     require.NoError(t, err)
-    sim.CleanupFunc = cleanup
 
     partSize := types.BlockPartSizeBytes
@@ -584,9 +583,6 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
         sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i)))
         sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i)))
     }
-    if sim.CleanupFunc != nil {
-        t.Cleanup(sim.CleanupFunc)
-    }
 
     return sim
 }
@@ -743,19 +739,14 @@ func testHandshakeReplay(
     )
     latestAppHash := state.AppHash
 
-    // make a new client creator
-    kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
-        filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
-    t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
-
     eventBus := eventbus.NewDefault(logger)
     require.NoError(t, eventBus.Start(ctx))
 
-    clientCreator2 := abciclient.NewLocalClient(logger,
-        kvstoreApp)
+    client := abciclient.NewLocalClient(logger, kvstore.NewApplication())
 
     if nBlocks > 0 {
         // run nBlocks against a new client to build up the app state.
         // use a throwaway tendermint state
-        proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
+        proxyApp := proxy.New(client, logger, proxy.NopMetrics())
         stateDB1 := dbm.NewMemDB()
         stateStore := sm.NewStore(stateDB1)
         err := stateStore.Save(genesisState)
@@ -776,7 +767,7 @@ func testHandshakeReplay(
     genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
     require.NoError(t, err)
     handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
-    proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
+    proxyApp := proxy.New(client, logger, proxy.NopMetrics())
     require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
     require.True(t, proxyApp.IsRunning())
     require.NotNil(t, proxyApp)
@@ -905,10 +896,7 @@ func buildTMStateFromChain(
     t.Helper()
 
     // run the whole chain against this client to build up the tendermint state
-    kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
-        filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
-    defer kvstoreApp.Close()
-    client := abciclient.NewLocalClient(logger, kvstoreApp)
+    client := abciclient.NewLocalClient(logger, kvstore.NewApplication())
     proxyApp := proxy.New(client, logger, proxy.NopMetrics())
     require.NoError(t, proxyApp.Start(ctx))
diff --git a/internal/consensus/state.go b/internal/consensus/state.go
index 00519a884..109e90683 100644
--- a/internal/consensus/state.go
+++ b/internal/consensus/state.go
@@ -452,10 +452,6 @@ func (cs *State) OnStart(ctx context.Context) error {
         }
     }
 
-    if err := cs.evsw.Start(ctx); err != nil {
-        return err
-    }
-
     // Double Signing Risk Reduction
     if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
         return err
@@ -497,12 +493,19 @@ func (cs *State) loadWalFile(ctx context.Context) error {
     return nil
 }
 
+func (cs *State) getOnStopCh() chan *cstypes.RoundState {
+    cs.mtx.RLock()
+    defer cs.mtx.RUnlock()
+
+    return cs.onStopCh
+}
+
 // OnStop implements service.Service.
 func (cs *State) OnStop() {
     // If the node is committing a new block, wait until it is finished!
     if cs.GetRoundState().Step == cstypes.RoundStepCommit {
         select {
-        case <-cs.onStopCh:
+        case <-cs.getOnStopCh():
         case <-time.After(cs.config.TimeoutCommit):
             cs.logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit)
         }
@@ -510,10 +513,6 @@ func (cs *State) OnStop() {
 
     close(cs.onStopCh)
 
-    if cs.evsw.IsRunning() {
-        cs.evsw.Stop()
-    }
-
     if cs.timeoutTicker.IsRunning() {
         cs.timeoutTicker.Stop()
     }
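For reference, the select that State.OnStop keeps (above) waits for an in-flight commit on the channel returned by getOnStopCh and gives up after TimeoutCommit. A minimal, self-contained sketch of that wait-with-timeout shape, using illustrative names rather than the Tendermint types:

    package main

    import (
        "fmt"
        "time"
    )

    // waitForCommit blocks until the commit step signals completion on done,
    // or until the timeout elapses, mirroring the select in State.OnStop.
    func waitForCommit(done <-chan struct{}, timeout time.Duration) {
        select {
        case <-done:
            fmt.Println("commit finished, safe to stop")
        case <-time.After(timeout):
            fmt.Println("timeout waiting for commit to finish after", timeout)
        }
    }

    func main() {
        done := make(chan struct{})
        go func() {
            time.Sleep(50 * time.Millisecond)
            close(done)
        }()
        waitForCommit(done, time.Second)
    }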
diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go
index e99c054c0..b2e2e7131 100644
--- a/internal/consensus/wal_generator.go
+++ b/internal/consensus/wal_generator.go
@@ -7,7 +7,6 @@ import (
     "fmt"
     "io"
     mrand "math/rand"
-    "path/filepath"
     "testing"
     "time"
@@ -26,18 +25,17 @@ import (
     "github.com/tendermint/tendermint/types"
 )
 
-// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a
-// stripped down version of node (proxy app, event bus, consensus state) with a
-// persistent kvstore application and special consensus wal instance
-// (byteBufferWAL) and waits until numBlocks are created.
+// WALGenerateNBlocks generates a consensus WAL. It does this by
+// spinning up a stripped down version of node (proxy app, event bus,
+// consensus state) with a kvstore application and special consensus
+// wal instance (byteBufferWAL) and waits until numBlocks are created.
 // If the node fails to produce given numBlocks, it fails the test.
 func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) {
     t.Helper()
 
     cfg := getConfig(t)
 
-    app := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), "wal_generator"))
-    t.Cleanup(func() { require.NoError(t, app.Close()) })
+    app := kvstore.NewApplication()
 
     logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)
diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go
index 0d1806f22..81e16feea 100644
--- a/internal/libs/autofile/group.go
+++ b/internal/libs/autofile/group.go
@@ -69,11 +69,6 @@ type Group struct {
     minIndex int // Includes head
     maxIndex int // Includes head, where Head will move to
 
-    // close this when the processTicks routine is done.
-    // this ensures we can cleanup the dir after calling Stop
-    // and the routine won't be trying to access it anymore
-    doneProcessTicks chan struct{}
-
     // TODO: When we start deleting files, we need to start tracking GroupReaders
     // and their dependencies.
 }
@@ -101,7 +96,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
         groupCheckDuration: defaultGroupCheckDuration,
         minIndex: 0,
         maxIndex: 0,
-        doneProcessTicks: make(chan struct{}),
     }
 
     for _, option := range groupOptions {
@@ -154,13 +148,6 @@ func (g *Group) OnStop() {
     }
 }
 
-// Wait blocks until all internal goroutines are finished. Supposed to be
-// called after Stop.
-func (g *Group) Wait() {
-    // wait for processTicks routine to finish
-    <-g.doneProcessTicks
-}
-
 // Close closes the head file. The group must be stopped by this moment.
 func (g *Group) Close() {
     if err := g.FlushAndSync(); err != nil {
@@ -241,8 +228,6 @@ func (g *Group) FlushAndSync() error {
 }
 
 func (g *Group) processTicks(ctx context.Context) {
-    defer close(g.doneProcessTicks)
-
     for {
         select {
         case <-ctx.Done():
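With doneProcessTicks and Wait removed, the Group's processTicks loop above shuts down purely through context cancellation. Below is a rough, self-contained sketch of such a context-driven ticker loop (generic names, not the autofile implementation), assuming the caller cancels the context when it wants the loop to stop:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // processTicks is a generic stand-in for a background maintenance loop:
    // it does periodic work until its context is canceled. No extra "done"
    // channel is needed when shutdown is driven entirely by the context.
    func processTicks(ctx context.Context, interval time.Duration, work func()) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                work()
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
        defer cancel()
        processTicks(ctx, 50*time.Millisecond, func() { fmt.Println("tick: periodic maintenance") })
        fmt.Println("context canceled, loop returned")
    }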
diff --git a/libs/events/events.go b/libs/events/events.go
index 5ab5961f6..d96afd7bd 100644
--- a/libs/events/events.go
+++ b/libs/events/events.go
@@ -7,7 +7,6 @@ import (
     "sync"
 
     "github.com/tendermint/tendermint/libs/log"
-    "github.com/tendermint/tendermint/libs/service"
 )
 
 // ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
@@ -45,16 +44,11 @@ type Fireable interface {
 // They can be removed by calling either RemoveListenerForEvent or
 // RemoveListener (for all events).
 type EventSwitch interface {
-    service.Service
     Fireable
-    Stop()
-
     AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
 }
 
 type eventSwitch struct {
-    service.BaseService
-
     mtx sync.RWMutex
     eventCells map[string]*eventCell
     listeners map[string]*eventListener
@@ -65,13 +59,9 @@ func NewEventSwitch(logger log.Logger) EventSwitch {
         eventCells: make(map[string]*eventCell),
         listeners: make(map[string]*eventListener),
     }
-    evsw.BaseService = *service.NewBaseService(logger, "EventSwitch", evsw)
     return evsw
 }
 
-func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil }
-func (evsw *eventSwitch) OnStop() {}
-
 func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
     // Get/Create eventCell and listener.
     evsw.mtx.Lock()
diff --git a/libs/events/events_test.go b/libs/events/events_test.go
index 15f208e06..55e0103da 100644
--- a/libs/events/events_test.go
+++ b/libs/events/events_test.go
@@ -21,8 +21,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
     logger := log.NewTestingLogger(t)
 
     evsw := NewEventSwitch(logger)
-    require.NoError(t, evsw.Start(ctx))
-    t.Cleanup(evsw.Wait)
 
     messages := make(chan EventData)
     require.NoError(t, evsw.AddListenerForEvent("listener", "event",
@@ -50,8 +48,6 @@ func TestAddListenerForEventFireMany(t *testing.T) {
     logger := log.NewTestingLogger(t)
 
     evsw := NewEventSwitch(logger)
-    require.NoError(t, evsw.Start(ctx))
-    t.Cleanup(evsw.Wait)
 
     doneSum := make(chan uint64)
     doneSending := make(chan uint64)
@@ -88,8 +84,6 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
     logger := log.NewTestingLogger(t)
 
     evsw := NewEventSwitch(logger)
-    require.NoError(t, evsw.Start(ctx))
-    t.Cleanup(evsw.Wait)
 
     doneSum := make(chan uint64)
     doneSending1 := make(chan uint64)
@@ -151,9 +145,6 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
     logger := log.NewTestingLogger(t)
 
     evsw := NewEventSwitch(logger)
-    require.NoError(t, evsw.Start(ctx))
-
-    t.Cleanup(evsw.Wait)
 
     doneSum1 := make(chan uint64)
     doneSum2 := make(chan uint64)
@@ -247,8 +238,6 @@ func TestManageListenersAsync(t *testing.T) {
     logger := log.NewTestingLogger(t)
 
     evsw := NewEventSwitch(logger)
-    require.NoError(t, evsw.Start(ctx))
-    t.Cleanup(evsw.Wait)
 
     doneSum1 := make(chan uint64)
     doneSum2 := make(chan uint64)
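Taken together with the events.go changes above, the event switch is now just a mutex-guarded map of callbacks, which is why these tests no longer call Start or register a Wait cleanup. The sketch below is an illustrative, self-contained reimplementation of that shape only; it is not the actual libs/events code, and the real EventCallback and FireEvent signatures may differ.

    package main

    import (
        "context"
        "fmt"
        "sync"
    )

    // eventSwitch here is a toy version of the idea: a locked map of
    // callbacks with no service lifecycle (no Start, Stop, or Wait).
    type eventSwitch struct {
        mtx       sync.RWMutex
        listeners map[string][]func(ctx context.Context, data any) error
    }

    func newEventSwitch() *eventSwitch {
        return &eventSwitch{listeners: make(map[string][]func(ctx context.Context, data any) error)}
    }

    func (e *eventSwitch) AddListenerForEvent(event string, cb func(ctx context.Context, data any) error) {
        e.mtx.Lock()
        defer e.mtx.Unlock()
        e.listeners[event] = append(e.listeners[event], cb)
    }

    func (e *eventSwitch) FireEvent(ctx context.Context, event string, data any) error {
        e.mtx.RLock()
        cbs := e.listeners[event]
        e.mtx.RUnlock()
        for _, cb := range cbs {
            if err := cb(ctx, data); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        evsw := newEventSwitch()
        evsw.AddListenerForEvent("NewRoundStep", func(_ context.Context, data any) error {
            fmt.Println("got event:", data)
            return nil
        })
        _ = evsw.FireEvent(context.Background(), "NewRoundStep", "height=1/round=0")
    }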