
node: always close database engine (#7113)

pull/7114/head
Sam Kleinman, 3 years ago (committed by GitHub) · commit 4781d04d18
4 changed files with 153 additions and 52 deletions:

  1. internal/state/indexer/sink/kv/kv.go  +7 −5
  2. node/node.go                          +85 −29
  3. node/node_test.go                     +4 −4
  4. node/setup.go                         +57 −14

internal/state/indexer/sink/kv/kv.go  +7 −5

@@ -18,14 +18,16 @@ var _ indexer.EventSink = (*EventSink)(nil)
 // The EventSink is an aggregator for redirecting the call path of the tx/block kvIndexer.
 // For the implementation details please see the kv.go in the indexer/block and indexer/tx folder.
 type EventSink struct {
-	txi *kvt.TxIndex
-	bi  *kvb.BlockerIndexer
+	txi   *kvt.TxIndex
+	bi    *kvb.BlockerIndexer
+	store dbm.DB
 }
 
 func NewEventSink(store dbm.DB) indexer.EventSink {
 	return &EventSink{
-		txi: kvt.NewTxIndex(store),
-		bi:  kvb.New(store),
+		txi:   kvt.NewTxIndex(store),
+		bi:    kvb.New(store),
+		store: store,
 	}
 }
@@ -58,5 +60,5 @@ func (kves *EventSink) HasBlock(h int64) (bool, error) {
 }
 
 func (kves *EventSink) Stop() error {
-	return nil
+	return kves.store.Close()
 }
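
The behavioral change in this file: the sink now records the store it was built around, and Stop releases it instead of being a no-op. A minimal sketch of the resulting contract (a hypothetical test, not part of the commit; the sink package is internal to the repo, so this only builds in-tree):

package kv_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	dbm "github.com/tendermint/tm-db"

	"github.com/tendermint/tendermint/internal/state/indexer/sink/kv"
)

func TestStopClosesStore(t *testing.T) {
	// NewEventSink now retains the store handle so Stop can release it.
	sink := kv.NewEventSink(dbm.NewMemDB())

	// Stop delegates to store.Close() rather than returning nil, so a
	// caller that stops the sink no longer leaks the database handle.
	require.NoError(t, sink.Stop())
}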

node/node.go  +85 −29

@@ -23,6 +23,7 @@ import (
 	"github.com/tendermint/tendermint/internal/proxy"
 	rpccore "github.com/tendermint/tendermint/internal/rpc/core"
 	sm "github.com/tendermint/tendermint/internal/state"
+	"github.com/tendermint/tendermint/internal/state/indexer"
 	"github.com/tendermint/tendermint/internal/statesync"
 	"github.com/tendermint/tendermint/internal/store"
 	"github.com/tendermint/tendermint/libs/log"
@@ -62,6 +63,7 @@ type nodeImpl struct {
 	// services
 	eventBus        *types.EventBus // pub/sub for services
+	eventSinks      []indexer.EventSink
 	stateStore      sm.Store
 	blockStore      *store.BlockStore // store the blockchain to disk
 	bcReactor       service.Service   // for block-syncing
@@ -73,6 +75,7 @@ type nodeImpl struct {
 	pexReactor      service.Service // for exchanging peer addresses
 	evidenceReactor service.Service
 	rpcListeners    []net.Listener // rpc servers
+	shutdownOps     closer
 	indexerService  service.Service
 	rpcEnv          *rpccore.Environment
 	prometheusSrv   *http.Server
@@ -106,6 +109,7 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, error) {
 	}
 
 	appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
+
 	return makeNode(cfg,
 		pval,
 		nodeKey,
@@ -123,33 +127,41 @@ func makeNode(cfg *config.Config,
 	clientCreator abciclient.Creator,
 	genesisDocProvider genesisDocProvider,
 	dbProvider config.DBProvider,
-	logger log.Logger) (service.Service, error) {
+	logger log.Logger,
+) (service.Service, error) {
+	closers := []closer{}
 
-	blockStore, stateDB, err := initDBs(cfg, dbProvider)
+	blockStore, stateDB, dbCloser, err := initDBs(cfg, dbProvider)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, dbCloser)
 	}
+	closers = append(closers, dbCloser)
 
 	stateStore := sm.NewStore(stateDB)
 
 	genDoc, err := genesisDocProvider()
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	err = genDoc.ValidateAndComplete()
 	if err != nil {
-		return nil, fmt.Errorf("error in genesis doc: %w", err)
+		return nil, combineCloseError(
+			fmt.Errorf("error in genesis doc: %w", err),
+			makeCloser(closers))
 	}
 
 	state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
 	proxyApp, err := createAndStartProxyAppConns(clientCreator, logger)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	// EventBus and IndexerService must be started before the handshake because
@@ -158,12 +170,13 @@ func makeNode(cfg *config.Config,
 	// but before it indexed the txs, or, endblocker panicked)
 	eventBus, err := createAndStartEventBus(logger)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	// If an address is provided, listen on the socket for a connection from an
@@ -175,12 +188,16 @@ func makeNode(cfg *config.Config,
 		case "grpc":
 			privValidator, err = createAndStartPrivValidatorGRPCClient(cfg, genDoc.ChainID, logger)
 			if err != nil {
-				return nil, fmt.Errorf("error with private validator grpc client: %w", err)
+				return nil, combineCloseError(
+					fmt.Errorf("error with private validator grpc client: %w", err),
+					makeCloser(closers))
 			}
 		default:
 			privValidator, err = createAndStartPrivValidatorSocketClient(cfg.PrivValidator.ListenAddr, genDoc.ChainID, logger)
 			if err != nil {
-				return nil, fmt.Errorf("error with private validator socket client: %w", err)
+				return nil, combineCloseError(
+					fmt.Errorf("error with private validator socket client: %w", err),
+					makeCloser(closers))
 			}
 		}
 	}
@@ -188,10 +205,14 @@ func makeNode(cfg *config.Config,
 	if cfg.Mode == config.ModeValidator {
 		pubKey, err = privValidator.GetPubKey(context.TODO())
 		if err != nil {
-			return nil, fmt.Errorf("can't get pubkey: %w", err)
+			return nil, combineCloseError(fmt.Errorf("can't get pubkey: %w", err),
+				makeCloser(closers))
 		}
 		if pubKey == nil {
-			return nil, errors.New("could not retrieve public key from private validator")
+			return nil, combineCloseError(
+				errors.New("could not retrieve public key from private validator"),
+				makeCloser(closers))
 		}
 	}
@@ -207,7 +228,8 @@ func makeNode(cfg *config.Config,
 	consensusLogger := logger.With("module", "consensus")
 	if !stateSync {
 		if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
-			return nil, err
+			return nil, combineCloseError(err, makeCloser(closers))
 		}
 
 		// Reload the state. It will have the Version.Consensus.App set by the
@@ -215,7 +237,9 @@ func makeNode(cfg *config.Config,
 		// what happened during block replay).
 		state, err = stateStore.Load()
 		if err != nil {
-			return nil, fmt.Errorf("cannot load state: %w", err)
+			return nil, combineCloseError(
+				fmt.Errorf("cannot load state: %w", err),
+				makeCloser(closers))
 		}
 	}
@@ -229,38 +253,45 @@ func makeNode(cfg *config.Config,
 	// TODO: Use a persistent peer database.
 	nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	p2pLogger := logger.With("module", "p2p")
 	transport := createTransport(p2pLogger, cfg)
 
-	peerManager, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
+	peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
+	closers = append(closers, peerCloser)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create peer manager: %w", err)
+		return nil, combineCloseError(
+			fmt.Errorf("failed to create peer manager: %w", err),
+			makeCloser(closers))
 	}
 
-	nodeMetrics :=
-		defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
+	nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
 
 	router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
 		peerManager, transport, getRouterConfig(cfg, proxyApp))
 	if err != nil {
-		return nil, fmt.Errorf("failed to create router: %w", err)
+		return nil, combineCloseError(
+			fmt.Errorf("failed to create router: %w", err),
+			makeCloser(closers))
 	}
 
 	mpReactorShim, mpReactor, mp, err := createMempoolReactor(
 		cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
 	)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	evReactorShim, evReactor, evPool, err := createEvidenceReactor(
 		cfg, dbProvider, stateDB, blockStore, peerManager, router, logger,
 	)
 	if err != nil {
-		return nil, err
+		return nil, combineCloseError(err, makeCloser(closers))
 	}
 
 	// make block executor for consensus and blockchain reactors to execute blocks
@@ -287,7 +318,9 @@ func makeNode(cfg *config.Config,
 		peerManager, router, blockSync && !stateSync, nodeMetrics.consensus,
 	)
 	if err != nil {
-		return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
+		return nil, combineCloseError(
+			fmt.Errorf("could not create blockchain reactor: %w", err),
+			makeCloser(closers))
 	}
 
 	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
@@ -305,6 +338,7 @@ func makeNode(cfg *config.Config,
 		ssLogger := logger.With("module", "statesync")
 		ssReactorShim := p2p.NewReactorShim(ssLogger, "StateSyncShim", statesync.ChannelShims)
 		channels := makeChannelsFromShims(router, statesync.ChannelShims)
+
 		peerUpdates := peerManager.Subscribe()
 		stateSyncReactor := statesync.NewReactor(
 			genDoc.ChainID,
@@ -354,7 +388,8 @@ func makeNode(cfg *config.Config,
 		pexReactor, err = createPEXReactor(logger, peerManager, router)
 		if err != nil {
-			return nil, err
+			return nil, combineCloseError(err, makeCloser(closers))
 		}
 	}
 
 	if cfg.RPC.PprofListenAddress != "" {
@@ -387,6 +422,9 @@ func makeNode(cfg *config.Config,
 		evidenceReactor: evReactor,
 		indexerService:  indexerService,
 		eventBus:        eventBus,
+		eventSinks:      eventSinks,
+
+		shutdownOps: makeCloser(closers),
 
 		rpcEnv: &rpccore.Environment{
 			ProxyAppQuery: proxyApp.Query(),
@@ -434,6 +472,7 @@ func makeSeedNode(cfg *config.Config,
 	state, err := sm.MakeGenesisState(genDoc)
 	if err != nil {
 		return nil, err
 	}
+
 	nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
@@ -446,15 +485,19 @@ func makeSeedNode(cfg *config.Config,
 	p2pLogger := logger.With("module", "p2p")
 	transport := createTransport(p2pLogger, cfg)
 
-	peerManager, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
+	peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create peer manager: %w", err)
+		return nil, combineCloseError(
+			fmt.Errorf("failed to create peer manager: %w", err),
+			closer)
 	}
 
 	router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
 		peerManager, transport, getRouterConfig(cfg, nil))
 	if err != nil {
-		return nil, fmt.Errorf("failed to create router: %w", err)
+		return nil, combineCloseError(
+			fmt.Errorf("failed to create router: %w", err),
+			closer)
 	}
 
 	var pexReactor service.Service
@@ -468,7 +511,8 @@ func makeSeedNode(cfg *config.Config,
 		pexReactor, err = createPEXReactor(logger, peerManager, router)
 		if err != nil {
-			return nil, err
+			return nil, combineCloseError(err, closer)
 		}
 	}
 
 	if cfg.RPC.PprofListenAddress != "" {
@@ -488,6 +532,8 @@ func makeSeedNode(cfg *config.Config,
 		peerManager: peerManager,
 		router:      router,
 
+		shutdownOps: closer,
+
 		pexReactor: pexReactor,
 	}
 	node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
@@ -653,6 +699,12 @@ func (n *nodeImpl) OnStop() {
 		}
 	}
 
+	for _, es := range n.eventSinks {
+		if err := es.Stop(); err != nil {
+			n.Logger.Error("failed to stop event sink", "err", err)
+		}
+	}
+
 	if n.config.Mode != config.ModeSeed {
 		// now stop the reactors
 		if n.config.BlockSync.Enable {
@@ -716,6 +768,10 @@ func (n *nodeImpl) OnStop() {
 			// Error from closing listeners, or context timeout:
 			n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
 		}
 	}
+
+	if err := n.shutdownOps(); err != nil {
+		n.Logger.Error("problem shutting down additional services", "err", err)
+	}
 }
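
The pattern running through makeNode above: every helper that opens a handle contributes a closer, each early return is routed through combineCloseError so a failed construction releases everything opened so far, and the success path stores the accumulated closers as shutdownOps for OnStop to run last. A condensed sketch of that shape, assuming the closer helpers from node/setup.go (openDB and handshake are hypothetical stand-ins, not functions from this diff):

func build() (*nodeImpl, error) {
	closers := []closer{}

	db, dbCloser, err := openDB() // hypothetical; returns its own closer
	if err != nil {
		// Nothing else is open yet: run just this closer.
		return nil, combineCloseError(err, dbCloser)
	}
	closers = append(closers, dbCloser)

	if err := handshake(db); err != nil { // hypothetical later step
		// A later failure releases everything accumulated so far.
		return nil, combineCloseError(err, makeCloser(closers))
	}

	// Success: the node owns the closers and runs them in OnStop.
	return &nodeImpl{shutdownOps: makeCloser(closers)}, nil
}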


node/node_test.go  +4 −4

@@ -592,7 +592,7 @@ func TestNodeSetEventSink(t *testing.T) {
 	cfg.TxIndex.Indexer = []string{"kvv"}
 	ns, err := newDefaultNode(cfg, logger)
 	assert.Nil(t, ns)
-	assert.Equal(t, errors.New("unsupported event sink type"), err)
+	assert.Contains(t, err.Error(), "unsupported event sink type")
 	t.Cleanup(cleanup(ns))
 
 	cfg.TxIndex.Indexer = []string{}
@@ -604,7 +604,7 @@ func TestNodeSetEventSink(t *testing.T) {
 	cfg.TxIndex.Indexer = []string{"psql"}
 	ns, err = newDefaultNode(cfg, logger)
 	assert.Nil(t, ns)
-	assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
+	assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
 	t.Cleanup(cleanup(ns))
 
 	var psqlConn = "test"
@@ -646,14 +646,14 @@ func TestNodeSetEventSink(t *testing.T) {
 	cfg.TxIndex.PsqlConn = psqlConn
 	ns, err = newDefaultNode(cfg, logger)
 	require.Error(t, err)
-	assert.Equal(t, e, err)
+	assert.Contains(t, err.Error(), e.Error())
 	t.Cleanup(cleanup(ns))
 
 	cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
 	cfg.TxIndex.PsqlConn = psqlConn
 	ns, err = newDefaultNode(cfg, logger)
 	require.Error(t, err)
-	assert.Equal(t, e, err)
+	assert.Contains(t, err.Error(), e.Error())
 	t.Cleanup(cleanup(ns))
 }
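
The switch from assert.Equal to assert.Contains follows from the node.go changes: failure paths now pass through combineCloseError, which may re-wrap the original error as error=%q closerError=%q when cleanup also fails, so exact error equality is no longer guaranteed and the tests match on the message substring instead.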


node/setup.go  +57 −14

@@ -2,7 +2,9 @@ package node
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
+	"strings"
 	"time"
 
 	dbm "github.com/tendermint/tm-db"
@@ -35,16 +37,57 @@ import (
 	_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
 )
 
-func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { //nolint:lll
-	var blockStoreDB dbm.DB
-	blockStoreDB, err = dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
+type closer func() error
+
+func makeCloser(cs []closer) closer {
+	return func() error {
+		errs := make([]string, 0, len(cs))
+		for _, cl := range cs {
+			if err := cl(); err != nil {
+				errs = append(errs, err.Error())
+			}
+		}
+		if len(errs) > 0 {
+			return errors.New(strings.Join(errs, "; "))
+		}
+		return nil
+	}
+}
+
+func combineCloseError(err error, cl closer) error {
+	if err == nil {
+		return cl()
+	}
+
+	clerr := cl()
+	if clerr == nil {
+		return err
+	}
+
+	return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error())
+}
+
+func initDBs(
+	cfg *config.Config,
+	dbProvider config.DBProvider,
+) (*store.BlockStore, dbm.DB, closer, error) {
+	blockStoreDB, err := dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
 	if err != nil {
-		return
+		return nil, nil, func() error { return nil }, err
 	}
-	blockStore = store.NewBlockStore(blockStoreDB)
+
+	closers := []closer{}
+	blockStore := store.NewBlockStore(blockStoreDB)
+	closers = append(closers, blockStoreDB.Close)
 
-	stateDB, err = dbProvider(&config.DBContext{ID: "state", Config: cfg})
-	return
+	stateDB, err := dbProvider(&config.DBContext{ID: "state", Config: cfg})
+	if err != nil {
+		return nil, nil, makeCloser(closers), err
+	}
+
+	closers = append(closers, stateDB.Close)
+
+	return blockStore, stateDB, makeCloser(closers), nil
 }
 
 func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger) (proxy.AppConns, error) {
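
Taken together: makeCloser folds a slice of closers into one, running each and joining any failure messages with "; ", while combineCloseError attaches a cleanup failure to a primary error without masking either. A small hypothetical demonstration of the three outcomes, assuming the helpers above:

func demoCloseErrors() {
	ok := closer(func() error { return nil })
	bad := closer(func() error { return errors.New("close failed") })

	fmt.Println(combineCloseError(nil, ok))                 // <nil>
	fmt.Println(combineCloseError(errors.New("boom"), ok))  // boom
	fmt.Println(combineCloseError(errors.New("boom"), bad)) // error="boom" closerError="close failed"

	// makeCloser runs every closer and joins the failures.
	fmt.Println(makeCloser([]closer{ok, bad, bad})()) // close failed; close failed
}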
@@ -354,7 +397,7 @@ func createPeerManager(
 	cfg *config.Config,
 	dbProvider config.DBProvider,
 	nodeID types.NodeID,
-) (*p2p.PeerManager, error) {
+) (*p2p.PeerManager, closer, error) {
 	var maxConns uint16
@@ -385,7 +428,7 @@ func createPeerManager(
 	for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") {
 		address, err := p2p.ParseNodeAddress(p)
 		if err != nil {
-			return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
+			return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
 		}
 
 		peers = append(peers, address)
@@ -395,28 +438,28 @@ func createPeerManager(
 	for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") {
 		address, err := p2p.ParseNodeAddress(p)
 		if err != nil {
-			return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
+			return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
 		}
 		peers = append(peers, address)
 	}
 
 	peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg})
 	if err != nil {
-		return nil, err
+		return nil, func() error { return nil }, err
 	}
 
 	peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create peer manager: %w", err)
+		return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err)
 	}
 
 	for _, peer := range peers {
 		if _, err := peerManager.Add(peer); err != nil {
-			return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
+			return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err)
 		}
 	}
 
-	return peerManager, nil
+	return peerManager, peerDB.Close, nil
 }
 
 func createRouter(
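
Note the calling convention this gives createPeerManager: it returns a usable closer on every path, a no-op closure before the peer database exists and peerDB.Close afterwards, so callers can append it unconditionally before checking the error. This is exactly how makeNode consumes it above:

	peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
	closers = append(closers, peerCloser) // safe on every path
	if err != nil {
		return nil, combineCloseError(
			fmt.Errorf("failed to create peer manager: %w", err),
			makeCloser(closers))
	}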

