|
|
@ -0,0 +1,751 @@ |
|
|
|
package node |
|
|
|
|
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"math" |
|
|
|
"net" |
|
|
|
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
|
|
|
"time" |
|
|
|
|
|
|
|
dbm "github.com/tendermint/tm-db" |
|
|
|
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types" |
|
|
|
bcv0 "github.com/tendermint/tendermint/blockchain/v0" |
|
|
|
bcv2 "github.com/tendermint/tendermint/blockchain/v2" |
|
|
|
cfg "github.com/tendermint/tendermint/config" |
|
|
|
cs "github.com/tendermint/tendermint/consensus" |
|
|
|
"github.com/tendermint/tendermint/crypto" |
|
|
|
"github.com/tendermint/tendermint/evidence" |
|
|
|
"github.com/tendermint/tendermint/libs/log" |
|
|
|
"github.com/tendermint/tendermint/libs/service" |
|
|
|
"github.com/tendermint/tendermint/libs/strings" |
|
|
|
mempl "github.com/tendermint/tendermint/mempool" |
|
|
|
"github.com/tendermint/tendermint/p2p" |
|
|
|
"github.com/tendermint/tendermint/p2p/pex" |
|
|
|
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" |
|
|
|
"github.com/tendermint/tendermint/proxy" |
|
|
|
sm "github.com/tendermint/tendermint/state" |
|
|
|
"github.com/tendermint/tendermint/state/indexer" |
|
|
|
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" |
|
|
|
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" |
|
|
|
"github.com/tendermint/tendermint/state/indexer/tx/kv" |
|
|
|
"github.com/tendermint/tendermint/state/indexer/tx/null" |
|
|
|
"github.com/tendermint/tendermint/statesync" |
|
|
|
"github.com/tendermint/tendermint/store" |
|
|
|
"github.com/tendermint/tendermint/types" |
|
|
|
"github.com/tendermint/tendermint/version" |
|
|
|
) |
|
|
|
|
|
|
|
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { |
|
|
|
var blockStoreDB dbm.DB |
|
|
|
blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
blockStore = store.NewBlockStore(blockStoreDB) |
|
|
|
|
|
|
|
stateDB, err = dbProvider(&DBContext{"state", config}) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) { |
|
|
|
proxyApp := proxy.NewAppConns(clientCreator) |
|
|
|
proxyApp.SetLogger(logger.With("module", "proxy")) |
|
|
|
if err := proxyApp.Start(); err != nil { |
|
|
|
return nil, fmt.Errorf("error starting proxy app connections: %v", err) |
|
|
|
} |
|
|
|
return proxyApp, nil |
|
|
|
} |
|
|
|
|
|
|
|
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { |
|
|
|
eventBus := types.NewEventBus() |
|
|
|
eventBus.SetLogger(logger.With("module", "events")) |
|
|
|
if err := eventBus.Start(); err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
return eventBus, nil |
|
|
|
} |
|
|
|
|
|
|
|
func createAndStartIndexerService( |
|
|
|
config *cfg.Config, |
|
|
|
dbProvider DBProvider, |
|
|
|
eventBus *types.EventBus, |
|
|
|
logger log.Logger, |
|
|
|
) (*indexer.Service, indexer.TxIndexer, indexer.BlockIndexer, error) { |
|
|
|
|
|
|
|
var ( |
|
|
|
txIndexer indexer.TxIndexer |
|
|
|
blockIndexer indexer.BlockIndexer |
|
|
|
) |
|
|
|
|
|
|
|
switch config.TxIndex.Indexer { |
|
|
|
case "kv": |
|
|
|
store, err := dbProvider(&DBContext{"tx_index", config}) |
|
|
|
if err != nil { |
|
|
|
return nil, nil, nil, err |
|
|
|
} |
|
|
|
|
|
|
|
txIndexer = kv.NewTxIndex(store) |
|
|
|
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) |
|
|
|
default: |
|
|
|
txIndexer = &null.TxIndex{} |
|
|
|
blockIndexer = &blockidxnull.BlockerIndexer{} |
|
|
|
} |
|
|
|
|
|
|
|
indexerService := indexer.NewIndexerService(txIndexer, blockIndexer, eventBus) |
|
|
|
indexerService.SetLogger(logger.With("module", "txindex")) |
|
|
|
|
|
|
|
if err := indexerService.Start(); err != nil { |
|
|
|
return nil, nil, nil, err |
|
|
|
} |
|
|
|
|
|
|
|
return indexerService, txIndexer, blockIndexer, nil |
|
|
|
} |
|
|
|
|
|
|
|
func doHandshake( |
|
|
|
stateStore sm.Store, |
|
|
|
state sm.State, |
|
|
|
blockStore sm.BlockStore, |
|
|
|
genDoc *types.GenesisDoc, |
|
|
|
eventBus types.BlockEventPublisher, |
|
|
|
proxyApp proxy.AppConns, |
|
|
|
consensusLogger log.Logger) error { |
|
|
|
|
|
|
|
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) |
|
|
|
handshaker.SetLogger(consensusLogger) |
|
|
|
handshaker.SetEventBus(eventBus) |
|
|
|
if err := handshaker.Handshake(proxyApp); err != nil { |
|
|
|
return fmt.Errorf("error during handshake: %v", err) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) { |
|
|
|
// Log the version info.
|
|
|
|
logger.Info("Version info", |
|
|
|
"software", version.TMCoreSemVer, |
|
|
|
"block", version.BlockProtocol, |
|
|
|
"p2p", version.P2PProtocol, |
|
|
|
"mode", mode, |
|
|
|
) |
|
|
|
|
|
|
|
// If the state and software differ in block version, at least log it.
|
|
|
|
if state.Version.Consensus.Block != version.BlockProtocol { |
|
|
|
logger.Info("Software and state have different block protocols", |
|
|
|
"software", version.BlockProtocol, |
|
|
|
"state", state.Version.Consensus.Block, |
|
|
|
) |
|
|
|
} |
|
|
|
switch { |
|
|
|
case mode == cfg.ModeFull: |
|
|
|
consensusLogger.Info("This node is a fullnode") |
|
|
|
case mode == cfg.ModeValidator: |
|
|
|
addr := pubKey.Address() |
|
|
|
// Log whether this node is a validator or an observer
|
|
|
|
if state.Validators.HasAddress(addr) { |
|
|
|
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes()) |
|
|
|
} else { |
|
|
|
consensusLogger.Info("This node is a validator (NOT in the active validator set)", |
|
|
|
"addr", addr, "pubKey", pubKey.Bytes()) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { |
|
|
|
if state.Validators.Size() > 1 { |
|
|
|
return false |
|
|
|
} |
|
|
|
addr, _ := state.Validators.GetByIndex(0) |
|
|
|
return pubKey != nil && bytes.Equal(pubKey.Address(), addr) |
|
|
|
} |
|
|
|
|
|
|
|
func createMempoolReactor( |
|
|
|
config *cfg.Config, |
|
|
|
proxyApp proxy.AppConns, |
|
|
|
state sm.State, |
|
|
|
memplMetrics *mempl.Metrics, |
|
|
|
peerManager *p2p.PeerManager, |
|
|
|
router *p2p.Router, |
|
|
|
logger log.Logger, |
|
|
|
) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) { |
|
|
|
|
|
|
|
logger = logger.With("module", "mempool") |
|
|
|
mempool := mempl.NewCListMempool( |
|
|
|
config.Mempool, |
|
|
|
proxyApp.Mempool(), |
|
|
|
state.LastBlockHeight, |
|
|
|
mempl.WithMetrics(memplMetrics), |
|
|
|
mempl.WithPreCheck(sm.TxPreCheck(state)), |
|
|
|
mempl.WithPostCheck(sm.TxPostCheck(state)), |
|
|
|
) |
|
|
|
|
|
|
|
mempool.SetLogger(logger) |
|
|
|
|
|
|
|
channelShims := mempl.GetChannelShims(config.Mempool) |
|
|
|
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims) |
|
|
|
|
|
|
|
var ( |
|
|
|
channels map[p2p.ChannelID]*p2p.Channel |
|
|
|
peerUpdates *p2p.PeerUpdates |
|
|
|
) |
|
|
|
|
|
|
|
if config.P2P.DisableLegacy { |
|
|
|
channels = makeChannelsFromShims(router, channelShims) |
|
|
|
peerUpdates = peerManager.Subscribe() |
|
|
|
} else { |
|
|
|
channels = getChannelsFromShim(reactorShim) |
|
|
|
peerUpdates = reactorShim.PeerUpdates |
|
|
|
} |
|
|
|
|
|
|
|
reactor := mempl.NewReactor( |
|
|
|
logger, |
|
|
|
config.Mempool, |
|
|
|
peerManager, |
|
|
|
mempool, |
|
|
|
channels[mempl.MempoolChannel], |
|
|
|
peerUpdates, |
|
|
|
) |
|
|
|
|
|
|
|
if config.Consensus.WaitForTxs() { |
|
|
|
mempool.EnableTxsAvailable() |
|
|
|
} |
|
|
|
|
|
|
|
return reactorShim, reactor, mempool |
|
|
|
} |
|
|
|
|
|
|
|
func createEvidenceReactor( |
|
|
|
config *cfg.Config, |
|
|
|
dbProvider DBProvider, |
|
|
|
stateDB dbm.DB, |
|
|
|
blockStore *store.BlockStore, |
|
|
|
peerManager *p2p.PeerManager, |
|
|
|
router *p2p.Router, |
|
|
|
logger log.Logger, |
|
|
|
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) { |
|
|
|
evidenceDB, err := dbProvider(&DBContext{"evidence", config}) |
|
|
|
if err != nil { |
|
|
|
return nil, nil, nil, err |
|
|
|
} |
|
|
|
|
|
|
|
logger = logger.With("module", "evidence") |
|
|
|
reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims) |
|
|
|
|
|
|
|
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore) |
|
|
|
if err != nil { |
|
|
|
return nil, nil, nil, err |
|
|
|
} |
|
|
|
|
|
|
|
var ( |
|
|
|
channels map[p2p.ChannelID]*p2p.Channel |
|
|
|
peerUpdates *p2p.PeerUpdates |
|
|
|
) |
|
|
|
|
|
|
|
if config.P2P.DisableLegacy { |
|
|
|
channels = makeChannelsFromShims(router, evidence.ChannelShims) |
|
|
|
peerUpdates = peerManager.Subscribe() |
|
|
|
} else { |
|
|
|
channels = getChannelsFromShim(reactorShim) |
|
|
|
peerUpdates = reactorShim.PeerUpdates |
|
|
|
} |
|
|
|
|
|
|
|
evidenceReactor := evidence.NewReactor( |
|
|
|
logger, |
|
|
|
channels[evidence.EvidenceChannel], |
|
|
|
peerUpdates, |
|
|
|
evidencePool, |
|
|
|
) |
|
|
|
|
|
|
|
return reactorShim, evidenceReactor, evidencePool, nil |
|
|
|
} |
|
|
|
|
|
|
|
func createBlockchainReactor( |
|
|
|
logger log.Logger, |
|
|
|
config *cfg.Config, |
|
|
|
state sm.State, |
|
|
|
blockExec *sm.BlockExecutor, |
|
|
|
blockStore *store.BlockStore, |
|
|
|
csReactor *cs.Reactor, |
|
|
|
peerManager *p2p.PeerManager, |
|
|
|
router *p2p.Router, |
|
|
|
fastSync bool, |
|
|
|
) (*p2p.ReactorShim, service.Service, error) { |
|
|
|
|
|
|
|
logger = logger.With("module", "blockchain") |
|
|
|
|
|
|
|
switch config.FastSync.Version { |
|
|
|
case cfg.BlockchainV0: |
|
|
|
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims) |
|
|
|
|
|
|
|
var ( |
|
|
|
channels map[p2p.ChannelID]*p2p.Channel |
|
|
|
peerUpdates *p2p.PeerUpdates |
|
|
|
) |
|
|
|
|
|
|
|
if config.P2P.DisableLegacy { |
|
|
|
channels = makeChannelsFromShims(router, bcv0.ChannelShims) |
|
|
|
peerUpdates = peerManager.Subscribe() |
|
|
|
} else { |
|
|
|
channels = getChannelsFromShim(reactorShim) |
|
|
|
peerUpdates = reactorShim.PeerUpdates |
|
|
|
} |
|
|
|
|
|
|
|
reactor, err := bcv0.NewReactor( |
|
|
|
logger, state.Copy(), blockExec, blockStore, csReactor, |
|
|
|
channels[bcv0.BlockchainChannel], peerUpdates, fastSync, |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
return nil, nil, err |
|
|
|
} |
|
|
|
|
|
|
|
return reactorShim, reactor, nil |
|
|
|
|
|
|
|
case cfg.BlockchainV2: |
|
|
|
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) |
|
|
|
reactor.SetLogger(logger) |
|
|
|
|
|
|
|
return nil, reactor, nil |
|
|
|
|
|
|
|
default: |
|
|
|
return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func createConsensusReactor( |
|
|
|
config *cfg.Config, |
|
|
|
state sm.State, |
|
|
|
blockExec *sm.BlockExecutor, |
|
|
|
blockStore sm.BlockStore, |
|
|
|
mempool *mempl.CListMempool, |
|
|
|
evidencePool *evidence.Pool, |
|
|
|
privValidator types.PrivValidator, |
|
|
|
csMetrics *cs.Metrics, |
|
|
|
waitSync bool, |
|
|
|
eventBus *types.EventBus, |
|
|
|
peerManager *p2p.PeerManager, |
|
|
|
router *p2p.Router, |
|
|
|
logger log.Logger, |
|
|
|
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) { |
|
|
|
|
|
|
|
consensusState := cs.NewState( |
|
|
|
config.Consensus, |
|
|
|
state.Copy(), |
|
|
|
blockExec, |
|
|
|
blockStore, |
|
|
|
mempool, |
|
|
|
evidencePool, |
|
|
|
cs.StateMetrics(csMetrics), |
|
|
|
) |
|
|
|
consensusState.SetLogger(logger) |
|
|
|
if privValidator != nil && config.Mode == cfg.ModeValidator { |
|
|
|
consensusState.SetPrivValidator(privValidator) |
|
|
|
} |
|
|
|
|
|
|
|
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims) |
|
|
|
|
|
|
|
var ( |
|
|
|
channels map[p2p.ChannelID]*p2p.Channel |
|
|
|
peerUpdates *p2p.PeerUpdates |
|
|
|
) |
|
|
|
|
|
|
|
if config.P2P.DisableLegacy { |
|
|
|
channels = makeChannelsFromShims(router, cs.ChannelShims) |
|
|
|
peerUpdates = peerManager.Subscribe() |
|
|
|
} else { |
|
|
|
channels = getChannelsFromShim(reactorShim) |
|
|
|
peerUpdates = reactorShim.PeerUpdates |
|
|
|
} |
|
|
|
|
|
|
|
reactor := cs.NewReactor( |
|
|
|
logger, |
|
|
|
consensusState, |
|
|
|
channels[cs.StateChannel], |
|
|
|
channels[cs.DataChannel], |
|
|
|
channels[cs.VoteChannel], |
|
|
|
channels[cs.VoteSetBitsChannel], |
|
|
|
peerUpdates, |
|
|
|
waitSync, |
|
|
|
cs.ReactorMetrics(csMetrics), |
|
|
|
) |
|
|
|
|
|
|
|
// Services which will be publishing and/or subscribing for messages (events)
|
|
|
|
// consensusReactor will set it on consensusState and blockExecutor.
|
|
|
|
reactor.SetEventBus(eventBus) |
|
|
|
|
|
|
|
return reactorShim, reactor, consensusState |
|
|
|
} |
|
|
|
|
|
|
|
func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport { |
|
|
|
return p2p.NewMConnTransport( |
|
|
|
logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, |
|
|
|
p2p.MConnTransportOptions{ |
|
|
|
MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + |
|
|
|
len(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), |
|
|
|
), |
|
|
|
}, |
|
|
|
) |
|
|
|
} |
|
|
|
|
|
|
|
func createPeerManager( |
|
|
|
config *cfg.Config, |
|
|
|
dbProvider DBProvider, |
|
|
|
p2pLogger log.Logger, |
|
|
|
nodeID p2p.NodeID, |
|
|
|
) (*p2p.PeerManager, error) { |
|
|
|
|
|
|
|
var maxConns uint16 |
|
|
|
|
|
|
|
switch { |
|
|
|
case config.P2P.MaxConnections > 0: |
|
|
|
maxConns = config.P2P.MaxConnections |
|
|
|
|
|
|
|
case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0: |
|
|
|
x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers |
|
|
|
if x > math.MaxUint16 { |
|
|
|
return nil, fmt.Errorf( |
|
|
|
"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)", |
|
|
|
config.P2P.MaxNumInboundPeers, |
|
|
|
config.P2P.MaxNumOutboundPeers, |
|
|
|
math.MaxUint16, |
|
|
|
) |
|
|
|
} |
|
|
|
|
|
|
|
maxConns = uint16(x) |
|
|
|
|
|
|
|
default: |
|
|
|
maxConns = 64 |
|
|
|
} |
|
|
|
|
|
|
|
privatePeerIDs := make(map[p2p.NodeID]struct{}) |
|
|
|
for _, id := range strings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") { |
|
|
|
privatePeerIDs[p2p.NodeID(id)] = struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
options := p2p.PeerManagerOptions{ |
|
|
|
MaxConnected: maxConns, |
|
|
|
MaxConnectedUpgrade: 4, |
|
|
|
MaxPeers: 1000, |
|
|
|
MinRetryTime: 100 * time.Millisecond, |
|
|
|
MaxRetryTime: 8 * time.Hour, |
|
|
|
MaxRetryTimePersistent: 5 * time.Minute, |
|
|
|
RetryTimeJitter: 3 * time.Second, |
|
|
|
PrivatePeers: privatePeerIDs, |
|
|
|
} |
|
|
|
|
|
|
|
peers := []p2p.NodeAddress{} |
|
|
|
for _, p := range strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { |
|
|
|
address, err := p2p.ParseNodeAddress(p) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("invalid peer address %q: %w", p, err) |
|
|
|
} |
|
|
|
|
|
|
|
peers = append(peers, address) |
|
|
|
options.PersistentPeers = append(options.PersistentPeers, address.NodeID) |
|
|
|
} |
|
|
|
|
|
|
|
for _, p := range strings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") { |
|
|
|
address, err := p2p.ParseNodeAddress(p) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("invalid peer address %q: %w", p, err) |
|
|
|
} |
|
|
|
peers = append(peers, address) |
|
|
|
} |
|
|
|
|
|
|
|
peerDB, err := dbProvider(&DBContext{"peerstore", config}) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to create peer manager: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
for _, peer := range peers { |
|
|
|
if _, err := peerManager.Add(peer); err != nil { |
|
|
|
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return peerManager, nil |
|
|
|
} |
|
|
|
|
|
|
|
func createRouter( |
|
|
|
p2pLogger log.Logger, |
|
|
|
p2pMetrics *p2p.Metrics, |
|
|
|
nodeInfo p2p.NodeInfo, |
|
|
|
privKey crypto.PrivKey, |
|
|
|
peerManager *p2p.PeerManager, |
|
|
|
transport p2p.Transport, |
|
|
|
options p2p.RouterOptions, |
|
|
|
) (*p2p.Router, error) { |
|
|
|
|
|
|
|
return p2p.NewRouter( |
|
|
|
p2pLogger, |
|
|
|
p2pMetrics, |
|
|
|
nodeInfo, |
|
|
|
privKey, |
|
|
|
peerManager, |
|
|
|
[]p2p.Transport{transport}, |
|
|
|
options, |
|
|
|
) |
|
|
|
} |
|
|
|
|
|
|
|
func createSwitch( |
|
|
|
config *cfg.Config, |
|
|
|
transport p2p.Transport, |
|
|
|
p2pMetrics *p2p.Metrics, |
|
|
|
mempoolReactor *p2p.ReactorShim, |
|
|
|
bcReactor p2p.Reactor, |
|
|
|
stateSyncReactor *p2p.ReactorShim, |
|
|
|
consensusReactor *p2p.ReactorShim, |
|
|
|
evidenceReactor *p2p.ReactorShim, |
|
|
|
proxyApp proxy.AppConns, |
|
|
|
nodeInfo p2p.NodeInfo, |
|
|
|
nodeKey p2p.NodeKey, |
|
|
|
p2pLogger log.Logger, |
|
|
|
) *p2p.Switch { |
|
|
|
|
|
|
|
var ( |
|
|
|
connFilters = []p2p.ConnFilterFunc{} |
|
|
|
peerFilters = []p2p.PeerFilterFunc{} |
|
|
|
) |
|
|
|
|
|
|
|
if !config.P2P.AllowDuplicateIP { |
|
|
|
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter) |
|
|
|
} |
|
|
|
|
|
|
|
// Filter peers by addr or pubkey with an ABCI query.
|
|
|
|
// If the query return code is OK, add peer.
|
|
|
|
if config.FilterPeers { |
|
|
|
connFilters = append( |
|
|
|
connFilters, |
|
|
|
// ABCI query for address filtering.
|
|
|
|
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { |
|
|
|
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ |
|
|
|
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
if res.IsErr() { |
|
|
|
return fmt.Errorf("error querying abci app: %v", res) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
}, |
|
|
|
) |
|
|
|
|
|
|
|
peerFilters = append( |
|
|
|
peerFilters, |
|
|
|
// ABCI query for ID filtering.
|
|
|
|
func(_ p2p.IPeerSet, p p2p.Peer) error { |
|
|
|
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ |
|
|
|
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
if res.IsErr() { |
|
|
|
return fmt.Errorf("error querying abci app: %v", res) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
}, |
|
|
|
) |
|
|
|
} |
|
|
|
|
|
|
|
sw := p2p.NewSwitch( |
|
|
|
config.P2P, |
|
|
|
transport, |
|
|
|
p2p.WithMetrics(p2pMetrics), |
|
|
|
p2p.SwitchPeerFilters(peerFilters...), |
|
|
|
p2p.SwitchConnFilters(connFilters...), |
|
|
|
) |
|
|
|
|
|
|
|
sw.SetLogger(p2pLogger) |
|
|
|
if config.Mode != cfg.ModeSeed { |
|
|
|
sw.AddReactor("MEMPOOL", mempoolReactor) |
|
|
|
sw.AddReactor("BLOCKCHAIN", bcReactor) |
|
|
|
sw.AddReactor("CONSENSUS", consensusReactor) |
|
|
|
sw.AddReactor("EVIDENCE", evidenceReactor) |
|
|
|
sw.AddReactor("STATESYNC", stateSyncReactor) |
|
|
|
} |
|
|
|
|
|
|
|
sw.SetNodeInfo(nodeInfo) |
|
|
|
sw.SetNodeKey(nodeKey) |
|
|
|
|
|
|
|
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile()) |
|
|
|
return sw |
|
|
|
} |
|
|
|
|
|
|
|
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, |
|
|
|
p2pLogger log.Logger, nodeKey p2p.NodeKey) (pex.AddrBook, error) { |
|
|
|
|
|
|
|
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) |
|
|
|
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) |
|
|
|
|
|
|
|
// Add ourselves to addrbook to prevent dialing ourselves
|
|
|
|
if config.P2P.ExternalAddress != "" { |
|
|
|
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ExternalAddress)) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err) |
|
|
|
} |
|
|
|
addrBook.AddOurAddress(addr) |
|
|
|
} |
|
|
|
if config.P2P.ListenAddress != "" { |
|
|
|
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ListenAddress)) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err) |
|
|
|
} |
|
|
|
addrBook.AddOurAddress(addr) |
|
|
|
} |
|
|
|
|
|
|
|
sw.SetAddrBook(addrBook) |
|
|
|
|
|
|
|
return addrBook, nil |
|
|
|
} |
|
|
|
|
|
|
|
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, |
|
|
|
sw *p2p.Switch, logger log.Logger) *pex.Reactor { |
|
|
|
|
|
|
|
reactorConfig := &pex.ReactorConfig{ |
|
|
|
Seeds: strings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "), |
|
|
|
SeedMode: config.Mode == cfg.ModeSeed, |
|
|
|
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
|
|
|
|
// blocks assuming 10s blocks ~ 28 hours.
|
|
|
|
// TODO (melekes): make it dynamic based on the actual block latencies
|
|
|
|
// from the live network.
|
|
|
|
// https://github.com/tendermint/tendermint/issues/3523
|
|
|
|
SeedDisconnectWaitPeriod: 28 * time.Hour, |
|
|
|
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, |
|
|
|
} |
|
|
|
// TODO persistent peers ? so we can have their DNS addrs saved
|
|
|
|
pexReactor := pex.NewReactor(addrBook, reactorConfig) |
|
|
|
pexReactor.SetLogger(logger.With("module", "pex")) |
|
|
|
sw.AddReactor("PEX", pexReactor) |
|
|
|
return pexReactor |
|
|
|
} |
|
|
|
|
|
|
|
func createPEXReactorV2( |
|
|
|
config *cfg.Config, |
|
|
|
logger log.Logger, |
|
|
|
peerManager *p2p.PeerManager, |
|
|
|
router *p2p.Router, |
|
|
|
) (*pex.ReactorV2, error) { |
|
|
|
|
|
|
|
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
peerUpdates := peerManager.Subscribe() |
|
|
|
return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil |
|
|
|
} |
|
|
|
|
|
|
|
func makeNodeInfo( |
|
|
|
config *cfg.Config, |
|
|
|
nodeKey p2p.NodeKey, |
|
|
|
txIndexer indexer.TxIndexer, |
|
|
|
genDoc *types.GenesisDoc, |
|
|
|
state sm.State, |
|
|
|
) (p2p.NodeInfo, error) { |
|
|
|
txIndexerStatus := "on" |
|
|
|
if _, ok := txIndexer.(*null.TxIndex); ok { |
|
|
|
txIndexerStatus = "off" |
|
|
|
} |
|
|
|
|
|
|
|
var bcChannel byte |
|
|
|
switch config.FastSync.Version { |
|
|
|
case cfg.BlockchainV0: |
|
|
|
bcChannel = byte(bcv0.BlockchainChannel) |
|
|
|
|
|
|
|
case cfg.BlockchainV2: |
|
|
|
bcChannel = bcv2.BlockchainChannel |
|
|
|
|
|
|
|
default: |
|
|
|
return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) |
|
|
|
} |
|
|
|
|
|
|
|
nodeInfo := p2p.NodeInfo{ |
|
|
|
ProtocolVersion: p2p.NewProtocolVersion( |
|
|
|
version.P2PProtocol, // global
|
|
|
|
state.Version.Consensus.Block, |
|
|
|
state.Version.Consensus.App, |
|
|
|
), |
|
|
|
NodeID: nodeKey.ID, |
|
|
|
Network: genDoc.ChainID, |
|
|
|
Version: version.TMCoreSemVer, |
|
|
|
Channels: []byte{ |
|
|
|
bcChannel, |
|
|
|
byte(cs.StateChannel), |
|
|
|
byte(cs.DataChannel), |
|
|
|
byte(cs.VoteChannel), |
|
|
|
byte(cs.VoteSetBitsChannel), |
|
|
|
byte(mempl.MempoolChannel), |
|
|
|
byte(evidence.EvidenceChannel), |
|
|
|
byte(statesync.SnapshotChannel), |
|
|
|
byte(statesync.ChunkChannel), |
|
|
|
}, |
|
|
|
Moniker: config.Moniker, |
|
|
|
Other: p2p.NodeInfoOther{ |
|
|
|
TxIndex: txIndexerStatus, |
|
|
|
RPCAddress: config.RPC.ListenAddress, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
if config.P2P.PexReactor { |
|
|
|
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) |
|
|
|
} |
|
|
|
|
|
|
|
lAddr := config.P2P.ExternalAddress |
|
|
|
|
|
|
|
if lAddr == "" { |
|
|
|
lAddr = config.P2P.ListenAddress |
|
|
|
} |
|
|
|
|
|
|
|
nodeInfo.ListenAddr = lAddr |
|
|
|
|
|
|
|
err := nodeInfo.Validate() |
|
|
|
return nodeInfo, err |
|
|
|
} |
|
|
|
|
|
|
|
func makeSeedNodeInfo( |
|
|
|
config *cfg.Config, |
|
|
|
nodeKey p2p.NodeKey, |
|
|
|
genDoc *types.GenesisDoc, |
|
|
|
state sm.State, |
|
|
|
) (p2p.NodeInfo, error) { |
|
|
|
nodeInfo := p2p.NodeInfo{ |
|
|
|
ProtocolVersion: p2p.NewProtocolVersion( |
|
|
|
version.P2PProtocol, // global
|
|
|
|
state.Version.Consensus.Block, |
|
|
|
state.Version.Consensus.App, |
|
|
|
), |
|
|
|
NodeID: nodeKey.ID, |
|
|
|
Network: genDoc.ChainID, |
|
|
|
Version: version.TMCoreSemVer, |
|
|
|
Channels: []byte{}, |
|
|
|
Moniker: config.Moniker, |
|
|
|
Other: p2p.NodeInfoOther{ |
|
|
|
TxIndex: "off", |
|
|
|
RPCAddress: config.RPC.ListenAddress, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
if config.P2P.PexReactor { |
|
|
|
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) |
|
|
|
} |
|
|
|
|
|
|
|
lAddr := config.P2P.ExternalAddress |
|
|
|
|
|
|
|
if lAddr == "" { |
|
|
|
lAddr = config.P2P.ListenAddress |
|
|
|
} |
|
|
|
|
|
|
|
nodeInfo.ListenAddr = lAddr |
|
|
|
|
|
|
|
err := nodeInfo.Validate() |
|
|
|
return nodeInfo, err |
|
|
|
} |