diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index c4a92aabf..1cdae1abb 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -162,9 +162,7 @@ func (cli *grpcClient) StopForError(err error) { cli.mtx.Unlock() cli.logger.Error("Stopping abci.grpcClient for error", "err", err) - if err := cli.Stop(); err != nil { - cli.logger.Error("error stopping abci.grpcClient", "err", err) - } + cli.Stop() } func (cli *grpcClient) Error() error { diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 30a97cade..074dd1d00 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -555,7 +555,5 @@ func (cli *socketClient) stopForError(err error) { cli.mtx.Unlock() cli.logger.Info("Stopping abci.socketClient", "reason", err) - if err := cli.Stop(); err != nil { - cli.logger.Error("error stopping abci.socketClient", "err", err) - } + cli.Stop() } diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 156640b35..18e8bbe45 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -235,9 +235,7 @@ func (pool *BlockPool) PopRequest() { defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { - if err := r.Stop(); err != nil { - pool.logger.Error("error stopping requester", "err", err) - } + r.Stop() delete(pool.requesters, pool.height) pool.height++ pool.lastAdvance = time.Now() @@ -676,9 +674,7 @@ OUTER_LOOP: case <-ctx.Done(): return case <-bpr.pool.exitedCh: - if err := bpr.Stop(); err != nil { - bpr.logger.Error("error stopped requester", "err", err) - } + bpr.Stop() return case peerID := <-bpr.redoCh: if peerID == bpr.peerID { diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 841deb849..0261c055a 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -187,9 +187,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { // blocking until they all exit. func (r *Reactor) OnStop() { if r.blockSync.IsSet() { - if err := r.pool.Stop(); err != nil { - r.logger.Error("failed to stop pool", "err", err) - } + r.pool.Stop() } // wait for the poolRoutine and requestRoutine goroutines to gracefully exit @@ -485,9 +483,7 @@ FOR_LOOP: continue } - if err := r.pool.Stop(); err != nil { - r.logger.Error("failed to stop pool", "err", err) - } + r.pool.Stop() r.blockSync.UnSet() diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index bddc2c2c3..c0f5ec94f 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -915,15 +915,9 @@ type mockTicker struct { fired bool } -func (m *mockTicker) Start(context.Context) error { - return nil -} - -func (m *mockTicker) Stop() error { - return nil -} - -func (m *mockTicker) IsRunning() bool { return false } +func (m *mockTicker) Start(context.Context) error { return nil } +func (m *mockTicker) Stop() {} +func (m *mockTicker) IsRunning() bool { return false } func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) { m.mtx.Lock() @@ -941,8 +935,6 @@ func (m *mockTicker) Chan() <-chan timeoutInfo { return m.c } -func (*mockTicker) SetLogger(log.Logger) {} - func newPersistentKVStore(t *testing.T, logger log.Logger) abci.Application { t.Helper() diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index f52c7b00d..1673c0e21 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -219,11 +219,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { func (r *Reactor) OnStop() { r.unsubscribeFromBroadcastEvents() - if err := r.state.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - r.logger.Error("failed to stop consensus state", "err", err) - } - } + r.state.Stop() if !r.WaitSync() { r.state.Wait() diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index e2d855b7c..310eb0ab6 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -142,9 +142,7 @@ func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *pl // go back count steps by resetting the state and running (pb.count - count) steps func (pb *playback) replayReset(ctx context.Context, count int, newStepSub eventbus.Subscription) error { - if err := pb.cs.Stop(); err != nil { - return err - } + pb.cs.Stop() pb.cs.Wait() newCS := NewState(ctx, pb.cs.logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index c39f36611..82ca54f31 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -79,9 +79,7 @@ func startNewStateAndWaitForBlock(ctx context.Context, t *testing.T, consensusRe require.NoError(t, cs.Start(ctx)) defer func() { - if err := cs.Stop(); err != nil { - t.Error(err) - } + cs.Stop() }() t.Cleanup(cs.Wait) // This is just a signal that we haven't halted; its not something contained @@ -208,7 +206,7 @@ LOOP: startNewStateAndWaitForBlock(ctx, t, consensusReplayConfig, cs.Height, blockDB, stateStore) // stop consensus state and transactions sender (initFn) - cs.Stop() //nolint:errcheck // Logging this error causes failure + cs.Stop() cancel() // if we reached the required height, exit @@ -292,7 +290,7 @@ func (w *crashingWAL) SearchForEndHeight( } func (w *crashingWAL) Start(ctx context.Context) error { return w.next.Start(ctx) } -func (w *crashingWAL) Stop() error { return w.next.Stop() } +func (w *crashingWAL) Stop() { w.next.Stop() } func (w *crashingWAL) Wait() { w.next.Wait() } //------------------------------------------------------------------------------------------ diff --git a/internal/consensus/state.go b/internal/consensus/state.go index eb0c26bed..f8f832381 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -396,10 +396,7 @@ func (cs *State) OnStart(ctx context.Context) error { cs.logger.Error("the WAL file is corrupted; attempting repair", "err", err) // 1) prep work - if err := cs.wal.Stop(); err != nil { - - return err - } + cs.wal.Stop() repairAttempted = true @@ -494,19 +491,11 @@ func (cs *State) OnStop() { close(cs.onStopCh) if cs.evsw.IsRunning() { - if err := cs.evsw.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - cs.logger.Error("failed trying to stop eventSwitch", "error", err) - } - } + cs.evsw.Stop() } if cs.timeoutTicker.IsRunning() { - if err := cs.timeoutTicker.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - cs.logger.Error("failed trying to stop timeoutTicket", "error", err) - } - } + cs.timeoutTicker.Stop() } // WAL is stopped in receiveRoutine. } @@ -515,6 +504,7 @@ func (cs *State) OnStop() { // NOTE: be sure to Stop() the event switch and drain // any event channels or this may deadlock func (cs *State) Wait() { + cs.evsw.Wait() <-cs.done } @@ -840,12 +830,7 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { // priv_val tracks LastSig // close wal now that we're done writing to it - if err := cs.wal.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - cs.logger.Error("failed trying to stop WAL", "error", err) - } - } - + cs.wal.Stop() cs.wal.Wait() close(cs.done) } diff --git a/internal/consensus/ticker.go b/internal/consensus/ticker.go index 26570b734..103c48efc 100644 --- a/internal/consensus/ticker.go +++ b/internal/consensus/ticker.go @@ -17,7 +17,7 @@ var ( // The timeoutInfo.Duration may be non-positive. type TimeoutTicker interface { Start(context.Context) error - Stop() error + Stop() IsRunning() bool Chan() <-chan timeoutInfo // on which to receive a timeout ScheduleTimeout(ti timeoutInfo) // reset the timer diff --git a/internal/consensus/wal.go b/internal/consensus/wal.go index 92d7a6b82..d5dd15844 100644 --- a/internal/consensus/wal.go +++ b/internal/consensus/wal.go @@ -67,7 +67,7 @@ type WAL interface { // service methods Start(context.Context) error - Stop() error + Stop() Wait() } @@ -164,15 +164,9 @@ func (wal *BaseWAL) FlushAndSync() error { func (wal *BaseWAL) OnStop() { wal.flushTicker.Stop() if err := wal.FlushAndSync(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - wal.logger.Error("error on flush data to disk", "error", err) - } - } - if err := wal.group.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - wal.logger.Error("error trying to stop wal", "error", err) - } + wal.logger.Error("error on flush data to disk", "error", err) } + wal.group.Stop() wal.group.Close() } @@ -438,5 +432,5 @@ func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io return nil, false, nil } func (nilWAL) Start(context.Context) error { return nil } -func (nilWAL) Stop() error { return nil } +func (nilWAL) Stop() {} func (nilWAL) Wait() {} diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index 76294288c..57f6d6704 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -102,14 +102,10 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr select { case <-numBlocksWritten: - if err := consensusState.Stop(); err != nil { - t.Error(err) - } + consensusState.Stop() return nil case <-time.After(1 * time.Minute): - if err := consensusState.Stop(); err != nil { - t.Error(err) - } + consensusState.Stop() return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks) } } @@ -219,5 +215,5 @@ func (w *byteBufferWAL) SearchForEndHeight( } func (w *byteBufferWAL) Start(context.Context) error { return nil } -func (w *byteBufferWAL) Stop() error { return nil } +func (w *byteBufferWAL) Stop() {} func (w *byteBufferWAL) Wait() {} diff --git a/internal/consensus/wal_test.go b/internal/consensus/wal_test.go index 8a39b31b5..d9c05810d 100644 --- a/internal/consensus/wal_test.go +++ b/internal/consensus/wal_test.go @@ -3,7 +3,6 @@ package consensus import ( "bytes" "context" - "errors" "path/filepath" "testing" @@ -17,7 +16,6 @@ import ( "github.com/tendermint/tendermint/internal/consensus/types" "github.com/tendermint/tendermint/internal/libs/autofile" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" tmtime "github.com/tendermint/tendermint/libs/time" tmtypes "github.com/tendermint/tendermint/types" ) @@ -191,11 +189,7 @@ func TestWALPeriodicSync(t *testing.T) { require.NoError(t, wal.Start(ctx)) t.Cleanup(func() { - if err := wal.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - t.Error(err) - } - } + wal.Stop() wal.Wait() }) diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index 7ee081426..7d9c812c6 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -110,7 +110,8 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint t.Cleanup(func() { for _, r := range rts.reactors { if r.IsRunning() { - require.NoError(t, r.Stop()) + r.Stop() + r.Wait() require.False(t, r.IsRunning()) } } diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index 9c849cf2a..c073a7356 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -96,7 +96,7 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) t.Cleanup(func() { for nodeID := range rts.reactors { if rts.reactors[nodeID].IsRunning() { - require.NoError(t, rts.reactors[nodeID].Stop()) + rts.reactors[nodeID].Stop() rts.reactors[nodeID].Wait() require.False(t, rts.reactors[nodeID].IsRunning()) } @@ -184,8 +184,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { }() } - err := primaryReactor.Stop() - require.NoError(t, err) + primaryReactor.Stop() wg.Wait() } diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index ba9e9a2e3..ab0f45739 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -294,9 +294,7 @@ func (c *MConnection) _recover(ctx context.Context) { } func (c *MConnection) stopForError(ctx context.Context, r interface{}) { - if err := c.Stop(); err != nil { - c.logger.Error("error stopping connection", "err", err) - } + c.Stop() if atomic.CompareAndSwapUint32(&c.errored, 0, 1) { if c.onError != nil { diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 3117472be..cc195c22a 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -208,7 +208,8 @@ func (n *Network) Remove(ctx context.Context, t *testing.T, id types.NodeID) { require.NoError(t, node.Transport.Close()) node.cancel() if node.Router.IsRunning() { - require.NoError(t, node.Router.Stop()) + node.Router.Stop() + node.Router.Wait() } for _, sub := range subs { @@ -275,7 +276,8 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions) t.Cleanup(func() { if router.IsRunning() { - require.NoError(t, router.Stop()) + router.Stop() + router.Wait() } require.NoError(t, transport.Close()) cancel() diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 2b5632104..61bdd2f89 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -438,7 +438,7 @@ func TestRouter_AcceptPeers(t *testing.T) { } } - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) mockConnection.AssertExpectations(t) }) @@ -478,7 +478,7 @@ func TestRouter_AcceptPeers_Error(t *testing.T) { require.NoError(t, router.Start(ctx)) time.Sleep(time.Second) - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) } @@ -516,7 +516,7 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { require.NoError(t, router.Start(ctx)) time.Sleep(time.Second) - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) } @@ -573,7 +573,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { close(closeCh) time.Sleep(100 * time.Millisecond) - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) mockConnection.AssertExpectations(t) } @@ -687,7 +687,7 @@ func TestRouter_DialPeers(t *testing.T) { } } - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) mockConnection.AssertExpectations(t) }) @@ -778,7 +778,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { close(closeCh) time.Sleep(500 * time.Millisecond) - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) mockConnection.AssertExpectations(t) } @@ -845,7 +845,7 @@ func TestRouter_EvictPeers(t *testing.T) { Status: p2p.PeerStatusDown, }) - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) mockConnection.AssertExpectations(t) } @@ -895,7 +895,7 @@ func TestRouter_ChannelCompatability(t *testing.T) { require.NoError(t, err) require.NoError(t, router.Start(ctx)) time.Sleep(1 * time.Second) - require.NoError(t, router.Stop()) + router.Stop() require.Empty(t, peerManager.Peers()) mockConnection.AssertExpectations(t) @@ -964,6 +964,6 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { Message: &p2ptest.Message{Value: "Hi"}, })) - require.NoError(t, router.Stop()) + router.Stop() mockTransport.AssertExpectations(t) } diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index 222dbf79c..b52bd5d7b 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -478,12 +478,13 @@ func (c *mConnConnection) RemoteEndpoint() Endpoint { func (c *mConnConnection) Close() error { var err error c.closeOnce.Do(func() { + defer close(c.doneCh) + if c.mconn != nil && c.mconn.IsRunning() { - err = c.mconn.Stop() + c.mconn.Stop() } else { err = c.conn.Close() } - close(c.doneCh) }) return err } diff --git a/internal/proxy/multi_app_conn.go b/internal/proxy/multi_app_conn.go index 56b70b7b3..54d522270 100644 --- a/internal/proxy/multi_app_conn.go +++ b/internal/proxy/multi_app_conn.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "errors" "fmt" "os" "syscall" @@ -67,7 +66,7 @@ type multiAppConn struct { // of reasonable lifecycle witout needing an explicit stop method. type stoppableClient interface { abciclient.Client - Stop() error + Stop() } // NewMultiAppConn makes all necessary abci connections to the application. @@ -81,53 +80,42 @@ func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metric return multiAppConn } -func (app *multiAppConn) Mempool() AppConnMempool { - return app.mempoolConn -} - -func (app *multiAppConn) Consensus() AppConnConsensus { - return app.consensusConn -} - -func (app *multiAppConn) Query() AppConnQuery { - return app.queryConn -} - -func (app *multiAppConn) Snapshot() AppConnSnapshot { - return app.snapshotConn -} +func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn } +func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn } +func (app *multiAppConn) Query() AppConnQuery { return app.queryConn } +func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn } func (app *multiAppConn) OnStart(ctx context.Context) error { - c, err := app.abciClientFor(ctx, connQuery) + var err error + defer func() { + if err != nil { + app.stopAllClients() + } + }() + + app.queryConnClient, err = app.abciClientFor(ctx, connQuery) if err != nil { return err } - app.queryConnClient = c.(stoppableClient) - app.queryConn = NewAppConnQuery(c, app.metrics) + app.queryConn = NewAppConnQuery(app.queryConnClient, app.metrics) - c, err = app.abciClientFor(ctx, connSnapshot) + app.snapshotConnClient, err = app.abciClientFor(ctx, connSnapshot) if err != nil { - app.stopAllClients() return err } - app.snapshotConnClient = c.(stoppableClient) - app.snapshotConn = NewAppConnSnapshot(c, app.metrics) + app.snapshotConn = NewAppConnSnapshot(app.snapshotConnClient, app.metrics) - c, err = app.abciClientFor(ctx, connMempool) + app.mempoolConnClient, err = app.abciClientFor(ctx, connMempool) if err != nil { - app.stopAllClients() return err } - app.mempoolConnClient = c.(stoppableClient) - app.mempoolConn = NewAppConnMempool(c, app.metrics) + app.mempoolConn = NewAppConnMempool(app.mempoolConnClient, app.metrics) - c, err = app.abciClientFor(ctx, connConsensus) + app.consensusConnClient, err = app.abciClientFor(ctx, connConsensus) if err != nil { - app.stopAllClients() return err } - app.consensusConnClient = c.(stoppableClient) - app.consensusConn = NewAppConnConsensus(c, app.metrics) + app.consensusConn = NewAppConnConsensus(app.consensusConnClient, app.metrics) // Kill Tendermint if the ABCI application crashes. app.startWatchersForClientErrorToKillTendermint(ctx) @@ -135,9 +123,7 @@ func (app *multiAppConn) OnStart(ctx context.Context) error { return nil } -func (app *multiAppConn) OnStop() { - app.stopAllClients() -} +func (app *multiAppConn) OnStop() { app.stopAllClients() } func (app *multiAppConn) startWatchersForClientErrorToKillTendermint(ctx context.Context) { // this function starts a number of threads (per abci client) @@ -154,12 +140,10 @@ func (app *multiAppConn) startWatchersForClientErrorToKillTendermint(ctx context } } - type op struct { + for _, client := range []struct { connClient stoppableClient name string - } - - for _, client := range []op{ + }{ { connClient: app.consensusConnClient, name: connConsensus, @@ -190,47 +174,36 @@ func (app *multiAppConn) startWatchersForClientErrorToKillTendermint(ctx context } func (app *multiAppConn) stopAllClients() { - if app.consensusConnClient != nil { - if err := app.consensusConnClient.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - app.logger.Error("error while stopping consensus client", "error", err) - } - } - } - if app.mempoolConnClient != nil { - if err := app.mempoolConnClient.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - app.logger.Error("error while stopping mempool client", "error", err) - } - } - } - if app.queryConnClient != nil { - if err := app.queryConnClient.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - app.logger.Error("error while stopping query client", "error", err) - } - } - } - if app.snapshotConnClient != nil { - if err := app.snapshotConnClient.Stop(); err != nil { - if !errors.Is(err, service.ErrAlreadyStopped) { - app.logger.Error("error while stopping snapshot client", "error", err) - } + for _, client := range []stoppableClient{ + app.consensusConnClient, + app.mempoolConnClient, + app.queryConnClient, + app.snapshotConnClient, + } { + if client != nil { + client.Stop() } } } -func (app *multiAppConn) abciClientFor(ctx context.Context, conn string) (abciclient.Client, error) { +func (app *multiAppConn) abciClientFor(ctx context.Context, conn string) (stoppableClient, error) { c, err := app.clientCreator(app.logger.With( "module", "abci-client", "connection", conn)) if err != nil { return nil, fmt.Errorf("error creating ABCI client (%s connection): %w", conn, err) } + if err := c.Start(ctx); err != nil { return nil, fmt.Errorf("error starting ABCI client (%s connection): %w", conn, err) } - return c, nil + + client, ok := c.(stoppableClient) + if !ok { + return nil, fmt.Errorf("%T is not a stoppable client", c) + } + + return client, nil } func kill() error { diff --git a/internal/proxy/multi_app_conn_test.go b/internal/proxy/multi_app_conn_test.go index 98ea0ca53..dd6dad5de 100644 --- a/internal/proxy/multi_app_conn_test.go +++ b/internal/proxy/multi_app_conn_test.go @@ -23,7 +23,7 @@ type noopStoppableClientImpl struct { count int } -func (c *noopStoppableClientImpl) Stop() error { c.count++; return nil } +func (c *noopStoppableClientImpl) Stop() { c.count++ } func TestAppConns_Start_Stop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index a8ae6c138..f3088dbac 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -439,7 +439,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { eventBus := eventbus.NewDefault(logger) err = eventBus.Start(ctx) require.NoError(t, err) - defer eventBus.Stop() //nolint:errcheck // ignore for tests + defer eventBus.Stop() blockExec.SetEventBus(eventBus) diff --git a/libs/events/events.go b/libs/events/events.go index f97dfb1a1..636aa102d 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -47,7 +47,7 @@ type Fireable interface { type EventSwitch interface { service.Service Fireable - Stop() error + Stop() AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error RemoveListenerForEvent(event string, listenerID string) diff --git a/libs/service/service.go b/libs/service/service.go index daeead03e..f7701633a 100644 --- a/libs/service/service.go +++ b/libs/service/service.go @@ -9,15 +9,9 @@ import ( ) var ( - // ErrAlreadyStarted is returned when somebody tries to start an already - // running service. - ErrAlreadyStarted = errors.New("already started") - // ErrAlreadyStopped is returned when somebody tries to stop an already + // errAlreadyStopped is returned when somebody tries to stop an already // stopped service (without resetting it). - ErrAlreadyStopped = errors.New("already stopped") - // ErrNotStarted is returned when somebody tries to stop a not running - // service. - ErrNotStarted = errors.New("not started") + errAlreadyStopped = errors.New("already stopped") ) // Service defines a service that can be started, stopped, and reset. @@ -46,13 +40,14 @@ type Implementation interface { /* Classical-inheritance-style service declarations. Services can be started, then -stopped, then optionally restarted. +stopped, but cannot be restarted. -Users can override the OnStart/OnStop methods. In the absence of errors, these +Users must implement OnStart/OnStop methods. In the absence of errors, these methods are guaranteed to be called at most once. If OnStart returns an error, service won't be marked as started, so the user can call Start again. -It is safe, but an error, to call Stop without calling Start first. +The BaseService implementation ensures that the OnStop method is +called after the context passed to Start is canceled. Typical usage: @@ -74,7 +69,7 @@ Typical usage: // start subroutines, etc. } - func (fs *FooService) OnStop() error { + func (fs *FooService) OnStop() { // close/destroy private fields // stop subroutines, etc. } @@ -99,20 +94,20 @@ func NewBaseService(logger log.Logger, name string, impl Implementation) *BaseSe } } -// Start starts the Service and calls its OnStart method. An error will be -// returned if the service is already running or stopped. To restart a -// stopped service, call Reset. +// Start starts the Service and calls its OnStart method. An error +// will be returned if the service is stopped, but not if it is +// already running. func (bs *BaseService) Start(ctx context.Context) error { bs.mtx.Lock() defer bs.mtx.Unlock() if bs.quit != nil { - return ErrAlreadyStarted + return nil } select { case <-bs.quit: - return ErrAlreadyStopped + return errAlreadyStopped default: bs.logger.Info("starting service", "service", bs.name, "impl", bs.name) if err := bs.impl.OnStart(ctx); err != nil { @@ -132,7 +127,7 @@ func (bs *BaseService) Start(ctx context.Context) error { // this means stop was called manually return case <-ctx.Done(): - _ = bs.Stop() + bs.Stop() } bs.logger.Info("stopped service", @@ -143,25 +138,26 @@ func (bs *BaseService) Start(ctx context.Context) error { } } -// Stop implements Service by calling OnStop (if defined) and closing quit -// channel. An error will be returned if the service is already stopped. -func (bs *BaseService) Stop() error { +// Stop manually terminates the service by calling OnStop method from +// the implementation and releases all resources related to the +// service. +func (bs *BaseService) Stop() { bs.mtx.Lock() defer bs.mtx.Unlock() if bs.quit == nil { - return ErrNotStarted + return } select { case <-bs.quit: - return ErrAlreadyStopped + return default: bs.logger.Info("stopping service", "service", bs.name) bs.impl.OnStop() bs.cancel() - return nil + return } } diff --git a/libs/service/service_test.go b/libs/service/service_test.go index 9b4f84de9..d0b8ce57e 100644 --- a/libs/service/service_test.go +++ b/libs/service/service_test.go @@ -96,7 +96,7 @@ func TestBaseService(t *testing.T) { require.True(t, ts.isStarted()) - require.NoError(t, ts.Stop()) + ts.Stop() require.True(t, ts.isStopped()) require.False(t, ts.IsRunning()) }) @@ -107,10 +107,10 @@ func TestBaseService(t *testing.T) { require.NoError(t, ts.Start(ctx)) require.True(t, ts.isStarted()) - require.NoError(t, ts.Stop()) + ts.Stop() require.True(t, ts.isStopped()) require.False(t, ts.isMultiStopped()) - require.Error(t, ts.Stop()) + ts.Stop() require.False(t, ts.isMultiStopped()) }) t.Run("MultiThreaded", func(t *testing.T) { @@ -123,7 +123,7 @@ func TestBaseService(t *testing.T) { require.NoError(t, ts.Start(ctx)) require.True(t, ts.isStarted()) - go func() { _ = ts.Stop() }() + go ts.Stop() go cancel() ts.Wait() diff --git a/node/node.go b/node/node.go index b2020829c..275363c58 100644 --- a/node/node.go +++ b/node/node.go @@ -177,6 +177,7 @@ func makeNode( if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } + closers = append(closers, func() error { indexerService.Stop(); return nil }) privValidator, err := createPrivval(ctx, logger, cfg, genDoc, filePrivval) if err != nil { @@ -363,7 +364,6 @@ func makeNode( services: []service.Service{ eventBus, - indexerService, evReactor, mpReactor, csReactor, @@ -473,10 +473,6 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { for _, reactor := range n.services { if err := reactor.Start(ctx); err != nil { - if errors.Is(err, service.ErrAlreadyStarted) { - continue - } - return fmt.Errorf("problem starting service '%T': %w ", reactor, err) } } @@ -515,9 +511,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { if err != nil { n.logger.Error("state sync failed; shutting down this node", "err", err) // stop the node - if err := n.Stop(); err != nil { - n.logger.Error("failed to shut down node", "err", err) - } + n.Stop() return err } diff --git a/node/node_test.go b/node/node_test.go index a9ab13f8c..e42bf29d8 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -178,7 +178,7 @@ func TestNodeSetPrivValTCP(t *testing.T) { err := signerServer.Start(ctx) require.NoError(t, err) }() - defer signerServer.Stop() //nolint:errcheck // ignore for tests + defer signerServer.Stop() genDoc, err := defaultGenesisDocProviderFunc(cfg)() require.NoError(t, err) @@ -242,7 +242,7 @@ func TestNodeSetPrivValIPC(t *testing.T) { err := pvsc.Start(ctx) require.NoError(t, err) }() - defer pvsc.Stop() //nolint:errcheck // ignore for tests + defer pvsc.Stop() genDoc, err := defaultGenesisDocProviderFunc(cfg)() require.NoError(t, err) diff --git a/privval/signer_client.go b/privval/signer_client.go index 3247a74b7..10ce8bc75 100644 --- a/privval/signer_client.go +++ b/privval/signer_client.go @@ -41,12 +41,12 @@ func NewSignerClient(ctx context.Context, endpoint *SignerListenerEndpoint, chai // Close closes the underlying connection func (sc *SignerClient) Close() error { - err := sc.endpoint.Stop() - cerr := sc.endpoint.Close() + sc.endpoint.Stop() + err := sc.endpoint.Close() if err != nil { return err } - return cerr + return nil } // IsConnected indicates with the signer is connected to a remote signing service diff --git a/privval/signer_client_test.go b/privval/signer_client_test.go index d8bb25828..272902fc9 100644 --- a/privval/signer_client_test.go +++ b/privval/signer_client_test.go @@ -81,7 +81,7 @@ func TestSignerClose(t *testing.T) { }() assert.NoError(t, tc.signerClient.Close()) - assert.NoError(t, tc.signerServer.Stop()) + tc.signerServer.Stop() }) } } diff --git a/privval/signer_listener_endpoint_test.go b/privval/signer_listener_endpoint_test.go index 4c9c31c42..6049c6245 100644 --- a/privval/signer_listener_endpoint_test.go +++ b/privval/signer_listener_endpoint_test.go @@ -125,9 +125,7 @@ func TestRetryConnToRemoteSigner(t *testing.T) { t.Cleanup(signerServer.Wait) <-endpointIsOpenCh - if err := signerServer.Stop(); err != nil { - t.Error(err) - } + signerServer.Stop() dialerEndpoint2 := NewSignerDialerEndpoint( logger, @@ -138,8 +136,8 @@ func TestRetryConnToRemoteSigner(t *testing.T) { // let some pings pass require.NoError(t, signerServer2.Start(ctx)) assert.True(t, signerServer2.IsRunning()) + t.Cleanup(signerServer2.Stop) t.Cleanup(signerServer2.Wait) - t.Cleanup(func() { _ = signerServer2.Stop() }) // give the client some time to re-establish the conn to the remote signer // should see sth like this in the logs: