From c320eb14079e0091ebef56458ceee4ead9a75b61 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Wed, 12 May 2021 11:11:24 +0200 Subject: [PATCH] split out initializers into setup.go (#6446) --- node/node.go | 1462 +++++++++++++------------------------------------ node/setup.go | 751 +++++++++++++++++++++++++ 2 files changed, 1120 insertions(+), 1093 deletions(-) create mode 100644 node/setup.go diff --git a/node/node.go b/node/node.go index 10322333c..3cb50cb54 100644 --- a/node/node.go +++ b/node/node.go @@ -1,11 +1,9 @@ package node import ( - "bytes" "context" "errors" "fmt" - "math" "net" "net/http" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port @@ -18,8 +16,6 @@ import ( dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" - bcv0 "github.com/tendermint/tendermint/blockchain/v0" - bcv2 "github.com/tendermint/tendermint/blockchain/v2" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" @@ -36,971 +32,100 @@ import ( "github.com/tendermint/tendermint/p2p/pex" "github.com/tendermint/tendermint/privval" tmgrpc "github.com/tendermint/tendermint/privval/grpc" - protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" grpccore "github.com/tendermint/tendermint/rpc/grpc" rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" - blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" - "github.com/tendermint/tendermint/state/indexer/tx/kv" - "github.com/tendermint/tendermint/state/indexer/tx/null" "github.com/tendermint/tendermint/statesync" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" - "github.com/tendermint/tendermint/version" ) -// DBContext specifies config information for loading a new DB. -type DBContext struct { - ID string - Config *cfg.Config -} - -// DBProvider takes a DBContext and returns an instantiated DB. -type DBProvider func(*DBContext) (dbm.DB, error) - -// DefaultDBProvider returns a database using the DBBackend and DBDir -// specified in the ctx.Config. -func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { - dbType := dbm.BackendType(ctx.Config.DBBackend) - return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()) -} - -// GenesisDocProvider returns a GenesisDoc. -// It allows the GenesisDoc to be pulled from sources other than the -// filesystem, for instance from a distributed key-value store cluster. -type GenesisDocProvider func() (*types.GenesisDoc, error) - -// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads -// the GenesisDoc from the config.GenesisFile() on the filesystem. -func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider { - return func() (*types.GenesisDoc, error) { - return types.GenesisDocFromFile(config.GenesisFile()) - } -} - -// Provider takes a config and a logger and returns a ready to go Node. -type Provider func(*cfg.Config, log.Logger) (*Node, error) - -// DefaultNewNode returns a Tendermint node with default settings for the -// PrivValidator, ClientCreator, GenesisDoc, and DBProvider. -// It implements NodeProvider. -func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { - nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) - if err != nil { - return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err) - } - if config.Mode == cfg.ModeSeed { - return NewSeedNode(config, - DefaultDBProvider, - nodeKey, - DefaultGenesisDocProviderFunc(config), - logger, - ) - } - - var pval *privval.FilePV - if config.Mode == cfg.ModeValidator { - pval, err = privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()) - if err != nil { - return nil, err - } - } else { - pval = nil - } - - appClient, _ := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) - return NewNode(config, - pval, - nodeKey, - appClient, - DefaultGenesisDocProviderFunc(config), - DefaultDBProvider, - DefaultMetricsProvider(config.Instrumentation), - logger, - ) -} - -// MetricsProvider returns a consensus, p2p and mempool Metrics. -type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) - -// DefaultMetricsProvider returns Metrics build using Prometheus client library -// if Prometheus is enabled. Otherwise, it returns no-op Metrics. -func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { - return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) { - if config.Prometheus { - return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), - p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), - mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID), - sm.PrometheusMetrics(config.Namespace, "chain_id", chainID) - } - return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics() - } -} - -// Option sets a parameter for the node. -type Option func(*Node) - -// Temporary interface for switching to fast sync, we should get rid of v0. -// See: https://github.com/tendermint/tendermint/issues/4595 -type fastSyncReactor interface { - SwitchToFastSync(sm.State) error -} - -// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to -// the node's Switch. -// -// WARNING: using any name from the below list of the existing reactors will -// result in replacing it with the custom one. -// -// - MEMPOOL -// - BLOCKCHAIN -// - CONSENSUS -// - EVIDENCE -// - PEX -// - STATESYNC -func CustomReactors(reactors map[string]p2p.Reactor) Option { - return func(n *Node) { - for name, reactor := range reactors { - if existingReactor := n.sw.Reactor(name); existingReactor != nil { - n.sw.Logger.Info("Replacing existing reactor with a custom one", - "name", name, "existing", existingReactor, "custom", reactor) - n.sw.RemoveReactor(name, existingReactor) - } - n.sw.AddReactor(name, reactor) - } - } -} - -// StateProvider overrides the state provider used by state sync to retrieve trusted app hashes and -// build a State object for bootstrapping the node. -// WARNING: this interface is considered unstable and subject to change. -func StateProvider(stateProvider statesync.StateProvider) Option { - return func(n *Node) { - n.stateSyncProvider = stateProvider - } -} - -//------------------------------------------------------------------------------ - -// Node is the highest level interface to a full Tendermint node. -// It includes all configuration information and running services. -type Node struct { - service.BaseService - - // config - config *cfg.Config - genesisDoc *types.GenesisDoc // initial validator set - privValidator types.PrivValidator // local node's validator key - - // network - transport *p2p.MConnTransport - sw *p2p.Switch // p2p connections - peerManager *p2p.PeerManager - router *p2p.Router - addrBook pex.AddrBook // known peers - nodeInfo p2p.NodeInfo - nodeKey p2p.NodeKey // our node privkey - isListening bool - - // services - eventBus *types.EventBus // pub/sub for services - stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk - bcReactor service.Service // for fast-syncing - mempoolReactor *mempl.Reactor // for gossipping transactions - mempool mempl.Mempool - stateSync bool // whether the node should state sync on startup - stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node - stateSyncGenesis sm.State // provides the genesis state for state sync - consensusState *cs.State // latest consensus state - consensusReactor *cs.Reactor // for participating in the consensus - pexReactor *pex.Reactor // for exchanging peer addresses - pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses - evidenceReactor *evidence.Reactor - evidencePool *evidence.Pool // tracking evidence - proxyApp proxy.AppConns // connection to the application - rpcListeners []net.Listener // rpc servers - txIndexer indexer.TxIndexer - blockIndexer indexer.BlockIndexer - indexerService *indexer.Service - prometheusSrv *http.Server -} - -func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { - var blockStoreDB dbm.DB - blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) - if err != nil { - return - } - blockStore = store.NewBlockStore(blockStoreDB) - - stateDB, err = dbProvider(&DBContext{"state", config}) - return -} - -func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) { - proxyApp := proxy.NewAppConns(clientCreator) - proxyApp.SetLogger(logger.With("module", "proxy")) - if err := proxyApp.Start(); err != nil { - return nil, fmt.Errorf("error starting proxy app connections: %v", err) - } - return proxyApp, nil -} - -func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { - eventBus := types.NewEventBus() - eventBus.SetLogger(logger.With("module", "events")) - if err := eventBus.Start(); err != nil { - return nil, err - } - return eventBus, nil -} - -func createAndStartIndexerService( - config *cfg.Config, - dbProvider DBProvider, - eventBus *types.EventBus, - logger log.Logger, -) (*indexer.Service, indexer.TxIndexer, indexer.BlockIndexer, error) { - - var ( - txIndexer indexer.TxIndexer - blockIndexer indexer.BlockIndexer - ) - - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, nil, nil, err - } - - txIndexer = kv.NewTxIndex(store) - blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) - default: - txIndexer = &null.TxIndex{} - blockIndexer = &blockidxnull.BlockerIndexer{} - } - - indexerService := indexer.NewIndexerService(txIndexer, blockIndexer, eventBus) - indexerService.SetLogger(logger.With("module", "txindex")) - - if err := indexerService.Start(); err != nil { - return nil, nil, nil, err - } - - return indexerService, txIndexer, blockIndexer, nil -} - -func doHandshake( - stateStore sm.Store, - state sm.State, - blockStore sm.BlockStore, - genDoc *types.GenesisDoc, - eventBus types.BlockEventPublisher, - proxyApp proxy.AppConns, - consensusLogger log.Logger) error { - - handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) - handshaker.SetLogger(consensusLogger) - handshaker.SetEventBus(eventBus) - if err := handshaker.Handshake(proxyApp); err != nil { - return fmt.Errorf("error during handshake: %v", err) - } - return nil -} - -func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) { - // Log the version info. - logger.Info("Version info", - "software", version.TMCoreSemVer, - "block", version.BlockProtocol, - "p2p", version.P2PProtocol, - "mode", mode, - ) - - // If the state and software differ in block version, at least log it. - if state.Version.Consensus.Block != version.BlockProtocol { - logger.Info("Software and state have different block protocols", - "software", version.BlockProtocol, - "state", state.Version.Consensus.Block, - ) - } - switch { - case mode == cfg.ModeFull: - consensusLogger.Info("This node is a fullnode") - case mode == cfg.ModeValidator: - addr := pubKey.Address() - // Log whether this node is a validator or an observer - if state.Validators.HasAddress(addr) { - consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes()) - } else { - consensusLogger.Info("This node is a validator (NOT in the active validator set)", - "addr", addr, "pubKey", pubKey.Bytes()) - } - } -} - -func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { - if state.Validators.Size() > 1 { - return false - } - addr, _ := state.Validators.GetByIndex(0) - return pubKey != nil && bytes.Equal(pubKey.Address(), addr) -} - -func createMempoolReactor( - config *cfg.Config, - proxyApp proxy.AppConns, - state sm.State, - memplMetrics *mempl.Metrics, - peerManager *p2p.PeerManager, - router *p2p.Router, - logger log.Logger, -) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) { - - logger = logger.With("module", "mempool") - mempool := mempl.NewCListMempool( - config.Mempool, - proxyApp.Mempool(), - state.LastBlockHeight, - mempl.WithMetrics(memplMetrics), - mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), - ) - - mempool.SetLogger(logger) - - channelShims := mempl.GetChannelShims(config.Mempool) - reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims) - - var ( - channels map[p2p.ChannelID]*p2p.Channel - peerUpdates *p2p.PeerUpdates - ) - - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, channelShims) - peerUpdates = peerManager.Subscribe() - } else { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } - - reactor := mempl.NewReactor( - logger, - config.Mempool, - peerManager, - mempool, - channels[mempl.MempoolChannel], - peerUpdates, - ) - - if config.Consensus.WaitForTxs() { - mempool.EnableTxsAvailable() - } - - return reactorShim, reactor, mempool -} - -func createEvidenceReactor( - config *cfg.Config, - dbProvider DBProvider, - stateDB dbm.DB, - blockStore *store.BlockStore, - peerManager *p2p.PeerManager, - router *p2p.Router, - logger log.Logger, -) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) { - evidenceDB, err := dbProvider(&DBContext{"evidence", config}) - if err != nil { - return nil, nil, nil, err - } - - logger = logger.With("module", "evidence") - reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims) - - evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore) - if err != nil { - return nil, nil, nil, err - } - - var ( - channels map[p2p.ChannelID]*p2p.Channel - peerUpdates *p2p.PeerUpdates - ) - - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, evidence.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } - - evidenceReactor := evidence.NewReactor( - logger, - channels[evidence.EvidenceChannel], - peerUpdates, - evidencePool, - ) - - return reactorShim, evidenceReactor, evidencePool, nil -} - -func createBlockchainReactor( - logger log.Logger, - config *cfg.Config, - state sm.State, - blockExec *sm.BlockExecutor, - blockStore *store.BlockStore, - csReactor *cs.Reactor, - peerManager *p2p.PeerManager, - router *p2p.Router, - fastSync bool, -) (*p2p.ReactorShim, service.Service, error) { - - logger = logger.With("module", "blockchain") - - switch config.FastSync.Version { - case cfg.BlockchainV0: - reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims) - - var ( - channels map[p2p.ChannelID]*p2p.Channel - peerUpdates *p2p.PeerUpdates - ) - - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, bcv0.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } - - reactor, err := bcv0.NewReactor( - logger, state.Copy(), blockExec, blockStore, csReactor, - channels[bcv0.BlockchainChannel], peerUpdates, fastSync, - ) - if err != nil { - return nil, nil, err - } - - return reactorShim, reactor, nil - - case cfg.BlockchainV2: - reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - reactor.SetLogger(logger) - - return nil, reactor, nil - - default: - return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) - } -} - -func createConsensusReactor( - config *cfg.Config, - state sm.State, - blockExec *sm.BlockExecutor, - blockStore sm.BlockStore, - mempool *mempl.CListMempool, - evidencePool *evidence.Pool, - privValidator types.PrivValidator, - csMetrics *cs.Metrics, - waitSync bool, - eventBus *types.EventBus, - peerManager *p2p.PeerManager, - router *p2p.Router, - logger log.Logger, -) (*p2p.ReactorShim, *cs.Reactor, *cs.State) { - - consensusState := cs.NewState( - config.Consensus, - state.Copy(), - blockExec, - blockStore, - mempool, - evidencePool, - cs.StateMetrics(csMetrics), - ) - consensusState.SetLogger(logger) - if privValidator != nil && config.Mode == cfg.ModeValidator { - consensusState.SetPrivValidator(privValidator) - } - - reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims) - - var ( - channels map[p2p.ChannelID]*p2p.Channel - peerUpdates *p2p.PeerUpdates - ) - - if config.P2P.DisableLegacy { - channels = makeChannelsFromShims(router, cs.ChannelShims) - peerUpdates = peerManager.Subscribe() - } else { - channels = getChannelsFromShim(reactorShim) - peerUpdates = reactorShim.PeerUpdates - } - - reactor := cs.NewReactor( - logger, - consensusState, - channels[cs.StateChannel], - channels[cs.DataChannel], - channels[cs.VoteChannel], - channels[cs.VoteSetBitsChannel], - peerUpdates, - waitSync, - cs.ReactorMetrics(csMetrics), - ) - - // Services which will be publishing and/or subscribing for messages (events) - // consensusReactor will set it on consensusState and blockExecutor. - reactor.SetEventBus(eventBus) - - return reactorShim, reactor, consensusState -} - -func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport { - return p2p.NewMConnTransport( - logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, - p2p.MConnTransportOptions{ - MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + - len(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), - ), - }, - ) -} - -func createPeerManager( - config *cfg.Config, - dbProvider DBProvider, - p2pLogger log.Logger, - nodeID p2p.NodeID, -) (*p2p.PeerManager, error) { - - var maxConns uint16 - - switch { - case config.P2P.MaxConnections > 0: - maxConns = config.P2P.MaxConnections - - case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0: - x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers - if x > math.MaxUint16 { - return nil, fmt.Errorf( - "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)", - config.P2P.MaxNumInboundPeers, - config.P2P.MaxNumOutboundPeers, - math.MaxUint16, - ) - } - - maxConns = uint16(x) - - default: - maxConns = 64 - } - - privatePeerIDs := make(map[p2p.NodeID]struct{}) - for _, id := range strings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") { - privatePeerIDs[p2p.NodeID(id)] = struct{}{} - } - - options := p2p.PeerManagerOptions{ - MaxConnected: maxConns, - MaxConnectedUpgrade: 4, - MaxPeers: 1000, - MinRetryTime: 100 * time.Millisecond, - MaxRetryTime: 8 * time.Hour, - MaxRetryTimePersistent: 5 * time.Minute, - RetryTimeJitter: 3 * time.Second, - PrivatePeers: privatePeerIDs, - } - - peers := []p2p.NodeAddress{} - for _, p := range strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { - address, err := p2p.ParseNodeAddress(p) - if err != nil { - return nil, fmt.Errorf("invalid peer address %q: %w", p, err) - } - - peers = append(peers, address) - options.PersistentPeers = append(options.PersistentPeers, address.NodeID) - } - - for _, p := range strings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") { - address, err := p2p.ParseNodeAddress(p) - if err != nil { - return nil, fmt.Errorf("invalid peer address %q: %w", p, err) - } - peers = append(peers, address) - } - - peerDB, err := dbProvider(&DBContext{"peerstore", config}) - if err != nil { - return nil, err - } - - peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options) - if err != nil { - return nil, fmt.Errorf("failed to create peer manager: %w", err) - } - - for _, peer := range peers { - if _, err := peerManager.Add(peer); err != nil { - return nil, fmt.Errorf("failed to add peer %q: %w", peer, err) - } - } - - return peerManager, nil -} - -func createRouter( - p2pLogger log.Logger, - p2pMetrics *p2p.Metrics, - nodeInfo p2p.NodeInfo, - privKey crypto.PrivKey, - peerManager *p2p.PeerManager, - transport p2p.Transport, - options p2p.RouterOptions, -) (*p2p.Router, error) { - - return p2p.NewRouter( - p2pLogger, - p2pMetrics, - nodeInfo, - privKey, - peerManager, - []p2p.Transport{transport}, - options, - ) -} - -func createSwitch( - config *cfg.Config, - transport p2p.Transport, - p2pMetrics *p2p.Metrics, - mempoolReactor *p2p.ReactorShim, - bcReactor p2p.Reactor, - stateSyncReactor *p2p.ReactorShim, - consensusReactor *p2p.ReactorShim, - evidenceReactor *p2p.ReactorShim, - proxyApp proxy.AppConns, - nodeInfo p2p.NodeInfo, - nodeKey p2p.NodeKey, - p2pLogger log.Logger, -) *p2p.Switch { - - var ( - connFilters = []p2p.ConnFilterFunc{} - peerFilters = []p2p.PeerFilterFunc{} - ) - - if !config.P2P.AllowDuplicateIP { - connFilters = append(connFilters, p2p.ConnDuplicateIPFilter) - } - - // Filter peers by addr or pubkey with an ABCI query. - // If the query return code is OK, add peer. - if config.FilterPeers { - connFilters = append( - connFilters, - // ABCI query for address filtering. - func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { - res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("error querying abci app: %v", res) - } - - return nil - }, - ) - - peerFilters = append( - peerFilters, - // ABCI query for ID filtering. - func(_ p2p.IPeerSet, p p2p.Peer) error { - res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("error querying abci app: %v", res) - } - - return nil - }, - ) - } - - sw := p2p.NewSwitch( - config.P2P, - transport, - p2p.WithMetrics(p2pMetrics), - p2p.SwitchPeerFilters(peerFilters...), - p2p.SwitchConnFilters(connFilters...), - ) - - sw.SetLogger(p2pLogger) - if config.Mode != cfg.ModeSeed { - sw.AddReactor("MEMPOOL", mempoolReactor) - sw.AddReactor("BLOCKCHAIN", bcReactor) - sw.AddReactor("CONSENSUS", consensusReactor) - sw.AddReactor("EVIDENCE", evidenceReactor) - sw.AddReactor("STATESYNC", stateSyncReactor) - } - - sw.SetNodeInfo(nodeInfo) - sw.SetNodeKey(nodeKey) - - p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile()) - return sw -} - -func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, - p2pLogger log.Logger, nodeKey p2p.NodeKey) (pex.AddrBook, error) { - - addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) - addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) - - // Add ourselves to addrbook to prevent dialing ourselves - if config.P2P.ExternalAddress != "" { - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ExternalAddress)) - if err != nil { - return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err) - } - addrBook.AddOurAddress(addr) - } - if config.P2P.ListenAddress != "" { - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ListenAddress)) - if err != nil { - return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err) - } - addrBook.AddOurAddress(addr) - } - - sw.SetAddrBook(addrBook) - - return addrBook, nil -} - -func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, - sw *p2p.Switch, logger log.Logger) *pex.Reactor { - - reactorConfig := &pex.ReactorConfig{ - Seeds: strings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "), - SeedMode: config.Mode == cfg.ModeSeed, - // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 - // blocks assuming 10s blocks ~ 28 hours. - // TODO (melekes): make it dynamic based on the actual block latencies - // from the live network. - // https://github.com/tendermint/tendermint/issues/3523 - SeedDisconnectWaitPeriod: 28 * time.Hour, - PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, - } - // TODO persistent peers ? so we can have their DNS addrs saved - pexReactor := pex.NewReactor(addrBook, reactorConfig) - pexReactor.SetLogger(logger.With("module", "pex")) - sw.AddReactor("PEX", pexReactor) - return pexReactor -} - -func createPEXReactorV2( - config *cfg.Config, - logger log.Logger, - peerManager *p2p.PeerManager, - router *p2p.Router, -) (*pex.ReactorV2, error) { - - channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096) - if err != nil { - return nil, err - } - - peerUpdates := peerManager.Subscribe() - return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil -} - -// startStateSync starts an asynchronous state sync process, then switches to fast sync mode. -func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reactor, - stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool, - stateStore sm.Store, blockStore *store.BlockStore, state sm.State) error { - ssR.Logger.Info("Starting state sync") - - if stateProvider == nil { - var err error - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - stateProvider, err = statesync.NewLightClientStateProvider( - ctx, - state.ChainID, state.Version, state.InitialHeight, - config.RPCServers, light.TrustOptions{ - Period: config.TrustPeriod, - Height: config.TrustHeight, - Hash: config.TrustHashBytes(), - }, ssR.Logger.With("module", "light")) - if err != nil { - return fmt.Errorf("failed to set up light client state provider: %w", err) - } - } - - go func() { - state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime) - if err != nil { - ssR.Logger.Error("State sync failed", "err", err) - return - } - err = stateStore.Bootstrap(state) - if err != nil { - ssR.Logger.Error("Failed to bootstrap node with new state", "err", err) - return - } - err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit) - if err != nil { - ssR.Logger.Error("Failed to store last seen commit", "err", err) - return - } - - if fastSync { - // FIXME Very ugly to have these metrics bleed through here. - conR.Metrics.StateSyncing.Set(0) - conR.Metrics.FastSyncing.Set(1) - err = bcR.SwitchToFastSync(state) - if err != nil { - ssR.Logger.Error("Failed to switch to fast sync", "err", err) - return - } - } else { - conR.SwitchToConsensus(state, true) - } - }() - return nil -} - -// NewSeedNode returns a new seed node, containing only p2p, pex reactor -func NewSeedNode(config *cfg.Config, - dbProvider DBProvider, - nodeKey p2p.NodeKey, - genesisDocProvider GenesisDocProvider, - logger log.Logger, - options ...Option) (*Node, error) { - - genDoc, err := genesisDocProvider() - if err != nil { - return nil, err - } - - state, err := sm.MakeGenesisState(genDoc) - if err != nil { - return nil, err - } - - nodeInfo, err := makeSeedNodeInfo(config, nodeKey, genDoc, state) - if err != nil { - return nil, err - } - - // Setup Transport and Switch. - p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID) - p2pLogger := logger.With("module", "p2p") - transport := createTransport(p2pLogger, config) - sw := createSwitch( - config, transport, p2pMetrics, nil, nil, - nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, - ) +// Node is the highest level interface to a full Tendermint node. +// It includes all configuration information and running services. +type Node struct { + service.BaseService - err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) - } + // config + config *cfg.Config + genesisDoc *types.GenesisDoc // initial validator set + privValidator types.PrivValidator // local node's validator key - err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) - } + // network + transport *p2p.MConnTransport + sw *p2p.Switch // p2p connections + peerManager *p2p.PeerManager + router *p2p.Router + addrBook pex.AddrBook // known peers + nodeInfo p2p.NodeInfo + nodeKey p2p.NodeKey // our node privkey + isListening bool - addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) - if err != nil { - return nil, fmt.Errorf("could not create addrbook: %w", err) - } + // services + eventBus *types.EventBus // pub/sub for services + stateStore sm.Store + blockStore *store.BlockStore // store the blockchain to disk + bcReactor service.Service // for fast-syncing + mempoolReactor *mempl.Reactor // for gossipping transactions + mempool mempl.Mempool + stateSync bool // whether the node should state sync on startup + stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots + stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node + stateSyncGenesis sm.State // provides the genesis state for state sync + consensusState *cs.State // latest consensus state + consensusReactor *cs.Reactor // for participating in the consensus + pexReactor *pex.Reactor // for exchanging peer addresses + pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses + evidenceReactor *evidence.Reactor + evidencePool *evidence.Pool // tracking evidence + proxyApp proxy.AppConns // connection to the application + rpcListeners []net.Listener // rpc servers + txIndexer indexer.TxIndexer + blockIndexer indexer.BlockIndexer + indexerService *indexer.Service + prometheusSrv *http.Server +} - peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID) +// DefaultNewNode returns a Tendermint node with default settings for the +// PrivValidator, ClientCreator, GenesisDoc, and DBProvider. +// It implements NodeProvider. +func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { + nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) if err != nil { - return nil, fmt.Errorf("failed to create peer manager: %w", err) + return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err) } - - router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, - peerManager, transport, getRouterConfig(config, nil)) - if err != nil { - return nil, fmt.Errorf("failed to create router: %w", err) + if config.Mode == cfg.ModeSeed { + return NewSeedNode(config, + DefaultDBProvider, + nodeKey, + DefaultGenesisDocProviderFunc(config), + logger, + ) } - var ( - pexReactor *pex.Reactor - pexReactorV2 *pex.ReactorV2 - ) - - // add the pex reactor - // FIXME: we add channel descriptors to both the router and the transport but only the router - // should be aware of channel info. We should remove this from transport once the legacy - // p2p stack is removed. - pexCh := pex.ChannelDescriptor() - transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.DisableLegacy { - pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) + var pval *privval.FilePV + if config.Mode == cfg.ModeValidator { + pval, err = privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()) if err != nil { return nil, err } } else { - pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) - } - - if config.RPC.PprofListenAddress != "" { - go func() { - logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress) - logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil)) - }() - } - - node := &Node{ - config: config, - genesisDoc: genDoc, - - transport: transport, - sw: sw, - addrBook: addrBook, - nodeInfo: nodeInfo, - nodeKey: nodeKey, - peerManager: peerManager, - router: router, - - pexReactor: pexReactor, - pexReactorV2: pexReactorV2, - } - node.BaseService = *service.NewBaseService(logger, "SeedNode", node) - - for _, option := range options { - option(node) + pval = nil } - return node, nil + appClient, _ := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) + return NewNode(config, + pval, + nodeKey, + appClient, + DefaultGenesisDocProviderFunc(config), + DefaultDBProvider, + DefaultMetricsProvider(config.Instrumentation), + logger, + ) } // NewNode returns a new, ready to go, Tendermint Node. @@ -1213,25 +338,151 @@ func NewNode(config *cfg.Config, config.StateSync.TempDir, ) - // add the channel descriptors to both the transports - // FIXME: This should be removed when the legacy p2p stack is removed and - // transports can either be agnostic to channel descriptors or can be - // declared in the constructor. - transport.AddChannelDescriptors(mpReactorShim.GetChannels()) - transport.AddChannelDescriptors(bcReactorForSwitch.GetChannels()) - transport.AddChannelDescriptors(csReactorShim.GetChannels()) - transport.AddChannelDescriptors(evReactorShim.GetChannels()) - transport.AddChannelDescriptors(stateSyncReactorShim.GetChannels()) + // add the channel descriptors to both the transports + // FIXME: This should be removed when the legacy p2p stack is removed and + // transports can either be agnostic to channel descriptors or can be + // declared in the constructor. + transport.AddChannelDescriptors(mpReactorShim.GetChannels()) + transport.AddChannelDescriptors(bcReactorForSwitch.GetChannels()) + transport.AddChannelDescriptors(csReactorShim.GetChannels()) + transport.AddChannelDescriptors(evReactorShim.GetChannels()) + transport.AddChannelDescriptors(stateSyncReactorShim.GetChannels()) + + // setup Transport and Switch + sw := createSwitch( + config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, + stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, + ) + + err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) + } + + err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) + } + + addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) + if err != nil { + return nil, fmt.Errorf("could not create addrbook: %w", err) + } + + // Optionally, start the pex reactor + // + // TODO: + // + // We need to set Seeds and PersistentPeers on the switch, + // since it needs to be able to use these (and their DNS names) + // even if the PEX is off. We can include the DNS name in the NetAddress, + // but it would still be nice to have a clear list of the current "PersistentPeers" + // somewhere that we can return with net_info. + // + // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. + // Note we currently use the addrBook regardless at least for AddOurAddress + var ( + pexReactor *pex.Reactor + pexReactorV2 *pex.ReactorV2 + ) + + if config.P2P.PexReactor { + pexCh := pex.ChannelDescriptor() + transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) + if config.P2P.DisableLegacy { + pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err + } + } else { + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + } + } + + if config.RPC.PprofListenAddress != "" { + go func() { + logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress) + logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil)) + }() + } + + node := &Node{ + config: config, + genesisDoc: genDoc, + privValidator: privValidator, + + transport: transport, + sw: sw, + peerManager: peerManager, + router: router, + addrBook: addrBook, + nodeInfo: nodeInfo, + nodeKey: nodeKey, + + stateStore: stateStore, + blockStore: blockStore, + bcReactor: bcReactor, + mempoolReactor: mpReactor, + mempool: mempool, + consensusState: csState, + consensusReactor: csReactor, + stateSyncReactor: stateSyncReactor, + stateSync: stateSync, + stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state + pexReactor: pexReactor, + pexReactorV2: pexReactorV2, + evidenceReactor: evReactor, + evidencePool: evPool, + proxyApp: proxyApp, + txIndexer: txIndexer, + indexerService: indexerService, + blockIndexer: blockIndexer, + eventBus: eventBus, + } + node.BaseService = *service.NewBaseService(logger, "Node", node) + + for _, option := range options { + option(node) + } + + return node, nil +} + +// NewSeedNode returns a new seed node, containing only p2p, pex reactor +func NewSeedNode(config *cfg.Config, + dbProvider DBProvider, + nodeKey p2p.NodeKey, + genesisDocProvider GenesisDocProvider, + logger log.Logger, + options ...Option) (*Node, error) { + + genDoc, err := genesisDocProvider() + if err != nil { + return nil, err + } + + state, err := sm.MakeGenesisState(genDoc) + if err != nil { + return nil, err + } - // setup Transport and Switch + nodeInfo, err := makeSeedNodeInfo(config, nodeKey, genDoc, state) + if err != nil { + return nil, err + } + + // Setup Transport and Switch. + p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID) + p2pLogger := logger.With("module", "p2p") + transport := createTransport(p2pLogger, config) sw := createSwitch( - config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, - stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, + config, transport, p2pMetrics, nil, nil, + nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, ) err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) if err != nil { - return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) + return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) } err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) @@ -1244,34 +495,35 @@ func NewNode(config *cfg.Config, return nil, fmt.Errorf("could not create addrbook: %w", err) } - // Optionally, start the pex reactor - // - // TODO: - // - // We need to set Seeds and PersistentPeers on the switch, - // since it needs to be able to use these (and their DNS names) - // even if the PEX is off. We can include the DNS name in the NetAddress, - // but it would still be nice to have a clear list of the current "PersistentPeers" - // somewhere that we can return with net_info. - // - // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. - // Note we currently use the addrBook regardless at least for AddOurAddress + peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID) + if err != nil { + return nil, fmt.Errorf("failed to create peer manager: %w", err) + } + + router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, + peerManager, transport, getRouterConfig(config, nil)) + if err != nil { + return nil, fmt.Errorf("failed to create router: %w", err) + } + var ( pexReactor *pex.Reactor pexReactorV2 *pex.ReactorV2 ) - if config.P2P.PexReactor { - pexCh := pex.ChannelDescriptor() - transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.DisableLegacy { - pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) - if err != nil { - return nil, err - } - } else { - pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + // add the pex reactor + // FIXME: we add channel descriptors to both the router and the transport but only the router + // should be aware of channel info. We should remove this from transport once the legacy + // p2p stack is removed. + pexCh := pex.ChannelDescriptor() + transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) + if config.P2P.DisableLegacy { + pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err } + } else { + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) } if config.RPC.PprofListenAddress != "" { @@ -1282,39 +534,21 @@ func NewNode(config *cfg.Config, } node := &Node{ - config: config, - genesisDoc: genDoc, - privValidator: privValidator, + config: config, + genesisDoc: genDoc, transport: transport, sw: sw, - peerManager: peerManager, - router: router, addrBook: addrBook, nodeInfo: nodeInfo, nodeKey: nodeKey, + peerManager: peerManager, + router: router, - stateStore: stateStore, - blockStore: blockStore, - bcReactor: bcReactor, - mempoolReactor: mpReactor, - mempool: mempool, - consensusState: csState, - consensusReactor: csReactor, - stateSyncReactor: stateSyncReactor, - stateSync: stateSync, - stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state - pexReactor: pexReactor, - pexReactorV2: pexReactorV2, - evidenceReactor: evReactor, - evidencePool: evPool, - proxyApp: proxyApp, - txIndexer: txIndexer, - indexerService: indexerService, - blockIndexer: blockIndexer, - eventBus: eventBus, + pexReactor: pexReactor, + pexReactorV2: pexReactorV2, } - node.BaseService = *service.NewBaseService(logger, "Node", node) + node.BaseService = *service.NewBaseService(logger, "SeedNode", node) for _, option := range options { option(node) @@ -1323,6 +557,49 @@ func NewNode(config *cfg.Config, return node, nil } +// Option sets a parameter for the node. +type Option func(*Node) + +// Temporary interface for switching to fast sync, we should get rid of v0. +// See: https://github.com/tendermint/tendermint/issues/4595 +type fastSyncReactor interface { + SwitchToFastSync(sm.State) error +} + +// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to +// the node's Switch. +// +// WARNING: using any name from the below list of the existing reactors will +// result in replacing it with the custom one. +// +// - MEMPOOL +// - BLOCKCHAIN +// - CONSENSUS +// - EVIDENCE +// - PEX +// - STATESYNC +func CustomReactors(reactors map[string]p2p.Reactor) Option { + return func(n *Node) { + for name, reactor := range reactors { + if existingReactor := n.sw.Reactor(name); existingReactor != nil { + n.sw.Logger.Info("Replacing existing reactor with a custom one", + "name", name, "existing", existingReactor, "custom", reactor) + n.sw.RemoveReactor(name, existingReactor) + } + n.sw.AddReactor(name, reactor) + } + } +} + +// StateProvider overrides the state provider used by state sync to retrieve trusted app hashes and +// build a State object for bootstrapping the node. +// WARNING: this interface is considered unstable and subject to change. +func StateProvider(stateProvider statesync.StateProvider) Option { + return func(n *Node) { + n.stateSyncProvider = stateProvider + } +} + // OnStart starts the Node. It implements service.Service. func (n *Node) OnStart() error { now := tmtime.Now() @@ -1778,110 +1055,109 @@ func (n *Node) NodeInfo() p2p.NodeInfo { return n.nodeInfo } -func makeNodeInfo( - config *cfg.Config, - nodeKey p2p.NodeKey, - txIndexer indexer.TxIndexer, - genDoc *types.GenesisDoc, - state sm.State, -) (p2p.NodeInfo, error) { - txIndexerStatus := "on" - if _, ok := txIndexer.(*null.TxIndex); ok { - txIndexerStatus = "off" - } - - var bcChannel byte - switch config.FastSync.Version { - case cfg.BlockchainV0: - bcChannel = byte(bcv0.BlockchainChannel) - - case cfg.BlockchainV2: - bcChannel = bcv2.BlockchainChannel - - default: - return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) - } +// startStateSync starts an asynchronous state sync process, then switches to fast sync mode. +func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reactor, + stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool, + stateStore sm.Store, blockStore *store.BlockStore, state sm.State) error { + ssR.Logger.Info("Starting state sync") - nodeInfo := p2p.NodeInfo{ - ProtocolVersion: p2p.NewProtocolVersion( - version.P2PProtocol, // global - state.Version.Consensus.Block, - state.Version.Consensus.App, - ), - NodeID: nodeKey.ID, - Network: genDoc.ChainID, - Version: version.TMCoreSemVer, - Channels: []byte{ - bcChannel, - byte(cs.StateChannel), - byte(cs.DataChannel), - byte(cs.VoteChannel), - byte(cs.VoteSetBitsChannel), - byte(mempl.MempoolChannel), - byte(evidence.EvidenceChannel), - byte(statesync.SnapshotChannel), - byte(statesync.ChunkChannel), - }, - Moniker: config.Moniker, - Other: p2p.NodeInfoOther{ - TxIndex: txIndexerStatus, - RPCAddress: config.RPC.ListenAddress, - }, + if stateProvider == nil { + var err error + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + stateProvider, err = statesync.NewLightClientStateProvider( + ctx, + state.ChainID, state.Version, state.InitialHeight, + config.RPCServers, light.TrustOptions{ + Period: config.TrustPeriod, + Height: config.TrustHeight, + Hash: config.TrustHashBytes(), + }, ssR.Logger.With("module", "light")) + if err != nil { + return fmt.Errorf("failed to set up light client state provider: %w", err) + } } - if config.P2P.PexReactor { - nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) - } + go func() { + state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime) + if err != nil { + ssR.Logger.Error("State sync failed", "err", err) + return + } + err = stateStore.Bootstrap(state) + if err != nil { + ssR.Logger.Error("Failed to bootstrap node with new state", "err", err) + return + } + err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit) + if err != nil { + ssR.Logger.Error("Failed to store last seen commit", "err", err) + return + } - lAddr := config.P2P.ExternalAddress + if fastSync { + // FIXME Very ugly to have these metrics bleed through here. + conR.Metrics.StateSyncing.Set(0) + conR.Metrics.FastSyncing.Set(1) + err = bcR.SwitchToFastSync(state) + if err != nil { + ssR.Logger.Error("Failed to switch to fast sync", "err", err) + return + } + } else { + conR.SwitchToConsensus(state, true) + } + }() + return nil +} - if lAddr == "" { - lAddr = config.P2P.ListenAddress - } +// DBContext specifies config information for loading a new DB. +type DBContext struct { + ID string + Config *cfg.Config +} - nodeInfo.ListenAddr = lAddr +// DBProvider takes a DBContext and returns an instantiated DB. +type DBProvider func(*DBContext) (dbm.DB, error) - err := nodeInfo.Validate() - return nodeInfo, err +// DefaultDBProvider returns a database using the DBBackend and DBDir +// specified in the ctx.Config. +func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { + dbType := dbm.BackendType(ctx.Config.DBBackend) + return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()) } -func makeSeedNodeInfo( - config *cfg.Config, - nodeKey p2p.NodeKey, - genDoc *types.GenesisDoc, - state sm.State, -) (p2p.NodeInfo, error) { - nodeInfo := p2p.NodeInfo{ - ProtocolVersion: p2p.NewProtocolVersion( - version.P2PProtocol, // global - state.Version.Consensus.Block, - state.Version.Consensus.App, - ), - NodeID: nodeKey.ID, - Network: genDoc.ChainID, - Version: version.TMCoreSemVer, - Channels: []byte{}, - Moniker: config.Moniker, - Other: p2p.NodeInfoOther{ - TxIndex: "off", - RPCAddress: config.RPC.ListenAddress, - }, - } +// GenesisDocProvider returns a GenesisDoc. +// It allows the GenesisDoc to be pulled from sources other than the +// filesystem, for instance from a distributed key-value store cluster. +type GenesisDocProvider func() (*types.GenesisDoc, error) - if config.P2P.PexReactor { - nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) +// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads +// the GenesisDoc from the config.GenesisFile() on the filesystem. +func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider { + return func() (*types.GenesisDoc, error) { + return types.GenesisDocFromFile(config.GenesisFile()) } +} - lAddr := config.P2P.ExternalAddress - - if lAddr == "" { - lAddr = config.P2P.ListenAddress - } +// Provider takes a config and a logger and returns a ready to go Node. +type Provider func(*cfg.Config, log.Logger) (*Node, error) - nodeInfo.ListenAddr = lAddr +// MetricsProvider returns a consensus, p2p and mempool Metrics. +type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) - err := nodeInfo.Validate() - return nodeInfo, err +// DefaultMetricsProvider returns Metrics build using Prometheus client library +// if Prometheus is enabled. Otherwise, it returns no-op Metrics. +func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { + return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) { + if config.Prometheus { + return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), + p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), + mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID), + sm.PrometheusMetrics(config.Namespace, "chain_id", chainID) + } + return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics() + } } //------------------------------------------------------------------------------ diff --git a/node/setup.go b/node/setup.go new file mode 100644 index 000000000..855273cb8 --- /dev/null +++ b/node/setup.go @@ -0,0 +1,751 @@ +package node + +import ( + "bytes" + "context" + "fmt" + "math" + "net" + _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port + "time" + + dbm "github.com/tendermint/tm-db" + + abci "github.com/tendermint/tendermint/abci/types" + bcv0 "github.com/tendermint/tendermint/blockchain/v0" + bcv2 "github.com/tendermint/tendermint/blockchain/v2" + cfg "github.com/tendermint/tendermint/config" + cs "github.com/tendermint/tendermint/consensus" + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/evidence" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/libs/strings" + mempl "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/pex" + protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" + "github.com/tendermint/tendermint/proxy" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" + blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" + "github.com/tendermint/tendermint/state/indexer/tx/kv" + "github.com/tendermint/tendermint/state/indexer/tx/null" + "github.com/tendermint/tendermint/statesync" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/version" +) + +func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { + var blockStoreDB dbm.DB + blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) + if err != nil { + return + } + blockStore = store.NewBlockStore(blockStoreDB) + + stateDB, err = dbProvider(&DBContext{"state", config}) + return +} + +func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) { + proxyApp := proxy.NewAppConns(clientCreator) + proxyApp.SetLogger(logger.With("module", "proxy")) + if err := proxyApp.Start(); err != nil { + return nil, fmt.Errorf("error starting proxy app connections: %v", err) + } + return proxyApp, nil +} + +func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { + eventBus := types.NewEventBus() + eventBus.SetLogger(logger.With("module", "events")) + if err := eventBus.Start(); err != nil { + return nil, err + } + return eventBus, nil +} + +func createAndStartIndexerService( + config *cfg.Config, + dbProvider DBProvider, + eventBus *types.EventBus, + logger log.Logger, +) (*indexer.Service, indexer.TxIndexer, indexer.BlockIndexer, error) { + + var ( + txIndexer indexer.TxIndexer + blockIndexer indexer.BlockIndexer + ) + + switch config.TxIndex.Indexer { + case "kv": + store, err := dbProvider(&DBContext{"tx_index", config}) + if err != nil { + return nil, nil, nil, err + } + + txIndexer = kv.NewTxIndex(store) + blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) + default: + txIndexer = &null.TxIndex{} + blockIndexer = &blockidxnull.BlockerIndexer{} + } + + indexerService := indexer.NewIndexerService(txIndexer, blockIndexer, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) + + if err := indexerService.Start(); err != nil { + return nil, nil, nil, err + } + + return indexerService, txIndexer, blockIndexer, nil +} + +func doHandshake( + stateStore sm.Store, + state sm.State, + blockStore sm.BlockStore, + genDoc *types.GenesisDoc, + eventBus types.BlockEventPublisher, + proxyApp proxy.AppConns, + consensusLogger log.Logger) error { + + handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) + handshaker.SetLogger(consensusLogger) + handshaker.SetEventBus(eventBus) + if err := handshaker.Handshake(proxyApp); err != nil { + return fmt.Errorf("error during handshake: %v", err) + } + return nil +} + +func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) { + // Log the version info. + logger.Info("Version info", + "software", version.TMCoreSemVer, + "block", version.BlockProtocol, + "p2p", version.P2PProtocol, + "mode", mode, + ) + + // If the state and software differ in block version, at least log it. + if state.Version.Consensus.Block != version.BlockProtocol { + logger.Info("Software and state have different block protocols", + "software", version.BlockProtocol, + "state", state.Version.Consensus.Block, + ) + } + switch { + case mode == cfg.ModeFull: + consensusLogger.Info("This node is a fullnode") + case mode == cfg.ModeValidator: + addr := pubKey.Address() + // Log whether this node is a validator or an observer + if state.Validators.HasAddress(addr) { + consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes()) + } else { + consensusLogger.Info("This node is a validator (NOT in the active validator set)", + "addr", addr, "pubKey", pubKey.Bytes()) + } + } +} + +func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { + if state.Validators.Size() > 1 { + return false + } + addr, _ := state.Validators.GetByIndex(0) + return pubKey != nil && bytes.Equal(pubKey.Address(), addr) +} + +func createMempoolReactor( + config *cfg.Config, + proxyApp proxy.AppConns, + state sm.State, + memplMetrics *mempl.Metrics, + peerManager *p2p.PeerManager, + router *p2p.Router, + logger log.Logger, +) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) { + + logger = logger.With("module", "mempool") + mempool := mempl.NewCListMempool( + config.Mempool, + proxyApp.Mempool(), + state.LastBlockHeight, + mempl.WithMetrics(memplMetrics), + mempl.WithPreCheck(sm.TxPreCheck(state)), + mempl.WithPostCheck(sm.TxPostCheck(state)), + ) + + mempool.SetLogger(logger) + + channelShims := mempl.GetChannelShims(config.Mempool) + reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims) + + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if config.P2P.DisableLegacy { + channels = makeChannelsFromShims(router, channelShims) + peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } + + reactor := mempl.NewReactor( + logger, + config.Mempool, + peerManager, + mempool, + channels[mempl.MempoolChannel], + peerUpdates, + ) + + if config.Consensus.WaitForTxs() { + mempool.EnableTxsAvailable() + } + + return reactorShim, reactor, mempool +} + +func createEvidenceReactor( + config *cfg.Config, + dbProvider DBProvider, + stateDB dbm.DB, + blockStore *store.BlockStore, + peerManager *p2p.PeerManager, + router *p2p.Router, + logger log.Logger, +) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) { + evidenceDB, err := dbProvider(&DBContext{"evidence", config}) + if err != nil { + return nil, nil, nil, err + } + + logger = logger.With("module", "evidence") + reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims) + + evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore) + if err != nil { + return nil, nil, nil, err + } + + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if config.P2P.DisableLegacy { + channels = makeChannelsFromShims(router, evidence.ChannelShims) + peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } + + evidenceReactor := evidence.NewReactor( + logger, + channels[evidence.EvidenceChannel], + peerUpdates, + evidencePool, + ) + + return reactorShim, evidenceReactor, evidencePool, nil +} + +func createBlockchainReactor( + logger log.Logger, + config *cfg.Config, + state sm.State, + blockExec *sm.BlockExecutor, + blockStore *store.BlockStore, + csReactor *cs.Reactor, + peerManager *p2p.PeerManager, + router *p2p.Router, + fastSync bool, +) (*p2p.ReactorShim, service.Service, error) { + + logger = logger.With("module", "blockchain") + + switch config.FastSync.Version { + case cfg.BlockchainV0: + reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims) + + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if config.P2P.DisableLegacy { + channels = makeChannelsFromShims(router, bcv0.ChannelShims) + peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } + + reactor, err := bcv0.NewReactor( + logger, state.Copy(), blockExec, blockStore, csReactor, + channels[bcv0.BlockchainChannel], peerUpdates, fastSync, + ) + if err != nil { + return nil, nil, err + } + + return reactorShim, reactor, nil + + case cfg.BlockchainV2: + reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + reactor.SetLogger(logger) + + return nil, reactor, nil + + default: + return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) + } +} + +func createConsensusReactor( + config *cfg.Config, + state sm.State, + blockExec *sm.BlockExecutor, + blockStore sm.BlockStore, + mempool *mempl.CListMempool, + evidencePool *evidence.Pool, + privValidator types.PrivValidator, + csMetrics *cs.Metrics, + waitSync bool, + eventBus *types.EventBus, + peerManager *p2p.PeerManager, + router *p2p.Router, + logger log.Logger, +) (*p2p.ReactorShim, *cs.Reactor, *cs.State) { + + consensusState := cs.NewState( + config.Consensus, + state.Copy(), + blockExec, + blockStore, + mempool, + evidencePool, + cs.StateMetrics(csMetrics), + ) + consensusState.SetLogger(logger) + if privValidator != nil && config.Mode == cfg.ModeValidator { + consensusState.SetPrivValidator(privValidator) + } + + reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims) + + var ( + channels map[p2p.ChannelID]*p2p.Channel + peerUpdates *p2p.PeerUpdates + ) + + if config.P2P.DisableLegacy { + channels = makeChannelsFromShims(router, cs.ChannelShims) + peerUpdates = peerManager.Subscribe() + } else { + channels = getChannelsFromShim(reactorShim) + peerUpdates = reactorShim.PeerUpdates + } + + reactor := cs.NewReactor( + logger, + consensusState, + channels[cs.StateChannel], + channels[cs.DataChannel], + channels[cs.VoteChannel], + channels[cs.VoteSetBitsChannel], + peerUpdates, + waitSync, + cs.ReactorMetrics(csMetrics), + ) + + // Services which will be publishing and/or subscribing for messages (events) + // consensusReactor will set it on consensusState and blockExecutor. + reactor.SetEventBus(eventBus) + + return reactorShim, reactor, consensusState +} + +func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport { + return p2p.NewMConnTransport( + logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, + p2p.MConnTransportOptions{ + MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + + len(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), + ), + }, + ) +} + +func createPeerManager( + config *cfg.Config, + dbProvider DBProvider, + p2pLogger log.Logger, + nodeID p2p.NodeID, +) (*p2p.PeerManager, error) { + + var maxConns uint16 + + switch { + case config.P2P.MaxConnections > 0: + maxConns = config.P2P.MaxConnections + + case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0: + x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers + if x > math.MaxUint16 { + return nil, fmt.Errorf( + "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)", + config.P2P.MaxNumInboundPeers, + config.P2P.MaxNumOutboundPeers, + math.MaxUint16, + ) + } + + maxConns = uint16(x) + + default: + maxConns = 64 + } + + privatePeerIDs := make(map[p2p.NodeID]struct{}) + for _, id := range strings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") { + privatePeerIDs[p2p.NodeID(id)] = struct{}{} + } + + options := p2p.PeerManagerOptions{ + MaxConnected: maxConns, + MaxConnectedUpgrade: 4, + MaxPeers: 1000, + MinRetryTime: 100 * time.Millisecond, + MaxRetryTime: 8 * time.Hour, + MaxRetryTimePersistent: 5 * time.Minute, + RetryTimeJitter: 3 * time.Second, + PrivatePeers: privatePeerIDs, + } + + peers := []p2p.NodeAddress{} + for _, p := range strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { + address, err := p2p.ParseNodeAddress(p) + if err != nil { + return nil, fmt.Errorf("invalid peer address %q: %w", p, err) + } + + peers = append(peers, address) + options.PersistentPeers = append(options.PersistentPeers, address.NodeID) + } + + for _, p := range strings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") { + address, err := p2p.ParseNodeAddress(p) + if err != nil { + return nil, fmt.Errorf("invalid peer address %q: %w", p, err) + } + peers = append(peers, address) + } + + peerDB, err := dbProvider(&DBContext{"peerstore", config}) + if err != nil { + return nil, err + } + + peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options) + if err != nil { + return nil, fmt.Errorf("failed to create peer manager: %w", err) + } + + for _, peer := range peers { + if _, err := peerManager.Add(peer); err != nil { + return nil, fmt.Errorf("failed to add peer %q: %w", peer, err) + } + } + + return peerManager, nil +} + +func createRouter( + p2pLogger log.Logger, + p2pMetrics *p2p.Metrics, + nodeInfo p2p.NodeInfo, + privKey crypto.PrivKey, + peerManager *p2p.PeerManager, + transport p2p.Transport, + options p2p.RouterOptions, +) (*p2p.Router, error) { + + return p2p.NewRouter( + p2pLogger, + p2pMetrics, + nodeInfo, + privKey, + peerManager, + []p2p.Transport{transport}, + options, + ) +} + +func createSwitch( + config *cfg.Config, + transport p2p.Transport, + p2pMetrics *p2p.Metrics, + mempoolReactor *p2p.ReactorShim, + bcReactor p2p.Reactor, + stateSyncReactor *p2p.ReactorShim, + consensusReactor *p2p.ReactorShim, + evidenceReactor *p2p.ReactorShim, + proxyApp proxy.AppConns, + nodeInfo p2p.NodeInfo, + nodeKey p2p.NodeKey, + p2pLogger log.Logger, +) *p2p.Switch { + + var ( + connFilters = []p2p.ConnFilterFunc{} + peerFilters = []p2p.PeerFilterFunc{} + ) + + if !config.P2P.AllowDuplicateIP { + connFilters = append(connFilters, p2p.ConnDuplicateIPFilter) + } + + // Filter peers by addr or pubkey with an ABCI query. + // If the query return code is OK, add peer. + if config.FilterPeers { + connFilters = append( + connFilters, + // ABCI query for address filtering. + func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { + res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + }, + ) + + peerFilters = append( + peerFilters, + // ABCI query for ID filtering. + func(_ p2p.IPeerSet, p p2p.Peer) error { + res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + }, + ) + } + + sw := p2p.NewSwitch( + config.P2P, + transport, + p2p.WithMetrics(p2pMetrics), + p2p.SwitchPeerFilters(peerFilters...), + p2p.SwitchConnFilters(connFilters...), + ) + + sw.SetLogger(p2pLogger) + if config.Mode != cfg.ModeSeed { + sw.AddReactor("MEMPOOL", mempoolReactor) + sw.AddReactor("BLOCKCHAIN", bcReactor) + sw.AddReactor("CONSENSUS", consensusReactor) + sw.AddReactor("EVIDENCE", evidenceReactor) + sw.AddReactor("STATESYNC", stateSyncReactor) + } + + sw.SetNodeInfo(nodeInfo) + sw.SetNodeKey(nodeKey) + + p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile()) + return sw +} + +func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, + p2pLogger log.Logger, nodeKey p2p.NodeKey) (pex.AddrBook, error) { + + addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) + addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) + + // Add ourselves to addrbook to prevent dialing ourselves + if config.P2P.ExternalAddress != "" { + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ExternalAddress)) + if err != nil { + return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err) + } + addrBook.AddOurAddress(addr) + } + if config.P2P.ListenAddress != "" { + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ListenAddress)) + if err != nil { + return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err) + } + addrBook.AddOurAddress(addr) + } + + sw.SetAddrBook(addrBook) + + return addrBook, nil +} + +func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, + sw *p2p.Switch, logger log.Logger) *pex.Reactor { + + reactorConfig := &pex.ReactorConfig{ + Seeds: strings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "), + SeedMode: config.Mode == cfg.ModeSeed, + // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 + // blocks assuming 10s blocks ~ 28 hours. + // TODO (melekes): make it dynamic based on the actual block latencies + // from the live network. + // https://github.com/tendermint/tendermint/issues/3523 + SeedDisconnectWaitPeriod: 28 * time.Hour, + PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, + } + // TODO persistent peers ? so we can have their DNS addrs saved + pexReactor := pex.NewReactor(addrBook, reactorConfig) + pexReactor.SetLogger(logger.With("module", "pex")) + sw.AddReactor("PEX", pexReactor) + return pexReactor +} + +func createPEXReactorV2( + config *cfg.Config, + logger log.Logger, + peerManager *p2p.PeerManager, + router *p2p.Router, +) (*pex.ReactorV2, error) { + + channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096) + if err != nil { + return nil, err + } + + peerUpdates := peerManager.Subscribe() + return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil +} + +func makeNodeInfo( + config *cfg.Config, + nodeKey p2p.NodeKey, + txIndexer indexer.TxIndexer, + genDoc *types.GenesisDoc, + state sm.State, +) (p2p.NodeInfo, error) { + txIndexerStatus := "on" + if _, ok := txIndexer.(*null.TxIndex); ok { + txIndexerStatus = "off" + } + + var bcChannel byte + switch config.FastSync.Version { + case cfg.BlockchainV0: + bcChannel = byte(bcv0.BlockchainChannel) + + case cfg.BlockchainV2: + bcChannel = bcv2.BlockchainChannel + + default: + return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) + } + + nodeInfo := p2p.NodeInfo{ + ProtocolVersion: p2p.NewProtocolVersion( + version.P2PProtocol, // global + state.Version.Consensus.Block, + state.Version.Consensus.App, + ), + NodeID: nodeKey.ID, + Network: genDoc.ChainID, + Version: version.TMCoreSemVer, + Channels: []byte{ + bcChannel, + byte(cs.StateChannel), + byte(cs.DataChannel), + byte(cs.VoteChannel), + byte(cs.VoteSetBitsChannel), + byte(mempl.MempoolChannel), + byte(evidence.EvidenceChannel), + byte(statesync.SnapshotChannel), + byte(statesync.ChunkChannel), + }, + Moniker: config.Moniker, + Other: p2p.NodeInfoOther{ + TxIndex: txIndexerStatus, + RPCAddress: config.RPC.ListenAddress, + }, + } + + if config.P2P.PexReactor { + nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) + } + + lAddr := config.P2P.ExternalAddress + + if lAddr == "" { + lAddr = config.P2P.ListenAddress + } + + nodeInfo.ListenAddr = lAddr + + err := nodeInfo.Validate() + return nodeInfo, err +} + +func makeSeedNodeInfo( + config *cfg.Config, + nodeKey p2p.NodeKey, + genDoc *types.GenesisDoc, + state sm.State, +) (p2p.NodeInfo, error) { + nodeInfo := p2p.NodeInfo{ + ProtocolVersion: p2p.NewProtocolVersion( + version.P2PProtocol, // global + state.Version.Consensus.Block, + state.Version.Consensus.App, + ), + NodeID: nodeKey.ID, + Network: genDoc.ChainID, + Version: version.TMCoreSemVer, + Channels: []byte{}, + Moniker: config.Moniker, + Other: p2p.NodeInfoOther{ + TxIndex: "off", + RPCAddress: config.RPC.ListenAddress, + }, + } + + if config.P2P.PexReactor { + nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) + } + + lAddr := config.P2P.ExternalAddress + + if lAddr == "" { + lAddr = config.P2P.ListenAddress + } + + nodeInfo.ListenAddr = lAddr + + err := nodeInfo.Validate() + return nodeInfo, err +}