|
package node
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rs/cors"
|
|
|
|
amino "github.com/tendermint/go-amino"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
bc "github.com/tendermint/tendermint/blockchain"
|
|
cfg "github.com/tendermint/tendermint/config"
|
|
cs "github.com/tendermint/tendermint/consensus"
|
|
"github.com/tendermint/tendermint/crypto/ed25519"
|
|
"github.com/tendermint/tendermint/evidence"
|
|
cmn "github.com/tendermint/tendermint/libs/common"
|
|
dbm "github.com/tendermint/tendermint/libs/db"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
mempl "github.com/tendermint/tendermint/mempool"
|
|
"github.com/tendermint/tendermint/p2p"
|
|
"github.com/tendermint/tendermint/p2p/pex"
|
|
"github.com/tendermint/tendermint/privval"
|
|
"github.com/tendermint/tendermint/proxy"
|
|
rpccore "github.com/tendermint/tendermint/rpc/core"
|
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
|
grpccore "github.com/tendermint/tendermint/rpc/grpc"
|
|
"github.com/tendermint/tendermint/rpc/lib/server"
|
|
sm "github.com/tendermint/tendermint/state"
|
|
"github.com/tendermint/tendermint/state/txindex"
|
|
"github.com/tendermint/tendermint/state/txindex/kv"
|
|
"github.com/tendermint/tendermint/state/txindex/null"
|
|
"github.com/tendermint/tendermint/types"
|
|
tmtime "github.com/tendermint/tendermint/types/time"
|
|
"github.com/tendermint/tendermint/version"
|
|
)
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// DBContext specifies config information for loading a new DB.
|
|
type DBContext struct {
|
|
ID string
|
|
Config *cfg.Config
|
|
}
|
|
|
|
// DBProvider takes a DBContext and returns an instantiated DB.
|
|
type DBProvider func(*DBContext) (dbm.DB, error)
|
|
|
|
// DefaultDBProvider returns a database using the DBBackend and DBDir
|
|
// specified in the ctx.Config.
|
|
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
|
|
dbType := dbm.DBBackendType(ctx.Config.DBBackend)
|
|
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()), nil
|
|
}
|
|
|
|
// GenesisDocProvider returns a GenesisDoc.
|
|
// It allows the GenesisDoc to be pulled from sources other than the
|
|
// filesystem, for instance from a distributed key-value store cluster.
|
|
type GenesisDocProvider func() (*types.GenesisDoc, error)
|
|
|
|
// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
|
|
// the GenesisDoc from the config.GenesisFile() on the filesystem.
|
|
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
|
|
return func() (*types.GenesisDoc, error) {
|
|
return types.GenesisDocFromFile(config.GenesisFile())
|
|
}
|
|
}
|
|
|
|
// NodeProvider takes a config and a logger and returns a ready to go Node.
|
|
type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
|
|
|
|
// DefaultNewNode returns a Tendermint node with default settings for the
|
|
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
|
|
// It implements NodeProvider.
|
|
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
|
|
// Generate node PrivKey
|
|
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return NewNode(config,
|
|
privval.LoadOrGenFilePV(config.PrivValidatorFile()),
|
|
nodeKey,
|
|
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
|
|
DefaultGenesisDocProviderFunc(config),
|
|
DefaultDBProvider,
|
|
DefaultMetricsProvider(config.Instrumentation),
|
|
logger,
|
|
)
|
|
}
|
|
|
|
// MetricsProvider returns a consensus, p2p and mempool Metrics.
|
|
type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
|
|
|
|
// DefaultMetricsProvider returns Metrics build using Prometheus client library
|
|
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
|
|
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
|
|
return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
|
|
if config.Prometheus {
|
|
return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace),
|
|
mempl.PrometheusMetrics(config.Namespace), sm.PrometheusMetrics(config.Namespace)
|
|
}
|
|
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// Node is the highest level interface to a full Tendermint node.
|
|
// It includes all configuration information and running services.
|
|
type Node struct {
|
|
cmn.BaseService
|
|
|
|
// config
|
|
config *cfg.Config
|
|
genesisDoc *types.GenesisDoc // initial validator set
|
|
privValidator types.PrivValidator // local node's validator key
|
|
|
|
// network
|
|
transport *p2p.MultiplexTransport
|
|
sw *p2p.Switch // p2p connections
|
|
addrBook pex.AddrBook // known peers
|
|
nodeInfo p2p.NodeInfo
|
|
nodeKey *p2p.NodeKey // our node privkey
|
|
isListening bool
|
|
|
|
// services
|
|
eventBus *types.EventBus // pub/sub for services
|
|
stateDB dbm.DB
|
|
blockStore *bc.BlockStore // store the blockchain to disk
|
|
bcReactor *bc.BlockchainReactor // for fast-syncing
|
|
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
|
|
consensusState *cs.ConsensusState // latest consensus state
|
|
consensusReactor *cs.ConsensusReactor // for participating in the consensus
|
|
evidencePool *evidence.EvidencePool // tracking evidence
|
|
proxyApp proxy.AppConns // connection to the application
|
|
rpcListeners []net.Listener // rpc servers
|
|
txIndexer txindex.TxIndexer
|
|
indexerService *txindex.IndexerService
|
|
prometheusSrv *http.Server
|
|
}
|
|
|
|
// NewNode returns a new, ready to go, Tendermint Node.
|
|
func NewNode(config *cfg.Config,
|
|
privValidator types.PrivValidator,
|
|
nodeKey *p2p.NodeKey,
|
|
clientCreator proxy.ClientCreator,
|
|
genesisDocProvider GenesisDocProvider,
|
|
dbProvider DBProvider,
|
|
metricsProvider MetricsProvider,
|
|
logger log.Logger) (*Node, error) {
|
|
|
|
// Get BlockStore
|
|
blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
blockStore := bc.NewBlockStore(blockStoreDB)
|
|
|
|
// Get State
|
|
stateDB, err := dbProvider(&DBContext{"state", config})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get genesis doc
|
|
// TODO: move to state package?
|
|
genDoc, err := loadGenesisDoc(stateDB)
|
|
if err != nil {
|
|
genDoc, err = genesisDocProvider()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// save genesis doc to prevent a certain class of user errors (e.g. when it
|
|
// was changed, accidentally or not). Also good for audit trail.
|
|
saveGenesisDoc(stateDB, genDoc)
|
|
}
|
|
|
|
state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
|
|
proxyApp := proxy.NewAppConns(clientCreator)
|
|
proxyApp.SetLogger(logger.With("module", "proxy"))
|
|
if err := proxyApp.Start(); err != nil {
|
|
return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
|
|
}
|
|
|
|
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
|
|
// and replays any blocks as necessary to sync tendermint with the app.
|
|
consensusLogger := logger.With("module", "consensus")
|
|
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
|
|
handshaker.SetLogger(consensusLogger)
|
|
if err := handshaker.Handshake(proxyApp); err != nil {
|
|
return nil, fmt.Errorf("Error during handshake: %v", err)
|
|
}
|
|
|
|
// Reload the state. It will have the Version.Consensus.App set by the
|
|
// Handshake, and may have other modifications as well (ie. depending on
|
|
// what happened during block replay).
|
|
state = sm.LoadState(stateDB)
|
|
|
|
// Log the version info.
|
|
logger.Info("Version info",
|
|
"software", version.TMCoreSemVer,
|
|
"block", version.BlockProtocol,
|
|
"p2p", version.P2PProtocol,
|
|
)
|
|
|
|
// If the state and software differ in block version, at least log it.
|
|
if state.Version.Consensus.Block != version.BlockProtocol {
|
|
logger.Info("Software and state have different block protocols",
|
|
"software", version.BlockProtocol,
|
|
"state", state.Version.Consensus.Block,
|
|
)
|
|
}
|
|
|
|
if config.PrivValidatorListenAddr != "" {
|
|
// If an address is provided, listen on the socket for a connection from an
|
|
// external signing process.
|
|
// FIXME: we should start services inside OnStart
|
|
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Error with private validator socket client")
|
|
}
|
|
}
|
|
|
|
// Decide whether to fast-sync or not
|
|
// We don't fast-sync when the only validator is us.
|
|
fastSync := config.FastSync
|
|
if state.Validators.Size() == 1 {
|
|
addr, _ := state.Validators.GetByIndex(0)
|
|
if bytes.Equal(privValidator.GetAddress(), addr) {
|
|
fastSync = false
|
|
}
|
|
}
|
|
|
|
// Log whether this node is a validator or an observer
|
|
if state.Validators.HasAddress(privValidator.GetAddress()) {
|
|
consensusLogger.Info("This node is a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
|
|
} else {
|
|
consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
|
|
}
|
|
|
|
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider()
|
|
|
|
// Make MempoolReactor
|
|
mempool := mempl.NewMempool(
|
|
config.Mempool,
|
|
proxyApp.Mempool(),
|
|
state.LastBlockHeight,
|
|
mempl.WithMetrics(memplMetrics),
|
|
mempl.WithPreCheck(sm.TxPreCheck(state)),
|
|
mempl.WithPostCheck(sm.TxPostCheck(state)),
|
|
)
|
|
mempoolLogger := logger.With("module", "mempool")
|
|
mempool.SetLogger(mempoolLogger)
|
|
if config.Mempool.WalEnabled() {
|
|
mempool.InitWAL() // no need to have the mempool wal during tests
|
|
}
|
|
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
|
|
mempoolReactor.SetLogger(mempoolLogger)
|
|
|
|
if config.Consensus.WaitForTxs() {
|
|
mempool.EnableTxsAvailable()
|
|
}
|
|
|
|
// Make Evidence Reactor
|
|
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
evidenceLogger := logger.With("module", "evidence")
|
|
evidenceStore := evidence.NewEvidenceStore(evidenceDB)
|
|
evidencePool := evidence.NewEvidencePool(stateDB, evidenceStore)
|
|
evidencePool.SetLogger(evidenceLogger)
|
|
evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
|
|
evidenceReactor.SetLogger(evidenceLogger)
|
|
|
|
blockExecLogger := logger.With("module", "state")
|
|
// make block executor for consensus and blockchain reactors to execute blocks
|
|
blockExec := sm.NewBlockExecutor(
|
|
stateDB,
|
|
blockExecLogger,
|
|
proxyApp.Consensus(),
|
|
mempool,
|
|
evidencePool,
|
|
sm.BlockExecutorWithMetrics(smMetrics),
|
|
)
|
|
|
|
// Make BlockchainReactor
|
|
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
|
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
|
|
|
// Make ConsensusReactor
|
|
consensusState := cs.NewConsensusState(
|
|
config.Consensus,
|
|
state.Copy(),
|
|
blockExec,
|
|
blockStore,
|
|
mempool,
|
|
evidencePool,
|
|
cs.StateMetrics(csMetrics),
|
|
)
|
|
consensusState.SetLogger(consensusLogger)
|
|
if privValidator != nil {
|
|
consensusState.SetPrivValidator(privValidator)
|
|
}
|
|
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
|
|
consensusReactor.SetLogger(consensusLogger)
|
|
|
|
eventBus := types.NewEventBus()
|
|
eventBus.SetLogger(logger.With("module", "events"))
|
|
|
|
// services which will be publishing and/or subscribing for messages (events)
|
|
// consensusReactor will set it on consensusState and blockExecutor
|
|
consensusReactor.SetEventBus(eventBus)
|
|
|
|
// Transaction indexing
|
|
var txIndexer txindex.TxIndexer
|
|
switch config.TxIndex.Indexer {
|
|
case "kv":
|
|
store, err := dbProvider(&DBContext{"tx_index", config})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if config.TxIndex.IndexTags != "" {
|
|
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
|
|
} else if config.TxIndex.IndexAllTags {
|
|
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
|
|
} else {
|
|
txIndexer = kv.NewTxIndex(store)
|
|
}
|
|
default:
|
|
txIndexer = &null.TxIndex{}
|
|
}
|
|
|
|
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
|
|
indexerService.SetLogger(logger.With("module", "txindex"))
|
|
|
|
p2pLogger := logger.With("module", "p2p")
|
|
nodeInfo, err := makeNodeInfo(
|
|
config,
|
|
nodeKey.ID(),
|
|
txIndexer,
|
|
genDoc.ChainID,
|
|
p2p.NewProtocolVersion(
|
|
version.P2PProtocol, // global
|
|
state.Version.Consensus.Block,
|
|
state.Version.Consensus.App,
|
|
),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Setup Transport.
|
|
var (
|
|
mConnConfig = p2p.MConnConfig(config.P2P)
|
|
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
|
|
connFilters = []p2p.ConnFilterFunc{}
|
|
peerFilters = []p2p.PeerFilterFunc{}
|
|
)
|
|
|
|
if !config.P2P.AllowDuplicateIP {
|
|
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
|
|
}
|
|
|
|
// Filter peers by addr or pubkey with an ABCI query.
|
|
// If the query return code is OK, add peer.
|
|
if config.FilterPeers {
|
|
connFilters = append(
|
|
connFilters,
|
|
// ABCI query for address filtering.
|
|
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
|
|
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
|
|
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if res.IsErr() {
|
|
return fmt.Errorf("Error querying abci app: %v", res)
|
|
}
|
|
|
|
return nil
|
|
},
|
|
)
|
|
|
|
peerFilters = append(
|
|
peerFilters,
|
|
// ABCI query for ID filtering.
|
|
func(_ p2p.IPeerSet, p p2p.Peer) error {
|
|
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
|
|
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if res.IsErr() {
|
|
return fmt.Errorf("Error querying abci app: %v", res)
|
|
}
|
|
|
|
return nil
|
|
},
|
|
)
|
|
}
|
|
|
|
p2p.MultiplexTransportConnFilters(connFilters...)(transport)
|
|
|
|
// Setup Switch.
|
|
sw := p2p.NewSwitch(
|
|
config.P2P,
|
|
transport,
|
|
p2p.WithMetrics(p2pMetrics),
|
|
p2p.SwitchPeerFilters(peerFilters...),
|
|
)
|
|
sw.SetLogger(p2pLogger)
|
|
sw.AddReactor("MEMPOOL", mempoolReactor)
|
|
sw.AddReactor("BLOCKCHAIN", bcReactor)
|
|
sw.AddReactor("CONSENSUS", consensusReactor)
|
|
sw.AddReactor("EVIDENCE", evidenceReactor)
|
|
sw.SetNodeInfo(nodeInfo)
|
|
sw.SetNodeKey(nodeKey)
|
|
|
|
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
|
|
|
|
// Optionally, start the pex reactor
|
|
//
|
|
// TODO:
|
|
//
|
|
// We need to set Seeds and PersistentPeers on the switch,
|
|
// since it needs to be able to use these (and their DNS names)
|
|
// even if the PEX is off. We can include the DNS name in the NetAddress,
|
|
// but it would still be nice to have a clear list of the current "PersistentPeers"
|
|
// somewhere that we can return with net_info.
|
|
//
|
|
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
|
|
// Note we currently use the addrBook regardless at least for AddOurAddress
|
|
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
|
|
|
|
// Add ourselves to addrbook to prevent dialing ourselves
|
|
addrBook.AddOurAddress(nodeInfo.NetAddress())
|
|
|
|
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
|
|
if config.P2P.PexReactor {
|
|
// TODO persistent peers ? so we can have their DNS addrs saved
|
|
pexReactor := pex.NewPEXReactor(addrBook,
|
|
&pex.PEXReactorConfig{
|
|
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
|
|
SeedMode: config.P2P.SeedMode,
|
|
})
|
|
pexReactor.SetLogger(logger.With("module", "pex"))
|
|
sw.AddReactor("PEX", pexReactor)
|
|
}
|
|
|
|
sw.SetAddrBook(addrBook)
|
|
|
|
// run the profile server
|
|
profileHost := config.ProfListenAddress
|
|
if profileHost != "" {
|
|
go func() {
|
|
logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
|
|
}()
|
|
}
|
|
|
|
node := &Node{
|
|
config: config,
|
|
genesisDoc: genDoc,
|
|
privValidator: privValidator,
|
|
|
|
transport: transport,
|
|
sw: sw,
|
|
addrBook: addrBook,
|
|
nodeInfo: nodeInfo,
|
|
nodeKey: nodeKey,
|
|
|
|
stateDB: stateDB,
|
|
blockStore: blockStore,
|
|
bcReactor: bcReactor,
|
|
mempoolReactor: mempoolReactor,
|
|
consensusState: consensusState,
|
|
consensusReactor: consensusReactor,
|
|
evidencePool: evidencePool,
|
|
proxyApp: proxyApp,
|
|
txIndexer: txIndexer,
|
|
indexerService: indexerService,
|
|
eventBus: eventBus,
|
|
}
|
|
node.BaseService = *cmn.NewBaseService(logger, "Node", node)
|
|
return node, nil
|
|
}
|
|
|
|
// OnStart starts the Node. It implements cmn.Service.
|
|
func (n *Node) OnStart() error {
|
|
now := tmtime.Now()
|
|
genTime := n.genesisDoc.GenesisTime
|
|
if genTime.After(now) {
|
|
n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
|
|
time.Sleep(genTime.Sub(now))
|
|
}
|
|
|
|
err := n.eventBus.Start()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Add private IDs to addrbook to block those peers being added
|
|
n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
|
|
|
|
// Start the RPC server before the P2P server
|
|
// so we can eg. receive txs for the first block
|
|
if n.config.RPC.ListenAddress != "" {
|
|
listeners, err := n.startRPC()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n.rpcListeners = listeners
|
|
}
|
|
|
|
if n.config.Instrumentation.Prometheus &&
|
|
n.config.Instrumentation.PrometheusListenAddr != "" {
|
|
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
|
|
}
|
|
|
|
// Start the transport.
|
|
addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := n.transport.Listen(*addr); err != nil {
|
|
return err
|
|
}
|
|
|
|
n.isListening = true
|
|
|
|
// Start the switch (the P2P server).
|
|
err = n.sw.Start()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Always connect to persistent peers
|
|
if n.config.P2P.PersistentPeers != "" {
|
|
err = n.sw.DialPeersAsync(n.addrBook, splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "), true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// start tx indexer
|
|
return n.indexerService.Start()
|
|
}
|
|
|
|
// OnStop stops the Node. It implements cmn.Service.
|
|
func (n *Node) OnStop() {
|
|
n.BaseService.OnStop()
|
|
|
|
n.Logger.Info("Stopping Node")
|
|
|
|
// first stop the non-reactor services
|
|
n.eventBus.Stop()
|
|
n.indexerService.Stop()
|
|
|
|
// now stop the reactors
|
|
// TODO: gracefully disconnect from peers.
|
|
n.sw.Stop()
|
|
|
|
// stop mempool WAL
|
|
if n.config.Mempool.WalEnabled() {
|
|
n.mempoolReactor.Mempool.CloseWAL()
|
|
}
|
|
|
|
if err := n.transport.Close(); err != nil {
|
|
n.Logger.Error("Error closing transport", "err", err)
|
|
}
|
|
|
|
n.isListening = false
|
|
|
|
// finally stop the listeners / external services
|
|
for _, l := range n.rpcListeners {
|
|
n.Logger.Info("Closing rpc listener", "listener", l)
|
|
if err := l.Close(); err != nil {
|
|
n.Logger.Error("Error closing listener", "listener", l, "err", err)
|
|
}
|
|
}
|
|
|
|
if pvsc, ok := n.privValidator.(cmn.Service); ok {
|
|
pvsc.Stop()
|
|
}
|
|
|
|
if n.prometheusSrv != nil {
|
|
if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
|
|
// Error from closing listeners, or context timeout:
|
|
n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ConfigureRPC sets all variables in rpccore so they will serve
|
|
// rpc calls from this node
|
|
func (n *Node) ConfigureRPC() {
|
|
rpccore.SetStateDB(n.stateDB)
|
|
rpccore.SetBlockStore(n.blockStore)
|
|
rpccore.SetConsensusState(n.consensusState)
|
|
rpccore.SetMempool(n.mempoolReactor.Mempool)
|
|
rpccore.SetEvidencePool(n.evidencePool)
|
|
rpccore.SetP2PPeers(n.sw)
|
|
rpccore.SetP2PTransport(n)
|
|
rpccore.SetPubKey(n.privValidator.GetPubKey())
|
|
rpccore.SetGenesisDoc(n.genesisDoc)
|
|
rpccore.SetAddrBook(n.addrBook)
|
|
rpccore.SetProxyAppQuery(n.proxyApp.Query())
|
|
rpccore.SetTxIndexer(n.txIndexer)
|
|
rpccore.SetConsensusReactor(n.consensusReactor)
|
|
rpccore.SetEventBus(n.eventBus)
|
|
rpccore.SetLogger(n.Logger.With("module", "rpc"))
|
|
}
|
|
|
|
func (n *Node) startRPC() ([]net.Listener, error) {
|
|
n.ConfigureRPC()
|
|
listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
|
|
coreCodec := amino.NewCodec()
|
|
ctypes.RegisterAmino(coreCodec)
|
|
|
|
if n.config.RPC.Unsafe {
|
|
rpccore.AddUnsafeRoutes()
|
|
}
|
|
|
|
// we may expose the rpc over both a unix and tcp socket
|
|
listeners := make([]net.Listener, len(listenAddrs))
|
|
for i, listenAddr := range listenAddrs {
|
|
mux := http.NewServeMux()
|
|
rpcLogger := n.Logger.With("module", "rpc-server")
|
|
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus))
|
|
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
|
|
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
|
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)
|
|
|
|
listener, err := rpcserver.Listen(
|
|
listenAddr,
|
|
rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var rootHandler http.Handler = mux
|
|
if n.config.RPC.IsCorsEnabled() {
|
|
corsMiddleware := cors.New(cors.Options{
|
|
AllowedOrigins: n.config.RPC.CORSAllowedOrigins,
|
|
AllowedMethods: n.config.RPC.CORSAllowedMethods,
|
|
AllowedHeaders: n.config.RPC.CORSAllowedHeaders,
|
|
})
|
|
rootHandler = corsMiddleware.Handler(mux)
|
|
}
|
|
|
|
go rpcserver.StartHTTPServer(
|
|
listener,
|
|
rootHandler,
|
|
rpcLogger,
|
|
)
|
|
listeners[i] = listener
|
|
}
|
|
|
|
// we expose a simplified api over grpc for convenience to app devs
|
|
grpcListenAddr := n.config.RPC.GRPCListenAddress
|
|
if grpcListenAddr != "" {
|
|
listener, err := rpcserver.Listen(
|
|
grpcListenAddr, rpcserver.Config{MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
go grpccore.StartGRPCServer(listener)
|
|
listeners = append(listeners, listener)
|
|
}
|
|
|
|
return listeners, nil
|
|
}
|
|
|
|
// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
|
|
// collectors on addr.
|
|
func (n *Node) startPrometheusServer(addr string) *http.Server {
|
|
srv := &http.Server{
|
|
Addr: addr,
|
|
Handler: promhttp.InstrumentMetricHandler(
|
|
prometheus.DefaultRegisterer, promhttp.HandlerFor(
|
|
prometheus.DefaultGatherer,
|
|
promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
|
|
),
|
|
),
|
|
}
|
|
go func() {
|
|
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
|
|
// Error starting or closing listener:
|
|
n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
|
|
}
|
|
}()
|
|
return srv
|
|
}
|
|
|
|
// Switch returns the Node's Switch.
|
|
func (n *Node) Switch() *p2p.Switch {
|
|
return n.sw
|
|
}
|
|
|
|
// BlockStore returns the Node's BlockStore.
|
|
func (n *Node) BlockStore() *bc.BlockStore {
|
|
return n.blockStore
|
|
}
|
|
|
|
// ConsensusState returns the Node's ConsensusState.
|
|
func (n *Node) ConsensusState() *cs.ConsensusState {
|
|
return n.consensusState
|
|
}
|
|
|
|
// ConsensusReactor returns the Node's ConsensusReactor.
|
|
func (n *Node) ConsensusReactor() *cs.ConsensusReactor {
|
|
return n.consensusReactor
|
|
}
|
|
|
|
// MempoolReactor returns the Node's MempoolReactor.
|
|
func (n *Node) MempoolReactor() *mempl.MempoolReactor {
|
|
return n.mempoolReactor
|
|
}
|
|
|
|
// EvidencePool returns the Node's EvidencePool.
|
|
func (n *Node) EvidencePool() *evidence.EvidencePool {
|
|
return n.evidencePool
|
|
}
|
|
|
|
// EventBus returns the Node's EventBus.
|
|
func (n *Node) EventBus() *types.EventBus {
|
|
return n.eventBus
|
|
}
|
|
|
|
// PrivValidator returns the Node's PrivValidator.
|
|
// XXX: for convenience only!
|
|
func (n *Node) PrivValidator() types.PrivValidator {
|
|
return n.privValidator
|
|
}
|
|
|
|
// GenesisDoc returns the Node's GenesisDoc.
|
|
func (n *Node) GenesisDoc() *types.GenesisDoc {
|
|
return n.genesisDoc
|
|
}
|
|
|
|
// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
|
|
func (n *Node) ProxyApp() proxy.AppConns {
|
|
return n.proxyApp
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
func (n *Node) Listeners() []string {
|
|
return []string{
|
|
fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
|
|
}
|
|
}
|
|
|
|
func (n *Node) IsListening() bool {
|
|
return n.isListening
|
|
}
|
|
|
|
// NodeInfo returns the Node's Info from the Switch.
|
|
func (n *Node) NodeInfo() p2p.NodeInfo {
|
|
return n.nodeInfo
|
|
}
|
|
|
|
func makeNodeInfo(
|
|
config *cfg.Config,
|
|
nodeID p2p.ID,
|
|
txIndexer txindex.TxIndexer,
|
|
chainID string,
|
|
protocolVersion p2p.ProtocolVersion,
|
|
) (p2p.NodeInfo, error) {
|
|
txIndexerStatus := "on"
|
|
if _, ok := txIndexer.(*null.TxIndex); ok {
|
|
txIndexerStatus = "off"
|
|
}
|
|
nodeInfo := p2p.DefaultNodeInfo{
|
|
ProtocolVersion: protocolVersion,
|
|
ID_: nodeID,
|
|
Network: chainID,
|
|
Version: version.TMCoreSemVer,
|
|
Channels: []byte{
|
|
bc.BlockchainChannel,
|
|
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
|
|
mempl.MempoolChannel,
|
|
evidence.EvidenceChannel,
|
|
},
|
|
Moniker: config.Moniker,
|
|
Other: p2p.DefaultNodeInfoOther{
|
|
TxIndex: txIndexerStatus,
|
|
RPCAddress: config.RPC.ListenAddress,
|
|
},
|
|
}
|
|
|
|
if config.P2P.PexReactor {
|
|
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
|
|
}
|
|
|
|
lAddr := config.P2P.ExternalAddress
|
|
|
|
if lAddr == "" {
|
|
lAddr = config.P2P.ListenAddress
|
|
}
|
|
|
|
nodeInfo.ListenAddr = lAddr
|
|
|
|
err := nodeInfo.Validate()
|
|
return nodeInfo, err
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
var (
|
|
genesisDocKey = []byte("genesisDoc")
|
|
)
|
|
|
|
// panics if failed to unmarshal bytes
|
|
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
|
|
bytes := db.Get(genesisDocKey)
|
|
if len(bytes) == 0 {
|
|
return nil, errors.New("Genesis doc not found")
|
|
}
|
|
var genDoc *types.GenesisDoc
|
|
err := cdc.UnmarshalJSON(bytes, &genDoc)
|
|
if err != nil {
|
|
cmn.PanicCrisis(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes))
|
|
}
|
|
return genDoc, nil
|
|
}
|
|
|
|
// panics if failed to marshal the given genesis document
|
|
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
|
|
bytes, err := cdc.MarshalJSON(genDoc)
|
|
if err != nil {
|
|
cmn.PanicCrisis(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
|
|
}
|
|
db.SetSync(genesisDocKey, bytes)
|
|
}
|
|
|
|
func createAndStartPrivValidatorSocketClient(
|
|
listenAddr string,
|
|
logger log.Logger,
|
|
) (types.PrivValidator, error) {
|
|
var pvsc types.PrivValidator
|
|
|
|
protocol, address := cmn.ProtocolAndAddress(listenAddr)
|
|
switch protocol {
|
|
case "unix":
|
|
pvsc = privval.NewIPCVal(logger.With("module", "privval"), address)
|
|
case "tcp":
|
|
// TODO: persist this key so external signer
|
|
// can actually authenticate us
|
|
pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey())
|
|
default:
|
|
return nil, fmt.Errorf(
|
|
"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
|
|
protocol,
|
|
)
|
|
}
|
|
|
|
if pvsc, ok := pvsc.(cmn.Service); ok {
|
|
if err := pvsc.Start(); err != nil {
|
|
return nil, errors.Wrap(err, "failed to start")
|
|
}
|
|
}
|
|
|
|
return pvsc, nil
|
|
}
|
|
|
|
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
|
|
// slice of the string s with all leading and trailing Unicode code points
|
|
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
|
|
// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
|
|
// -1. also filter out empty strings, only return non-empty strings.
|
|
func splitAndTrimEmpty(s, sep, cutset string) []string {
|
|
if s == "" {
|
|
return []string{}
|
|
}
|
|
|
|
spl := strings.Split(s, sep)
|
|
nonEmptyStrings := make([]string, 0, len(spl))
|
|
for i := 0; i < len(spl); i++ {
|
|
element := strings.Trim(spl[i], cutset)
|
|
if element != "" {
|
|
nonEmptyStrings = append(nonEmptyStrings, element)
|
|
}
|
|
}
|
|
return nonEmptyStrings
|
|
}
|