You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

751 lines
20 KiB

package node
import (
"bytes"
"context"
"fmt"
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"time"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
bcv0 "github.com/tendermint/tendermint/blockchain/v0"
bcv2 "github.com/tendermint/tendermint/blockchain/v2"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/evidence"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/libs/strings"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
"github.com/tendermint/tendermint/state/indexer/tx/kv"
"github.com/tendermint/tendermint/state/indexer/tx/null"
"github.com/tendermint/tendermint/statesync"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
)
// initDBs opens the "blockstore" and "state" databases through dbProvider and
// wraps the former in a store.BlockStore.
//
// If the block store database cannot be opened, only err is non-nil. If the
// state database fails to open, the already-constructed block store is still
// returned together with the error.
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
	blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
	if err != nil {
		return nil, nil, err
	}
	blockStore = store.NewBlockStore(blockStoreDB)

	stateDB, err = dbProvider(&DBContext{"state", config})
	return blockStore, stateDB, err
}
// createAndStartProxyAppConns builds the proxy application connections from
// clientCreator, attaches a logger tagged with module "proxy", and starts them.
//
// A start failure is returned wrapped with %w, so callers can inspect the
// underlying cause with errors.Is / errors.As. The rendered message is the
// same as with a plain %v.
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
	proxyApp := proxy.NewAppConns(clientCreator)
	proxyApp.SetLogger(logger.With("module", "proxy"))
	if err := proxyApp.Start(); err != nil {
		return nil, fmt.Errorf("error starting proxy app connections: %w", err)
	}
	return proxyApp, nil
}
// createAndStartEventBus creates an event bus, gives it a logger tagged with
// module "events", and starts it. The Start error, if any, is returned as-is.
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
	bus := types.NewEventBus()
	bus.SetLogger(logger.With("module", "events"))

	err := bus.Start()
	if err != nil {
		return nil, err
	}
	return bus, nil
}
// createAndStartIndexerService selects the transaction and block indexers
// according to config.TxIndex.Indexer, wires them into an indexer service
// subscribed to eventBus, and starts that service.
//
// With the "kv" indexer both indexers share the "tx_index" database; block
// events live under the "block_events" key prefix. Any other setting selects
// the null indexers.
func createAndStartIndexerService(
	config *cfg.Config,
	dbProvider DBProvider,
	eventBus *types.EventBus,
	logger log.Logger,
) (*indexer.Service, indexer.TxIndexer, indexer.BlockIndexer, error) {
	var (
		txIdx    indexer.TxIndexer
		blockIdx indexer.BlockIndexer
	)

	if config.TxIndex.Indexer == "kv" {
		indexDB, err := dbProvider(&DBContext{"tx_index", config})
		if err != nil {
			return nil, nil, nil, err
		}
		txIdx = kv.NewTxIndex(indexDB)
		blockIdx = blockidxkv.New(dbm.NewPrefixDB(indexDB, []byte("block_events")))
	} else {
		txIdx = &null.TxIndex{}
		blockIdx = &blockidxnull.BlockerIndexer{}
	}

	svc := indexer.NewIndexerService(txIdx, blockIdx, eventBus)
	svc.SetLogger(logger.With("module", "txindex"))

	err := svc.Start()
	if err != nil {
		return nil, nil, nil, err
	}
	return svc, txIdx, blockIdx, nil
}
// doHandshake creates a consensus handshaker from the given stores, state and
// genesis document, attaches consensusLogger and eventBus to it, and runs the
// handshake against proxyApp.
//
// A handshake failure is returned wrapped with %w, so the underlying error
// stays reachable through errors.Is / errors.As. The rendered message is the
// same as with a plain %v.
func doHandshake(
	stateStore sm.Store,
	state sm.State,
	blockStore sm.BlockStore,
	genDoc *types.GenesisDoc,
	eventBus types.BlockEventPublisher,
	proxyApp proxy.AppConns,
	consensusLogger log.Logger) error {
	handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
	handshaker.SetLogger(consensusLogger)
	handshaker.SetEventBus(eventBus)
	if err := handshaker.Handshake(proxyApp); err != nil {
		return fmt.Errorf("error during handshake: %w", err)
	}
	return nil
}
// logNodeStartupInfo writes startup diagnostics: software/protocol versions
// and the node mode on logger, a notice when the stored state's block protocol
// differs from the software's, and a mode-specific line on consensusLogger.
//
// pubKey is only dereferenced in validator mode.
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
	logger.Info("Version info",
		"software", version.TMCoreSemVer,
		"block", version.BlockProtocol,
		"p2p", version.P2PProtocol,
		"mode", mode,
	)

	// A mismatch is not treated as fatal here; it is only surfaced in the log.
	if stateBlock := state.Version.Consensus.Block; stateBlock != version.BlockProtocol {
		logger.Info("Software and state have different block protocols",
			"software", version.BlockProtocol,
			"state", stateBlock,
		)
	}

	switch mode {
	case cfg.ModeFull:
		consensusLogger.Info("This node is a fullnode")

	case cfg.ModeValidator:
		addr := pubKey.Address()
		// Distinguish validators in the current set from those outside it.
		msg := "This node is a validator"
		if !state.Validators.HasAddress(addr) {
			msg = "This node is a validator (NOT in the active validator set)"
		}
		consensusLogger.Info(msg, "addr", addr, "pubKey", pubKey.Bytes())
	}
}
// onlyValidatorIsUs reports whether the validator set in state has at most one
// entry and the address at index 0 equals the address derived from pubKey.
// A nil pubKey always yields false.
func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
	if state.Validators.Size() > 1 {
		return false
	}

	soleAddr, _ := state.Validators.GetByIndex(0)
	if pubKey == nil {
		return false
	}
	return bytes.Equal(pubKey.Address(), soleAddr)
}
// createMempoolReactor builds the CList mempool, its reactor shim and the
// mempool reactor, all logging under module "mempool".
//
// When config.P2P.DisableLegacy is set, channels are opened on router and peer
// updates come from peerManager; otherwise both are taken from the shim.
// TxsAvailable notifications are enabled when config.Consensus.WaitForTxs()
// reports true.
func createMempoolReactor(
	config *cfg.Config,
	proxyApp proxy.AppConns,
	state sm.State,
	memplMetrics *mempl.Metrics,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) {
	logger = logger.With("module", "mempool")

	pool := mempl.NewCListMempool(
		config.Mempool,
		proxyApp.Mempool(),
		state.LastBlockHeight,
		mempl.WithMetrics(memplMetrics),
		mempl.WithPreCheck(sm.TxPreCheck(state)),
		mempl.WithPostCheck(sm.TxPostCheck(state)),
	)
	pool.SetLogger(logger)

	shims := mempl.GetChannelShims(config.Mempool)
	shim := p2p.NewReactorShim(logger, "MempoolShim", shims)

	var chans map[p2p.ChannelID]*p2p.Channel
	var updates *p2p.PeerUpdates
	if !config.P2P.DisableLegacy {
		// Legacy stack: the shim owns the channels and peer updates.
		chans = getChannelsFromShim(shim)
		updates = shim.PeerUpdates
	} else {
		chans = makeChannelsFromShims(router, shims)
		updates = peerManager.Subscribe()
	}

	reactor := mempl.NewReactor(
		logger,
		config.Mempool,
		peerManager,
		pool,
		chans[mempl.MempoolChannel],
		updates,
	)

	if config.Consensus.WaitForTxs() {
		pool.EnableTxsAvailable()
	}

	return shim, reactor, pool
}
// createEvidenceReactor opens the "evidence" database, builds the evidence
// pool on top of it, and creates the evidence reactor together with its
// reactor shim, all logging under module "evidence".
//
// When config.P2P.DisableLegacy is set, channels are opened on router and peer
// updates come from peerManager; otherwise both are taken from the shim.
func createEvidenceReactor(
	config *cfg.Config,
	dbProvider DBProvider,
	stateDB dbm.DB,
	blockStore *store.BlockStore,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
	evDB, err := dbProvider(&DBContext{"evidence", config})
	if err != nil {
		return nil, nil, nil, err
	}

	logger = logger.With("module", "evidence")
	shim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)

	pool, err := evidence.NewPool(logger, evDB, sm.NewStore(stateDB), blockStore)
	if err != nil {
		return nil, nil, nil, err
	}

	var chans map[p2p.ChannelID]*p2p.Channel
	var updates *p2p.PeerUpdates
	if !config.P2P.DisableLegacy {
		// Legacy stack: the shim owns the channels and peer updates.
		chans = getChannelsFromShim(shim)
		updates = shim.PeerUpdates
	} else {
		chans = makeChannelsFromShims(router, evidence.ChannelShims)
		updates = peerManager.Subscribe()
	}

	reactor := evidence.NewReactor(
		logger,
		chans[evidence.EvidenceChannel],
		updates,
		pool,
	)

	return shim, reactor, pool, nil
}
// createBlockchainReactor builds the fast-sync reactor selected by
// config.FastSync.Version, logging under module "blockchain".
//
// For v0 a reactor shim is returned along with the reactor. For v2 the shim
// result is nil. Any other version yields an error.
func createBlockchainReactor(
	logger log.Logger,
	config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore *store.BlockStore,
	csReactor *cs.Reactor,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	fastSync bool,
) (*p2p.ReactorShim, service.Service, error) {
	logger = logger.With("module", "blockchain")

	switch config.FastSync.Version {
	case cfg.BlockchainV2:
		v2Reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
		v2Reactor.SetLogger(logger)
		return nil, v2Reactor, nil

	case cfg.BlockchainV0:
		// Handled below.

	default:
		return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
	}

	shim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)

	var chans map[p2p.ChannelID]*p2p.Channel
	var updates *p2p.PeerUpdates
	if !config.P2P.DisableLegacy {
		// Legacy stack: the shim owns the channels and peer updates.
		chans = getChannelsFromShim(shim)
		updates = shim.PeerUpdates
	} else {
		chans = makeChannelsFromShims(router, bcv0.ChannelShims)
		updates = peerManager.Subscribe()
	}

	v0Reactor, err := bcv0.NewReactor(
		logger, state.Copy(), blockExec, blockStore, csReactor,
		chans[bcv0.BlockchainChannel], updates, fastSync,
	)
	if err != nil {
		return nil, nil, err
	}
	return shim, v0Reactor, nil
}
// createConsensusReactor builds the consensus state machine, its reactor shim
// and the consensus reactor.
//
// The private validator is installed on the consensus state only when it is
// non-nil and the node runs in validator mode. When config.P2P.DisableLegacy
// is set, channels are opened on router and peer updates come from
// peerManager; otherwise both are taken from the shim.
func createConsensusReactor(
	config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	mempool *mempl.CListMempool,
	evidencePool *evidence.Pool,
	privValidator types.PrivValidator,
	csMetrics *cs.Metrics,
	waitSync bool,
	eventBus *types.EventBus,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
	conState := cs.NewState(
		config.Consensus,
		state.Copy(),
		blockExec,
		blockStore,
		mempool,
		evidencePool,
		cs.StateMetrics(csMetrics),
	)
	conState.SetLogger(logger)

	if config.Mode == cfg.ModeValidator && privValidator != nil {
		conState.SetPrivValidator(privValidator)
	}

	shim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)

	var chans map[p2p.ChannelID]*p2p.Channel
	var updates *p2p.PeerUpdates
	if !config.P2P.DisableLegacy {
		// Legacy stack: the shim owns the channels and peer updates.
		chans = getChannelsFromShim(shim)
		updates = shim.PeerUpdates
	} else {
		chans = makeChannelsFromShims(router, cs.ChannelShims)
		updates = peerManager.Subscribe()
	}

	conReactor := cs.NewReactor(
		logger,
		conState,
		chans[cs.StateChannel],
		chans[cs.DataChannel],
		chans[cs.VoteChannel],
		chans[cs.VoteSetBitsChannel],
		updates,
		waitSync,
		cs.ReactorMetrics(csMetrics),
	)

	// The event bus is handed to the reactor, which is responsible for
	// propagating it to the consensus state and block executor.
	conReactor.SetEventBus(eventBus)

	return shim, conReactor, conState
}
// createTransport builds the MConn transport with an empty channel descriptor
// list. The accepted-connection cap is the configured inbound peer limit plus
// the number of entries in config.P2P.UnconditionalPeerIDs.
func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
	unconditionalIDs := strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")

	transportOpts := p2p.MConnTransportOptions{
		MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + len(unconditionalIDs)),
	}

	return p2p.NewMConnTransport(
		logger,
		p2p.MConnConfig(config.P2P),
		[]*p2p.ChannelDescriptor{},
		transportOpts,
	)
}
// createPeerManager builds a peer manager backed by the "peerstore" database
// and seeds it with the configured persistent and bootstrap peers.
//
// The connection limit is config.P2P.MaxConnections when positive; otherwise
// the sum of the inbound and outbound peer limits when both are positive (an
// error if that sum does not fit in a uint16); otherwise 64.
//
// p2pLogger is accepted but not used by this function.
func createPeerManager(
	config *cfg.Config,
	dbProvider DBProvider,
	p2pLogger log.Logger,
	nodeID p2p.NodeID,
) (*p2p.PeerManager, error) {
	var maxConns uint16 = 64
	if config.P2P.MaxConnections > 0 {
		maxConns = config.P2P.MaxConnections
	} else if config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0 {
		total := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
		if total > math.MaxUint16 {
			return nil, fmt.Errorf(
				"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
				config.P2P.MaxNumInboundPeers,
				config.P2P.MaxNumOutboundPeers,
				math.MaxUint16,
			)
		}
		maxConns = uint16(total)
	}

	privateIDs := strings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ")
	privatePeers := make(map[p2p.NodeID]struct{}, len(privateIDs))
	for _, raw := range privateIDs {
		privatePeers[p2p.NodeID(raw)] = struct{}{}
	}

	options := p2p.PeerManagerOptions{
		MaxConnected:           maxConns,
		MaxConnectedUpgrade:    4,
		MaxPeers:               1000,
		MinRetryTime:           100 * time.Millisecond,
		MaxRetryTime:           8 * time.Hour,
		MaxRetryTimePersistent: 5 * time.Minute,
		RetryTimeJitter:        3 * time.Second,
		PrivatePeers:           privatePeers,
	}

	// parse converts one configured peer string, wrapping parse failures with
	// the offending input.
	parse := func(raw string) (p2p.NodeAddress, error) {
		addr, err := p2p.ParseNodeAddress(raw)
		if err != nil {
			return addr, fmt.Errorf("invalid peer address %q: %w", raw, err)
		}
		return addr, nil
	}

	var initialPeers []p2p.NodeAddress

	// Persistent peers are added to the peer list and also recorded by ID in
	// the manager options.
	for _, raw := range strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
		addr, err := parse(raw)
		if err != nil {
			return nil, err
		}
		initialPeers = append(initialPeers, addr)
		options.PersistentPeers = append(options.PersistentPeers, addr.NodeID)
	}

	for _, raw := range strings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
		addr, err := parse(raw)
		if err != nil {
			return nil, err
		}
		initialPeers = append(initialPeers, addr)
	}

	peerDB, err := dbProvider(&DBContext{"peerstore", config})
	if err != nil {
		return nil, err
	}

	manager, err := p2p.NewPeerManager(nodeID, peerDB, options)
	if err != nil {
		return nil, fmt.Errorf("failed to create peer manager: %w", err)
	}

	for _, addr := range initialPeers {
		_, err := manager.Add(addr)
		if err != nil {
			return nil, fmt.Errorf("failed to add peer %q: %w", addr, err)
		}
	}

	return manager, nil
}
// createRouter constructs a p2p router that uses the single given transport.
// All other arguments are forwarded to p2p.NewRouter unchanged.
func createRouter(
	p2pLogger log.Logger,
	p2pMetrics *p2p.Metrics,
	nodeInfo p2p.NodeInfo,
	privKey crypto.PrivKey,
	peerManager *p2p.PeerManager,
	transport p2p.Transport,
	options p2p.RouterOptions,
) (*p2p.Router, error) {
	transports := []p2p.Transport{transport}
	return p2p.NewRouter(p2pLogger, p2pMetrics, nodeInfo, privKey, peerManager, transports, options)
}
// createSwitch builds the legacy p2p switch, installs connection and peer
// filters according to config, registers the reactors (skipped entirely in
// seed mode), and sets the node info and node key.
//
// Filters:
//   - unless config.P2P.AllowDuplicateIP is set, duplicate-IP connections are
//     rejected via p2p.ConnDuplicateIPFilter;
//   - when config.FilterPeers is set, the ABCI app is queried by remote
//     address and by peer ID, and the peer is rejected on a query error or an
//     error response.
func createSwitch(
	config *cfg.Config,
	transport p2p.Transport,
	p2pMetrics *p2p.Metrics,
	mempoolReactor *p2p.ReactorShim,
	bcReactor p2p.Reactor,
	stateSyncReactor *p2p.ReactorShim,
	consensusReactor *p2p.ReactorShim,
	evidenceReactor *p2p.ReactorShim,
	proxyApp proxy.AppConns,
	nodeInfo p2p.NodeInfo,
	nodeKey p2p.NodeKey,
	p2pLogger log.Logger,
) *p2p.Switch {
	connFilters := []p2p.ConnFilterFunc{}
	peerFilters := []p2p.PeerFilterFunc{}

	if !config.P2P.AllowDuplicateIP {
		connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
	}

	if config.FilterPeers {
		// Address-based filtering through an ABCI query.
		filterByAddr := func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
			query := abci.RequestQuery{
				Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
			}
			res, err := proxyApp.Query().QuerySync(context.Background(), query)
			if err != nil {
				return err
			}
			if res.IsErr() {
				return fmt.Errorf("error querying abci app: %v", res)
			}
			return nil
		}

		// ID-based filtering through an ABCI query.
		filterByID := func(_ p2p.IPeerSet, p p2p.Peer) error {
			query := abci.RequestQuery{
				Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
			}
			res, err := proxyApp.Query().QuerySync(context.Background(), query)
			if err != nil {
				return err
			}
			if res.IsErr() {
				return fmt.Errorf("error querying abci app: %v", res)
			}
			return nil
		}

		connFilters = append(connFilters, filterByAddr)
		peerFilters = append(peerFilters, filterByID)
	}

	sw := p2p.NewSwitch(
		config.P2P,
		transport,
		p2p.WithMetrics(p2pMetrics),
		p2p.SwitchPeerFilters(peerFilters...),
		p2p.SwitchConnFilters(connFilters...),
	)
	sw.SetLogger(p2pLogger)

	// Seed nodes run none of these reactors.
	if config.Mode != cfg.ModeSeed {
		sw.AddReactor("MEMPOOL", mempoolReactor)
		sw.AddReactor("BLOCKCHAIN", bcReactor)
		sw.AddReactor("CONSENSUS", consensusReactor)
		sw.AddReactor("EVIDENCE", evidenceReactor)
		sw.AddReactor("STATESYNC", stateSyncReactor)
	}

	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)

	p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
	return sw
}
// createAddrBookAndSetOnSwitch creates the PEX address book from the
// configured file, registers this node's own external and listen addresses in
// it (when configured) so the node does not dial itself, and installs the
// book on sw.
//
// An error is returned if either configured address cannot be parsed; in that
// case the book is not set on the switch.
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
	p2pLogger log.Logger, nodeKey p2p.NodeKey) (pex.AddrBook, error) {
	bookFile := config.P2P.AddrBookFile()
	book := pex.NewAddrBook(bookFile, config.P2P.AddrBookStrict)
	book.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))

	if external := config.P2P.ExternalAddress; external != "" {
		ourAddr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, external))
		if err != nil {
			return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
		}
		book.AddOurAddress(ourAddr)
	}

	if listen := config.P2P.ListenAddress; listen != "" {
		ourAddr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, listen))
		if err != nil {
			return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
		}
		book.AddOurAddress(ourAddr)
	}

	sw.SetAddrBook(book)
	return book, nil
}
// createPEXReactorAndAddToSwitch builds the legacy PEX reactor from the
// configured seeds and address book, gives it a logger tagged with module
// "pex", and registers it on sw under the name "PEX".
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
	sw *p2p.Switch, logger log.Logger) *pex.Reactor {
	seeds := strings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " ")

	pexConfig := &pex.ReactorConfig{
		Seeds:    seeds,
		SeedMode: config.Mode == cfg.ModeSeed,
		// 28 hours corresponds to blocksToContributeToBecomeGoodPeer (10000
		// blocks, see consensus/reactor.go) at roughly 10s per block.
		// TODO (melekes): make it dynamic based on the actual block latencies
		// from the live network.
		// https://github.com/tendermint/tendermint/issues/3523
		SeedDisconnectWaitPeriod:     28 * time.Hour,
		PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
	}

	// TODO persistent peers ? so we can have their DNS addrs saved
	reactor := pex.NewReactor(addrBook, pexConfig)
	reactor.SetLogger(logger.With("module", "pex"))

	sw.AddReactor("PEX", reactor)
	return reactor
}
// createPEXReactorV2 opens the PEX channel on router and builds the v2 PEX
// reactor on top of it, subscribed to peer updates from peerManager.
//
// config is accepted but not used by this function. The literal 4096 is the
// third argument to router.OpenChannel (presumably a size/buffer parameter;
// confirm against Router.OpenChannel).
func createPEXReactorV2(
	config *cfg.Config,
	logger log.Logger,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
) (*pex.ReactorV2, error) {
	pexChannel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096)
	if err != nil {
		return nil, err
	}

	updates := peerManager.Subscribe()
	reactor := pex.NewReactorV2(logger, peerManager, pexChannel, updates)
	return reactor, nil
}
// makeNodeInfo assembles and validates the p2p.NodeInfo advertised by a
// non-seed node.
//
// The advertised channels are the fast-sync channel for the configured
// version, the consensus, mempool, evidence and state sync channels, plus the
// PEX channel when config.P2P.PexReactor is set. TxIndex is reported as "off"
// only when txIndexer is the null indexer. The listen address is the external
// address if configured, otherwise the listen address.
//
// An unknown fast-sync version yields an empty NodeInfo and an error. A
// validation failure is returned together with the populated NodeInfo.
func makeNodeInfo(
	config *cfg.Config,
	nodeKey p2p.NodeKey,
	txIndexer indexer.TxIndexer,
	genDoc *types.GenesisDoc,
	state sm.State,
) (p2p.NodeInfo, error) {
	indexStatus := "on"
	if _, isNull := txIndexer.(*null.TxIndex); isNull {
		indexStatus = "off"
	}

	var fastSyncChannel byte
	switch config.FastSync.Version {
	case cfg.BlockchainV0:
		fastSyncChannel = byte(bcv0.BlockchainChannel)
	case cfg.BlockchainV2:
		fastSyncChannel = bcv2.BlockchainChannel
	default:
		return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
	}

	channels := []byte{
		fastSyncChannel,
		byte(cs.StateChannel),
		byte(cs.DataChannel),
		byte(cs.VoteChannel),
		byte(cs.VoteSetBitsChannel),
		byte(mempl.MempoolChannel),
		byte(evidence.EvidenceChannel),
		byte(statesync.SnapshotChannel),
		byte(statesync.ChunkChannel),
	}
	if config.P2P.PexReactor {
		channels = append(channels, pex.PexChannel)
	}

	// Prefer the externally reachable address when one is configured.
	listenAddr := config.P2P.ExternalAddress
	if listenAddr == "" {
		listenAddr = config.P2P.ListenAddress
	}

	info := p2p.NodeInfo{
		ProtocolVersion: p2p.NewProtocolVersion(
			version.P2PProtocol, // global
			state.Version.Consensus.Block,
			state.Version.Consensus.App,
		),
		NodeID:     nodeKey.ID,
		ListenAddr: listenAddr,
		Network:    genDoc.ChainID,
		Version:    version.TMCoreSemVer,
		Channels:   channels,
		Moniker:    config.Moniker,
		Other: p2p.NodeInfoOther{
			TxIndex:    indexStatus,
			RPCAddress: config.RPC.ListenAddress,
		},
	}

	err := info.Validate()
	return info, err
}
// makeSeedNodeInfo assembles and validates the p2p.NodeInfo advertised by a
// seed node.
//
// The only channel advertised is the PEX channel, and only when
// config.P2P.PexReactor is set. TxIndex is always reported as "off". The
// listen address is the external address if configured, otherwise the listen
// address. A validation failure is returned together with the populated
// NodeInfo.
func makeSeedNodeInfo(
	config *cfg.Config,
	nodeKey p2p.NodeKey,
	genDoc *types.GenesisDoc,
	state sm.State,
) (p2p.NodeInfo, error) {
	channels := []byte{}
	if config.P2P.PexReactor {
		channels = append(channels, pex.PexChannel)
	}

	// Prefer the externally reachable address when one is configured.
	listenAddr := config.P2P.ExternalAddress
	if listenAddr == "" {
		listenAddr = config.P2P.ListenAddress
	}

	info := p2p.NodeInfo{
		ProtocolVersion: p2p.NewProtocolVersion(
			version.P2PProtocol, // global
			state.Version.Consensus.Block,
			state.Version.Consensus.App,
		),
		NodeID:     nodeKey.ID,
		ListenAddr: listenAddr,
		Network:    genDoc.ChainID,
		Version:    version.TMCoreSemVer,
		Channels:   channels,
		Moniker:    config.Moniker,
		Other: p2p.NodeInfoOther{
			TxIndex:    "off",
			RPCAddress: config.RPC.ListenAddress,
		},
	}

	err := info.Validate()
	return info, err
}