Browse Source

node: refactor node.NewNode (#3456)

The node.NewNode method is pretty complex at the moment, an in order to address issues like #3156, we need to simplify the interface for partial node instantiation. In some places, we don't need to build up a full node (like in the node.TestCreateProposalBlock test), but the complexity of such partial instantiation needs to be reduced.

This PR aims to eventually make this easier/simpler.

See also this gist https://gist.github.com/thanethomson/56e1640d057a26186e38ad678a1d114c for some background work done when starting to refactor here.

## Commits:

* [WIP] Refactor node.NewNode to simplify

The `node.NewNode` method is pretty complex at the moment, an in order
to address issues like #3156, we need to simplify the interface for
partial node instantiation. In some places, we don't need to build up a
full node (like in the `node.TestCreateProposalBlock` test), but the
complexity of such partial instantiation needs to be reduced.

This PR aims to eventually make this easier/simpler.

* Refactor state loading and genesis doc provider into state package

* Refactor for clarity of return parameters

* Fix incorrect capitalization of error messages

* Simplify extracted functions' names

* Document optionally-prefixed functions

* Refactor optionallyFastSync for clarity of separation of concerns

* Restructure function for early return

* Restructure function for early return

* Remove dependence on deprecated panic functions

* refactor code a bit more

plus, expose PEXReactor on node

* align logger names

* add a changelog entry

* align logger names 2

* add a note about PEXReactor returning nil
pull/3597/head
Thane Thomson 6 years ago
committed by Anton Kaliaev
parent
commit
7b162f5c54
4 changed files with 331 additions and 198 deletions
  1. +2
    -1
      CHANGELOG_PENDING.md
  2. +262
    -196
      node/node.go
  3. +2
    -1
      rpc/test/helpers.go
  4. +65
    -0
      state/store.go

+ 2
- 1
CHANGELOG_PENDING.md View File

@ -9,7 +9,8 @@
* Apps * Apps
* Go API * Go API
- [libs/common] Removed PanicSanity, PanicCrisis, PanicConsensus and PanicQ
- [libs/common] Removed `PanicSanity`, `PanicCrisis`, `PanicConsensus` and `PanicQ`
- [node] Moved `GenesisDocProvider` and `DefaultGenesisDocProviderFunc` to state package
* Blockchain Protocol * Blockchain Protocol


+ 262
- 196
node/node.go View File

@ -18,8 +18,10 @@ import (
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/blockchain"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
cs "github.com/tendermint/tendermint/consensus" cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/evidence" "github.com/tendermint/tendermint/evidence"
@ -63,19 +65,6 @@ func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()), nil return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()), nil
} }
// GenesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
type GenesisDocProvider func() (*types.GenesisDoc, error)
// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
// the GenesisDoc from the config.GenesisFile() on the filesystem.
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
return func() (*types.GenesisDoc, error) {
return types.GenesisDocFromFile(config.GenesisFile())
}
}
// NodeProvider takes a config and a logger and returns a ready to go Node. // NodeProvider takes a config and a logger and returns a ready to go Node.
type NodeProvider func(*cfg.Config, log.Logger) (*Node, error) type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
@ -96,7 +85,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
if _, err := os.Stat(oldPrivVal); !os.IsNotExist(err) { if _, err := os.Stat(oldPrivVal); !os.IsNotExist(err) {
oldPV, err := privval.LoadOldFilePV(oldPrivVal) oldPV, err := privval.LoadOldFilePV(oldPrivVal)
if err != nil { if err != nil {
return nil, fmt.Errorf("Error reading OldPrivValidator from %v: %v\n", oldPrivVal, err)
return nil, fmt.Errorf("error reading OldPrivValidator from %v: %v\n", oldPrivVal, err)
} }
logger.Info("Upgrading PrivValidator file", logger.Info("Upgrading PrivValidator file",
"old", oldPrivVal, "old", oldPrivVal,
@ -110,7 +99,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
privval.LoadOrGenFilePV(newPrivValKey, newPrivValState), privval.LoadOrGenFilePV(newPrivValKey, newPrivValState),
nodeKey, nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
DefaultGenesisDocProviderFunc(config),
sm.DefaultGenesisDocProviderFunc(config),
DefaultDBProvider, DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation), DefaultMetricsProvider(config.Instrumentation),
logger, logger,
@ -162,6 +151,7 @@ type Node struct {
mempoolReactor *mempl.MempoolReactor // for gossipping transactions mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *cs.ConsensusState // latest consensus state consensusState *cs.ConsensusState // latest consensus state
consensusReactor *cs.ConsensusReactor // for participating in the consensus consensusReactor *cs.ConsensusReactor // for participating in the consensus
pexReactor *pex.PEXReactor // for exchanging peer addresses
evidencePool *evidence.EvidencePool // tracking evidence evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers rpcListeners []net.Listener // rpc servers
@ -170,73 +160,49 @@ type Node struct {
prometheusSrv *http.Server prometheusSrv *http.Server
} }
// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
genesisDocProvider GenesisDocProvider,
dbProvider DBProvider,
metricsProvider MetricsProvider,
logger log.Logger) (*Node, error) {
// Get BlockStore
blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
if err != nil {
return nil, err
}
blockStore := bc.NewBlockStore(blockStoreDB)
// Get State
stateDB, err := dbProvider(&DBContext{"state", config})
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *bc.BlockStore, stateDB dbm.DB, err error) {
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
if err != nil { if err != nil {
return nil, err
return
} }
blockStore = bc.NewBlockStore(blockStoreDB)
// Get genesis doc
// TODO: move to state package?
genDoc, err := loadGenesisDoc(stateDB)
stateDB, err = dbProvider(&DBContext{"state", config})
if err != nil { if err != nil {
genDoc, err = genesisDocProvider()
if err != nil {
return nil, err
}
// save genesis doc to prevent a certain class of user errors (e.g. when it
// was changed, accidentally or not). Also good for audit trail.
saveGenesisDoc(stateDB, genDoc)
return
} }
state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
if err != nil {
return nil, err
}
return
}
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator) proxyApp := proxy.NewAppConns(clientCreator)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
} }
return proxyApp, nil
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events")) eventBus.SetLogger(logger.With("module", "events"))
err = eventBus.Start()
if err != nil {
if err := eventBus.Start(); err != nil {
return nil, err return nil, err
} }
return eventBus, nil
}
func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider,
eventBus *types.EventBus, logger log.Logger) (*txindex.IndexerService, txindex.TxIndexer, error) {
// Transaction indexing
var txIndexer txindex.TxIndexer var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer { switch config.TxIndex.Indexer {
case "kv": case "kv":
store, err := dbProvider(&DBContext{"tx_index", config}) store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil { if err != nil {
return nil, err
return nil, nil, err
} }
if config.TxIndex.IndexTags != "" { if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " "))) txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
@ -251,26 +217,26 @@ func NewNode(config *cfg.Config,
indexerService := txindex.NewIndexerService(txIndexer, eventBus) indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex")) indexerService.SetLogger(logger.With("module", "txindex"))
err = indexerService.Start()
if err != nil {
return nil, err
if err := indexerService.Start(); err != nil {
return nil, nil, err
} }
return indexerService, txIndexer, nil
}
func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore,
genDoc *types.GenesisDoc, eventBus *types.EventBus, proxyApp proxy.AppConns, consensusLogger log.Logger) error {
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc) handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger) handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus) handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil { if err := handshaker.Handshake(proxyApp); err != nil {
return nil, fmt.Errorf("Error during handshake: %v", err)
return fmt.Errorf("error during handshake: %v", err)
} }
return nil
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state = sm.LoadState(stateDB)
func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logger,
consensusLogger log.Logger) {
// Log the version info. // Log the version info.
logger.Info("Version info", logger.Info("Version info",
@ -287,27 +253,6 @@ func NewNode(config *cfg.Config,
) )
} }
if config.PrivValidatorListenAddr != "" {
// If an address is provided, listen on the socket for a connection from an
// external signing process.
// FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
if err != nil {
return nil, errors.Wrap(err, "Error with private validator socket client")
}
}
// Decide whether to fast-sync or not
// We don't fast-sync when the only validator is us.
fastSync := config.FastSync
if state.Validators.Size() == 1 {
addr, _ := state.Validators.GetByIndex(0)
privValAddr := privValidator.GetPubKey().Address()
if bytes.Equal(privValAddr, addr) {
fastSync = false
}
}
pubKey := privValidator.GetPubKey() pubKey := privValidator.GetPubKey()
addr := pubKey.Address() addr := pubKey.Address()
// Log whether this node is a validator or an observer // Log whether this node is a validator or an observer
@ -316,10 +261,19 @@ func NewNode(config *cfg.Config,
} else { } else {
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
} }
}
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
func onlyValidatorIsUs(state sm.State, privVal types.PrivValidator) bool {
if state.Validators.Size() > 1 {
return false
}
addr, _ := state.Validators.GetByIndex(0)
return bytes.Equal(privVal.GetPubKey().Address(), addr)
}
func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.MempoolReactor, *mempl.Mempool) {
// Make MempoolReactor
mempool := mempl.NewMempool( mempool := mempl.NewMempool(
config.Mempool, config.Mempool,
proxyApp.Mempool(), proxyApp.Mempool(),
@ -339,34 +293,36 @@ func NewNode(config *cfg.Config,
if config.Consensus.WaitForTxs() { if config.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable() mempool.EnableTxsAvailable()
} }
return mempoolReactor, mempool
}
func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
stateDB dbm.DB, logger log.Logger) (*evidence.EvidenceReactor, *evidence.EvidencePool, error) {
// Make Evidence Reactor
evidenceDB, err := dbProvider(&DBContext{"evidence", config}) evidenceDB, err := dbProvider(&DBContext{"evidence", config})
if err != nil { if err != nil {
return nil, err
return nil, nil, err
} }
evidenceLogger := logger.With("module", "evidence") evidenceLogger := logger.With("module", "evidence")
evidencePool := evidence.NewEvidencePool(stateDB, evidenceDB) evidencePool := evidence.NewEvidencePool(stateDB, evidenceDB)
evidencePool.SetLogger(evidenceLogger) evidencePool.SetLogger(evidenceLogger)
evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger) evidenceReactor.SetLogger(evidenceLogger)
return evidenceReactor, evidencePool, nil
}
blockExecLogger := logger.With("module", "state")
// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(
stateDB,
blockExecLogger,
proxyApp.Consensus(),
mempool,
evidencePool,
sm.BlockExecutorWithMetrics(smMetrics),
)
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool *mempl.Mempool,
evidencePool *evidence.EvidencePool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
fastSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger) (*consensus.ConsensusReactor, *consensus.ConsensusState) {
// Make ConsensusReactor
consensusState := cs.NewConsensusState( consensusState := cs.NewConsensusState(
config.Consensus, config.Consensus,
state.Copy(), state.Copy(),
@ -382,28 +338,13 @@ func NewNode(config *cfg.Config,
} }
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics)) consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger) consensusReactor.SetLogger(consensusLogger)
// services which will be publishing and/or subscribing for messages (events) // services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor // consensusReactor will set it on consensusState and blockExecutor
consensusReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus)
return consensusReactor, consensusState
}
p2pLogger := logger.With("module", "p2p")
nodeInfo, err := makeNodeInfo(
config,
nodeKey.ID(),
txIndexer,
genDoc.ChainID,
p2p.NewProtocolVersion(
version.P2PProtocol, // global
state.Version.Consensus.Block,
state.Version.Consensus.App,
),
)
if err != nil {
return nil, err
}
// Setup Transport.
func createTransport(config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey *p2p.NodeKey, proxyApp proxy.AppConns) (*p2p.MultiplexTransport, []p2p.PeerFilterFunc) {
var ( var (
mConnConfig = p2p.MConnConfig(config.P2P) mConnConfig = p2p.MConnConfig(config.P2P)
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig) transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
@ -429,7 +370,7 @@ func NewNode(config *cfg.Config,
return err return err
} }
if res.IsErr() { if res.IsErr() {
return fmt.Errorf("Error querying abci app: %v", res)
return fmt.Errorf("error querying abci app: %v", res)
} }
return nil return nil
@ -447,7 +388,7 @@ func NewNode(config *cfg.Config,
return err return err
} }
if res.IsErr() { if res.IsErr() {
return fmt.Errorf("Error querying abci app: %v", res)
return fmt.Errorf("error querying abci app: %v", res)
} }
return nil return nil
@ -456,8 +397,21 @@ func NewNode(config *cfg.Config,
} }
p2p.MultiplexTransportConnFilters(connFilters...)(transport) p2p.MultiplexTransportConnFilters(connFilters...)(transport)
return transport, peerFilters
}
func createSwitch(config *cfg.Config,
transport *p2p.MultiplexTransport,
p2pMetrics *p2p.Metrics,
peerFilters []p2p.PeerFilterFunc,
mempoolReactor *mempl.MempoolReactor,
bcReactor *blockchain.BlockchainReactor,
consensusReactor *consensus.ConsensusReactor,
evidenceReactor *evidence.EvidenceReactor,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
p2pLogger log.Logger) *p2p.Switch {
// Setup Switch.
sw := p2p.NewSwitch( sw := p2p.NewSwitch(
config.P2P, config.P2P,
transport, transport,
@ -473,6 +427,159 @@ func NewNode(config *cfg.Config,
sw.SetNodeKey(nodeKey) sw.SetNodeKey(nodeKey)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile()) p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
return sw
}
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
p2pLogger log.Logger) pex.AddrBook {
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
// Add ourselves to addrbook to prevent dialing ourselves
addrBook.AddOurAddress(sw.NetAddress())
sw.SetAddrBook(addrBook)
return addrBook
}
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger) *pex.PEXReactor {
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewPEXReactor(addrBook,
&pex.PEXReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.P2P.SeedMode,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
})
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
}
// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
genesisDocProvider sm.GenesisDocProvider,
dbProvider DBProvider,
metricsProvider MetricsProvider,
logger log.Logger) (*Node, error) {
blockStore, stateDB, err := initDBs(config, dbProvider)
if err != nil {
return nil, err
}
state, genDoc, err := sm.LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider)
if err != nil {
return nil, err
}
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger)
if err != nil {
return nil, err
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus, err := createAndStartEventBus(logger)
if err != nil {
return nil, err
}
// Transaction indexing
indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
if err := doHandshake(stateDB, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}
// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state = sm.LoadState(stateDB)
// If an address is provided, listen on the socket for a connection from an
// external signing process.
if config.PrivValidatorListenAddr != "" {
// FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
if err != nil {
return nil, errors.Wrap(err, "error with private validator socket client")
}
}
logNodeStartupInfo(state, privValidator, logger, consensusLogger)
// Decide whether to fast-sync or not
// We don't fast-sync when the only validator is us.
fastSync := config.FastSync && !onlyValidatorIsUs(state, privValidator)
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
// Make MempoolReactor
mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
// Make Evidence Reactor
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, logger)
if err != nil {
return nil, err
}
// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(
stateDB,
logger.With("module", "state"),
proxyApp.Consensus(),
mempool,
evidencePool,
sm.BlockExecutorWithMetrics(smMetrics),
)
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Make ConsensusReactor
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, fastSync, eventBus, consensusLogger,
)
nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
if err != nil {
return nil, err
}
// Setup Transport.
transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)
// Setup Switch.
p2pLogger := logger.With("module", "p2p")
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
)
addrBook := createAddrBookAndSetOnSwitch(config, sw, p2pLogger)
// Optionally, start the pex reactor // Optionally, start the pex reactor
// //
@ -486,37 +593,13 @@ func NewNode(config *cfg.Config,
// //
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress // Note we currently use the addrBook regardless at least for AddOurAddress
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
// Add ourselves to addrbook to prevent dialing ourselves
addrBook.AddOurAddress(sw.NetAddress())
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
var pexReactor *pex.PEXReactor
if config.P2P.PexReactor { if config.P2P.PexReactor {
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewPEXReactor(addrBook,
&pex.PEXReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.P2P.SeedMode,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
})
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
} }
sw.SetAddrBook(addrBook)
// run the profile server
profileHost := config.ProfListenAddress
if profileHost != "" {
go func() {
logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
}()
if config.ProfListenAddress != "" {
go logger.Error("Profile server", "err", http.ListenAndServe(config.ProfListenAddress, nil))
} }
node := &Node{ node := &Node{
@ -536,6 +619,7 @@ func NewNode(config *cfg.Config,
mempoolReactor: mempoolReactor, mempoolReactor: mempoolReactor,
consensusState: consensusState, consensusState: consensusState,
consensusReactor: consensusReactor, consensusReactor: consensusReactor,
pexReactor: pexReactor,
evidencePool: evidencePool, evidencePool: evidencePool,
proxyApp: proxyApp, proxyApp: proxyApp,
txIndexer: txIndexer, txIndexer: txIndexer,
@ -804,6 +888,11 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
return n.mempoolReactor return n.mempoolReactor
} }
// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled.
func (n *Node) PEXReactor() *pex.PEXReactor {
return n.pexReactor
}
// EvidencePool returns the Node's EvidencePool. // EvidencePool returns the Node's EvidencePool.
func (n *Node) EvidencePool() *evidence.EvidencePool { func (n *Node) EvidencePool() *evidence.EvidencePool {
return n.evidencePool return n.evidencePool
@ -854,20 +943,24 @@ func (n *Node) NodeInfo() p2p.NodeInfo {
func makeNodeInfo( func makeNodeInfo(
config *cfg.Config, config *cfg.Config,
nodeID p2p.ID,
nodeKey *p2p.NodeKey,
txIndexer txindex.TxIndexer, txIndexer txindex.TxIndexer,
chainID string,
protocolVersion p2p.ProtocolVersion,
genDoc *types.GenesisDoc,
state sm.State,
) (p2p.NodeInfo, error) { ) (p2p.NodeInfo, error) {
txIndexerStatus := "on" txIndexerStatus := "on"
if _, ok := txIndexer.(*null.TxIndex); ok { if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off" txIndexerStatus = "off"
} }
nodeInfo := p2p.DefaultNodeInfo{ nodeInfo := p2p.DefaultNodeInfo{
ProtocolVersion: protocolVersion,
ID_: nodeID,
Network: chainID,
Version: version.TMCoreSemVer,
ProtocolVersion: p2p.NewProtocolVersion(
version.P2PProtocol, // global
state.Version.Consensus.Block,
state.Version.Consensus.App,
),
ID_: nodeKey.ID(),
Network: genDoc.ChainID,
Version: version.TMCoreSemVer,
Channels: []byte{ Channels: []byte{
bc.BlockchainChannel, bc.BlockchainChannel,
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel, cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
@ -899,33 +992,6 @@ func makeNodeInfo(
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
var (
genesisDocKey = []byte("genesisDoc")
)
// panics if failed to unmarshal bytes
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
bytes := db.Get(genesisDocKey)
if len(bytes) == 0 {
return nil, errors.New("Genesis doc not found")
}
var genDoc *types.GenesisDoc
err := cdc.UnmarshalJSON(bytes, &genDoc)
if err != nil {
panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes))
}
return genDoc, nil
}
// panics if failed to marshal the given genesis document
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
bytes, err := cdc.MarshalJSON(genDoc)
if err != nil {
panic(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
}
db.SetSync(genesisDocKey, bytes)
}
func createAndStartPrivValidatorSocketClient( func createAndStartPrivValidatorSocketClient(
listenAddr string, listenAddr string,
logger log.Logger, logger log.Logger,
@ -946,7 +1012,7 @@ func createAndStartPrivValidatorSocketClient(
listener = privval.NewTCPListener(ln, ed25519.GenPrivKey()) listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
default: default:
return nil, fmt.Errorf( return nil, fmt.Errorf(
"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
"wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
protocol, protocol,
) )
} }


+ 2
- 1
rpc/test/helpers.go View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/state"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
@ -166,7 +167,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
panic(err) panic(err)
} }
node, err := nm.NewNode(config, pv, nodeKey, papp, node, err := nm.NewNode(config, pv, nodeKey, papp,
nm.DefaultGenesisDocProviderFunc(config),
state.DefaultGenesisDocProviderFunc(config),
nm.DefaultDBProvider, nm.DefaultDBProvider,
nm.DefaultMetricsProvider(config.Instrumentation), nm.DefaultMetricsProvider(config.Instrumentation),
logger) logger)


+ 65
- 0
state/store.go View File

@ -1,9 +1,11 @@
package state package state
import ( import (
"errors"
"fmt" "fmt"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -17,6 +19,23 @@ const (
valSetCheckpointInterval = 100000 valSetCheckpointInterval = 100000
) )
var (
genesisDocKey = []byte("genesisDoc")
)
// GenesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
type GenesisDocProvider func() (*types.GenesisDoc, error)
// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
// the GenesisDoc from the config.GenesisFile() on the filesystem.
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
return func() (*types.GenesisDoc, error) {
return types.GenesisDocFromFile(config.GenesisFile())
}
}
//------------------------------------------------------------------------ //------------------------------------------------------------------------
func calcValidatorsKey(height int64) []byte { func calcValidatorsKey(height int64) []byte {
@ -65,6 +84,52 @@ func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) (
return state, nil return state, nil
} }
// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the
// database, or creates one using the given genesisDocProvider and persists the
// result to the database. On success this also returns the genesis doc loaded
// through the given provider.
func LoadStateFromDBOrGenesisDocProvider(stateDB dbm.DB, genesisDocProvider GenesisDocProvider) (State, *types.GenesisDoc, error) {
// Get genesis doc
genDoc, err := loadGenesisDoc(stateDB)
if err != nil {
genDoc, err = genesisDocProvider()
if err != nil {
return State{}, nil, err
}
// save genesis doc to prevent a certain class of user errors (e.g. when it
// was changed, accidentally or not). Also good for audit trail.
saveGenesisDoc(stateDB, genDoc)
}
state, err := LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
if err != nil {
return State{}, nil, err
}
return state, genDoc, nil
}
// panics if failed to unmarshal bytes
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
bytes := db.Get(genesisDocKey)
if len(bytes) == 0 {
return nil, errors.New("Genesis doc not found")
}
var genDoc *types.GenesisDoc
err := cdc.UnmarshalJSON(bytes, &genDoc)
if err != nil {
panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes))
}
return genDoc, nil
}
// panics if failed to marshal the given genesis document
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
bytes, err := cdc.MarshalJSON(genDoc)
if err != nil {
panic(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
}
db.SetSync(genesisDocKey, bytes)
}
// LoadState loads the State from the database. // LoadState loads the State from the database.
func LoadState(db dbm.DB) State { func LoadState(db dbm.DB) State {
return loadState(db, stateKey) return loadState(db, stateKey)


Loading…
Cancel
Save