diff --git a/internal/state/indexer/sink/kv/kv.go b/internal/state/indexer/sink/kv/kv.go index 4c471b4d3..fe7068a1b 100644 --- a/internal/state/indexer/sink/kv/kv.go +++ b/internal/state/indexer/sink/kv/kv.go @@ -18,14 +18,16 @@ var _ indexer.EventSink = (*EventSink)(nil) // The EventSink is an aggregator for redirecting the call path of the tx/block kvIndexer. // For the implementation details please see the kv.go in the indexer/block and indexer/tx folder. type EventSink struct { - txi *kvt.TxIndex - bi *kvb.BlockerIndexer + txi *kvt.TxIndex + bi *kvb.BlockerIndexer + store dbm.DB } func NewEventSink(store dbm.DB) indexer.EventSink { return &EventSink{ - txi: kvt.NewTxIndex(store), - bi: kvb.New(store), + txi: kvt.NewTxIndex(store), + bi: kvb.New(store), + store: store, } } @@ -58,5 +60,5 @@ func (kves *EventSink) HasBlock(h int64) (bool, error) { } func (kves *EventSink) Stop() error { - return nil + return kves.store.Close() } diff --git a/node/node.go b/node/node.go index 6092f4c43..6ee0d30f9 100644 --- a/node/node.go +++ b/node/node.go @@ -23,6 +23,7 @@ import ( "github.com/tendermint/tendermint/internal/proxy" rpccore "github.com/tendermint/tendermint/internal/rpc/core" sm "github.com/tendermint/tendermint/internal/state" + "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/statesync" "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/libs/log" @@ -62,6 +63,7 @@ type nodeImpl struct { // services eventBus *types.EventBus // pub/sub for services + eventSinks []indexer.EventSink stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk bcReactor service.Service // for block-syncing @@ -73,6 +75,7 @@ type nodeImpl struct { pexReactor service.Service // for exchanging peer addresses evidenceReactor service.Service rpcListeners []net.Listener // rpc servers + shutdownOps closer indexerService service.Service rpcEnv *rpccore.Environment prometheusSrv *http.Server @@ -106,6 +109,7 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err } appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + return makeNode(cfg, pval, nodeKey, @@ -123,33 +127,41 @@ func makeNode(cfg *config.Config, clientCreator abciclient.Creator, genesisDocProvider genesisDocProvider, dbProvider config.DBProvider, - logger log.Logger) (service.Service, error) { + logger log.Logger, +) (service.Service, error) { + closers := []closer{} - blockStore, stateDB, err := initDBs(cfg, dbProvider) + blockStore, stateDB, dbCloser, err := initDBs(cfg, dbProvider) if err != nil { - return nil, err + return nil, combineCloseError(err, dbCloser) } + closers = append(closers, dbCloser) + stateStore := sm.NewStore(stateDB) genDoc, err := genesisDocProvider() if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) } err = genDoc.ValidateAndComplete() if err != nil { - return nil, fmt.Errorf("error in genesis doc: %w", err) + return nil, combineCloseError( + fmt.Errorf("error in genesis doc: %w", err), + makeCloser(closers)) } state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). proxyApp, err := createAndStartProxyAppConns(clientCreator, logger) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } // EventBus and IndexerService must be started before the handshake because @@ -158,12 +170,13 @@ func makeNode(cfg *config.Config, // but before it indexed the txs, or, endblocker panicked) eventBus, err := createAndStartEventBus(logger) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) } // If an address is provided, listen on the socket for a connection from an @@ -175,12 +188,16 @@ func makeNode(cfg *config.Config, case "grpc": privValidator, err = createAndStartPrivValidatorGRPCClient(cfg, genDoc.ChainID, logger) if err != nil { - return nil, fmt.Errorf("error with private validator grpc client: %w", err) + return nil, combineCloseError( + fmt.Errorf("error with private validator grpc client: %w", err), + makeCloser(closers)) } default: privValidator, err = createAndStartPrivValidatorSocketClient(cfg.PrivValidator.ListenAddr, genDoc.ChainID, logger) if err != nil { - return nil, fmt.Errorf("error with private validator socket client: %w", err) + return nil, combineCloseError( + fmt.Errorf("error with private validator socket client: %w", err), + makeCloser(closers)) } } } @@ -188,10 +205,14 @@ func makeNode(cfg *config.Config, if cfg.Mode == config.ModeValidator { pubKey, err = privValidator.GetPubKey(context.TODO()) if err != nil { - return nil, fmt.Errorf("can't get pubkey: %w", err) + return nil, combineCloseError(fmt.Errorf("can't get pubkey: %w", err), + makeCloser(closers)) + } if pubKey == nil { - return nil, errors.New("could not retrieve public key from private validator") + return nil, combineCloseError( + errors.New("could not retrieve public key from private validator"), + makeCloser(closers)) } } @@ -207,7 +228,8 @@ func makeNode(cfg *config.Config, consensusLogger := logger.With("module", "consensus") if !stateSync { if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } // Reload the state. It will have the Version.Consensus.App set by the @@ -215,7 +237,9 @@ func makeNode(cfg *config.Config, // what happened during block replay). state, err = stateStore.Load() if err != nil { - return nil, fmt.Errorf("cannot load state: %w", err) + return nil, combineCloseError( + fmt.Errorf("cannot load state: %w", err), + makeCloser(closers)) } } @@ -229,38 +253,45 @@ func makeNode(cfg *config.Config, // TODO: Use a persistent peer database. nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } p2pLogger := logger.With("module", "p2p") transport := createTransport(p2pLogger, cfg) - peerManager, err := createPeerManager(cfg, dbProvider, nodeKey.ID) + peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID) + closers = append(closers, peerCloser) if err != nil { - return nil, fmt.Errorf("failed to create peer manager: %w", err) + return nil, combineCloseError( + fmt.Errorf("failed to create peer manager: %w", err), + makeCloser(closers)) } - nodeMetrics := - defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) + nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey, peerManager, transport, getRouterConfig(cfg, proxyApp)) if err != nil { - return nil, fmt.Errorf("failed to create router: %w", err) + return nil, combineCloseError( + fmt.Errorf("failed to create router: %w", err), + makeCloser(closers)) } mpReactorShim, mpReactor, mp, err := createMempoolReactor( cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger, ) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } evReactorShim, evReactor, evPool, err := createEvidenceReactor( cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, ) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } // make block executor for consensus and blockchain reactors to execute blocks @@ -287,7 +318,9 @@ func makeNode(cfg *config.Config, peerManager, router, blockSync && !stateSync, nodeMetrics.consensus, ) if err != nil { - return nil, fmt.Errorf("could not create blockchain reactor: %w", err) + return nil, combineCloseError( + fmt.Errorf("could not create blockchain reactor: %w", err), + makeCloser(closers)) } // Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first. @@ -305,6 +338,7 @@ func makeNode(cfg *config.Config, ssLogger := logger.With("module", "statesync") ssReactorShim := p2p.NewReactorShim(ssLogger, "StateSyncShim", statesync.ChannelShims) channels := makeChannelsFromShims(router, statesync.ChannelShims) + peerUpdates := peerManager.Subscribe() stateSyncReactor := statesync.NewReactor( genDoc.ChainID, @@ -354,7 +388,8 @@ func makeNode(cfg *config.Config, pexReactor, err = createPEXReactor(logger, peerManager, router) if err != nil { - return nil, err + return nil, combineCloseError(err, makeCloser(closers)) + } if cfg.RPC.PprofListenAddress != "" { @@ -387,6 +422,9 @@ func makeNode(cfg *config.Config, evidenceReactor: evReactor, indexerService: indexerService, eventBus: eventBus, + eventSinks: eventSinks, + + shutdownOps: makeCloser(closers), rpcEnv: &rpccore.Environment{ ProxyAppQuery: proxyApp.Query(), @@ -434,6 +472,7 @@ func makeSeedNode(cfg *config.Config, state, err := sm.MakeGenesisState(genDoc) if err != nil { return nil, err + } nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state) @@ -446,15 +485,19 @@ func makeSeedNode(cfg *config.Config, p2pLogger := logger.With("module", "p2p") transport := createTransport(p2pLogger, cfg) - peerManager, err := createPeerManager(cfg, dbProvider, nodeKey.ID) + peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID) if err != nil { - return nil, fmt.Errorf("failed to create peer manager: %w", err) + return nil, combineCloseError( + fmt.Errorf("failed to create peer manager: %w", err), + closer) } router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, peerManager, transport, getRouterConfig(cfg, nil)) if err != nil { - return nil, fmt.Errorf("failed to create router: %w", err) + return nil, combineCloseError( + fmt.Errorf("failed to create router: %w", err), + closer) } var pexReactor service.Service @@ -468,7 +511,8 @@ func makeSeedNode(cfg *config.Config, pexReactor, err = createPEXReactor(logger, peerManager, router) if err != nil { - return nil, err + return nil, combineCloseError(err, closer) + } if cfg.RPC.PprofListenAddress != "" { @@ -488,6 +532,8 @@ func makeSeedNode(cfg *config.Config, peerManager: peerManager, router: router, + shutdownOps: closer, + pexReactor: pexReactor, } node.BaseService = *service.NewBaseService(logger, "SeedNode", node) @@ -653,6 +699,12 @@ func (n *nodeImpl) OnStop() { } } + for _, es := range n.eventSinks { + if err := es.Stop(); err != nil { + n.Logger.Error("failed to stop event sink", "err", err) + } + } + if n.config.Mode != config.ModeSeed { // now stop the reactors if n.config.BlockSync.Enable { @@ -716,6 +768,10 @@ func (n *nodeImpl) OnStop() { // Error from closing listeners, or context timeout: n.Logger.Error("Prometheus HTTP server Shutdown", "err", err) } + + } + if err := n.shutdownOps(); err != nil { + n.Logger.Error("problem shutting down additional services", "err", err) } } diff --git a/node/node_test.go b/node/node_test.go index d5ea39aa6..7039b09e1 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -592,7 +592,7 @@ func TestNodeSetEventSink(t *testing.T) { cfg.TxIndex.Indexer = []string{"kvv"} ns, err := newDefaultNode(cfg, logger) assert.Nil(t, ns) - assert.Equal(t, errors.New("unsupported event sink type"), err) + assert.Contains(t, err.Error(), "unsupported event sink type") t.Cleanup(cleanup(ns)) cfg.TxIndex.Indexer = []string{} @@ -604,7 +604,7 @@ func TestNodeSetEventSink(t *testing.T) { cfg.TxIndex.Indexer = []string{"psql"} ns, err = newDefaultNode(cfg, logger) assert.Nil(t, ns) - assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err) + assert.Contains(t, err.Error(), "the psql connection settings cannot be empty") t.Cleanup(cleanup(ns)) var psqlConn = "test" @@ -646,14 +646,14 @@ func TestNodeSetEventSink(t *testing.T) { cfg.TxIndex.PsqlConn = psqlConn ns, err = newDefaultNode(cfg, logger) require.Error(t, err) - assert.Equal(t, e, err) + assert.Contains(t, err.Error(), e.Error()) t.Cleanup(cleanup(ns)) cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"} cfg.TxIndex.PsqlConn = psqlConn ns, err = newDefaultNode(cfg, logger) require.Error(t, err) - assert.Equal(t, e, err) + assert.Contains(t, err.Error(), e.Error()) t.Cleanup(cleanup(ns)) } diff --git a/node/setup.go b/node/setup.go index f6b9c028d..ac36b0858 100644 --- a/node/setup.go +++ b/node/setup.go @@ -2,7 +2,9 @@ package node import ( "bytes" + "errors" "fmt" + "strings" "time" dbm "github.com/tendermint/tm-db" @@ -35,16 +37,57 @@ import ( _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port ) -func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { //nolint:lll - var blockStoreDB dbm.DB - blockStoreDB, err = dbProvider(&config.DBContext{ID: "blockstore", Config: cfg}) +type closer func() error + +func makeCloser(cs []closer) closer { + return func() error { + errs := make([]string, 0, len(cs)) + for _, cl := range cs { + if err := cl(); err != nil { + errs = append(errs, err.Error()) + } + } + if len(errs) >= 0 { + return errors.New(strings.Join(errs, "; ")) + } + return nil + } +} + +func combineCloseError(err error, cl closer) error { + if err == nil { + return cl() + } + + clerr := cl() + if clerr == nil { + return err + } + + return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error()) +} + +func initDBs( + cfg *config.Config, + dbProvider config.DBProvider, +) (*store.BlockStore, dbm.DB, closer, error) { + + blockStoreDB, err := dbProvider(&config.DBContext{ID: "blockstore", Config: cfg}) if err != nil { - return + return nil, nil, func() error { return nil }, err } - blockStore = store.NewBlockStore(blockStoreDB) + closers := []closer{} + blockStore := store.NewBlockStore(blockStoreDB) + closers = append(closers, blockStoreDB.Close) - stateDB, err = dbProvider(&config.DBContext{ID: "state", Config: cfg}) - return + stateDB, err := dbProvider(&config.DBContext{ID: "state", Config: cfg}) + if err != nil { + return nil, nil, makeCloser(closers), err + } + + closers = append(closers, stateDB.Close) + + return blockStore, stateDB, makeCloser(closers), nil } func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger) (proxy.AppConns, error) { @@ -354,7 +397,7 @@ func createPeerManager( cfg *config.Config, dbProvider config.DBProvider, nodeID types.NodeID, -) (*p2p.PeerManager, error) { +) (*p2p.PeerManager, closer, error) { var maxConns uint16 @@ -385,7 +428,7 @@ func createPeerManager( for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") { address, err := p2p.ParseNodeAddress(p) if err != nil { - return nil, fmt.Errorf("invalid peer address %q: %w", p, err) + return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err) } peers = append(peers, address) @@ -395,28 +438,28 @@ func createPeerManager( for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") { address, err := p2p.ParseNodeAddress(p) if err != nil { - return nil, fmt.Errorf("invalid peer address %q: %w", p, err) + return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err) } peers = append(peers, address) } peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg}) if err != nil { - return nil, err + return nil, func() error { return nil }, err } peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options) if err != nil { - return nil, fmt.Errorf("failed to create peer manager: %w", err) + return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err) } for _, peer := range peers { if _, err := peerManager.Add(peer); err != nil { - return nil, fmt.Errorf("failed to add peer %q: %w", peer, err) + return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err) } } - return peerManager, nil + return peerManager, peerDB.Close, nil } func createRouter(