You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

820 lines
22 KiB

  1. package node
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "math"
  8. "net"
  9. _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
  10. "strings"
  11. "time"
  12. dbm "github.com/tendermint/tm-db"
  13. abci "github.com/tendermint/tendermint/abci/types"
  14. cfg "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/crypto"
  16. bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0"
  17. bcv2 "github.com/tendermint/tendermint/internal/blocksync/v2"
  18. cs "github.com/tendermint/tendermint/internal/consensus"
  19. "github.com/tendermint/tendermint/internal/evidence"
  20. "github.com/tendermint/tendermint/internal/mempool"
  21. mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
  22. mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1"
  23. "github.com/tendermint/tendermint/internal/p2p"
  24. "github.com/tendermint/tendermint/internal/p2p/pex"
  25. "github.com/tendermint/tendermint/internal/statesync"
  26. "github.com/tendermint/tendermint/libs/log"
  27. "github.com/tendermint/tendermint/libs/service"
  28. tmstrings "github.com/tendermint/tendermint/libs/strings"
  29. protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  30. "github.com/tendermint/tendermint/proxy"
  31. sm "github.com/tendermint/tendermint/state"
  32. "github.com/tendermint/tendermint/state/indexer"
  33. kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
  34. null "github.com/tendermint/tendermint/state/indexer/sink/null"
  35. psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
  36. "github.com/tendermint/tendermint/store"
  37. "github.com/tendermint/tendermint/types"
  38. "github.com/tendermint/tendermint/version"
  39. )
  40. func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
  41. var blockStoreDB dbm.DB
  42. blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
  43. if err != nil {
  44. return
  45. }
  46. blockStore = store.NewBlockStore(blockStoreDB)
  47. stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
  48. return
  49. }
  50. func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
  51. proxyApp := proxy.NewAppConns(clientCreator)
  52. proxyApp.SetLogger(logger.With("module", "proxy"))
  53. if err := proxyApp.Start(); err != nil {
  54. return nil, fmt.Errorf("error starting proxy app connections: %v", err)
  55. }
  56. return proxyApp, nil
  57. }
  58. func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
  59. eventBus := types.NewEventBus()
  60. eventBus.SetLogger(logger.With("module", "events"))
  61. if err := eventBus.Start(); err != nil {
  62. return nil, err
  63. }
  64. return eventBus, nil
  65. }
  66. func createAndStartIndexerService(
  67. config *cfg.Config,
  68. dbProvider cfg.DBProvider,
  69. eventBus *types.EventBus,
  70. logger log.Logger,
  71. chainID string,
  72. ) (*indexer.Service, []indexer.EventSink, error) {
  73. eventSinks := []indexer.EventSink{}
  74. // check for duplicated sinks
  75. sinks := map[string]bool{}
  76. for _, s := range config.TxIndex.Indexer {
  77. sl := strings.ToLower(s)
  78. if sinks[sl] {
  79. return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
  80. }
  81. sinks[sl] = true
  82. }
  83. loop:
  84. for k := range sinks {
  85. switch k {
  86. case string(indexer.NULL):
  87. // When we see null in the config, the eventsinks will be reset with the
  88. // nullEventSink.
  89. eventSinks = []indexer.EventSink{null.NewEventSink()}
  90. break loop
  91. case string(indexer.KV):
  92. store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
  93. if err != nil {
  94. return nil, nil, err
  95. }
  96. eventSinks = append(eventSinks, kv.NewEventSink(store))
  97. case string(indexer.PSQL):
  98. conn := config.TxIndex.PsqlConn
  99. if conn == "" {
  100. return nil, nil, errors.New("the psql connection settings cannot be empty")
  101. }
  102. es, _, err := psql.NewEventSink(conn, chainID)
  103. if err != nil {
  104. return nil, nil, err
  105. }
  106. eventSinks = append(eventSinks, es)
  107. default:
  108. return nil, nil, errors.New("unsupported event sink type")
  109. }
  110. }
  111. if len(eventSinks) == 0 {
  112. eventSinks = []indexer.EventSink{null.NewEventSink()}
  113. }
  114. indexerService := indexer.NewIndexerService(eventSinks, eventBus)
  115. indexerService.SetLogger(logger.With("module", "txindex"))
  116. if err := indexerService.Start(); err != nil {
  117. return nil, nil, err
  118. }
  119. return indexerService, eventSinks, nil
  120. }
  121. func doHandshake(
  122. stateStore sm.Store,
  123. state sm.State,
  124. blockStore sm.BlockStore,
  125. genDoc *types.GenesisDoc,
  126. eventBus types.BlockEventPublisher,
  127. proxyApp proxy.AppConns,
  128. consensusLogger log.Logger) error {
  129. handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
  130. handshaker.SetLogger(consensusLogger)
  131. handshaker.SetEventBus(eventBus)
  132. if err := handshaker.Handshake(proxyApp); err != nil {
  133. return fmt.Errorf("error during handshake: %v", err)
  134. }
  135. return nil
  136. }
  137. func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
  138. // Log the version info.
  139. logger.Info("Version info",
  140. "tmVersion", version.TMVersion,
  141. "block", version.BlockProtocol,
  142. "p2p", version.P2PProtocol,
  143. "mode", mode,
  144. )
  145. // If the state and software differ in block version, at least log it.
  146. if state.Version.Consensus.Block != version.BlockProtocol {
  147. logger.Info("Software and state have different block protocols",
  148. "software", version.BlockProtocol,
  149. "state", state.Version.Consensus.Block,
  150. )
  151. }
  152. switch {
  153. case mode == cfg.ModeFull:
  154. consensusLogger.Info("This node is a fullnode")
  155. case mode == cfg.ModeValidator:
  156. addr := pubKey.Address()
  157. // Log whether this node is a validator or an observer
  158. if state.Validators.HasAddress(addr) {
  159. consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
  160. } else {
  161. consensusLogger.Info("This node is a validator (NOT in the active validator set)",
  162. "addr", addr, "pubKey", pubKey.Bytes())
  163. }
  164. }
  165. }
  166. func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
  167. if state.Validators.Size() > 1 {
  168. return false
  169. }
  170. addr, _ := state.Validators.GetByIndex(0)
  171. return pubKey != nil && bytes.Equal(pubKey.Address(), addr)
  172. }
  173. func createMempoolReactor(
  174. config *cfg.Config,
  175. proxyApp proxy.AppConns,
  176. state sm.State,
  177. memplMetrics *mempool.Metrics,
  178. peerManager *p2p.PeerManager,
  179. router *p2p.Router,
  180. logger log.Logger,
  181. ) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) {
  182. logger = logger.With("module", "mempool", "version", config.Mempool.Version)
  183. channelShims := mempoolv0.GetChannelShims(config.Mempool)
  184. reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
  185. var (
  186. channels map[p2p.ChannelID]*p2p.Channel
  187. peerUpdates *p2p.PeerUpdates
  188. )
  189. if config.P2P.DisableLegacy {
  190. channels = makeChannelsFromShims(router, channelShims)
  191. peerUpdates = peerManager.Subscribe()
  192. } else {
  193. channels = getChannelsFromShim(reactorShim)
  194. peerUpdates = reactorShim.PeerUpdates
  195. }
  196. switch config.Mempool.Version {
  197. case cfg.MempoolV0:
  198. mp := mempoolv0.NewCListMempool(
  199. config.Mempool,
  200. proxyApp.Mempool(),
  201. state.LastBlockHeight,
  202. mempoolv0.WithMetrics(memplMetrics),
  203. mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
  204. mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
  205. )
  206. mp.SetLogger(logger)
  207. reactor := mempoolv0.NewReactor(
  208. logger,
  209. config.Mempool,
  210. peerManager,
  211. mp,
  212. channels[mempool.MempoolChannel],
  213. peerUpdates,
  214. )
  215. if config.Consensus.WaitForTxs() {
  216. mp.EnableTxsAvailable()
  217. }
  218. return reactorShim, reactor, mp, nil
  219. case cfg.MempoolV1:
  220. mp := mempoolv1.NewTxMempool(
  221. logger,
  222. config.Mempool,
  223. proxyApp.Mempool(),
  224. state.LastBlockHeight,
  225. mempoolv1.WithMetrics(memplMetrics),
  226. mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
  227. mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
  228. )
  229. reactor := mempoolv1.NewReactor(
  230. logger,
  231. config.Mempool,
  232. peerManager,
  233. mp,
  234. channels[mempool.MempoolChannel],
  235. peerUpdates,
  236. )
  237. if config.Consensus.WaitForTxs() {
  238. mp.EnableTxsAvailable()
  239. }
  240. return reactorShim, reactor, mp, nil
  241. default:
  242. return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", config.Mempool.Version)
  243. }
  244. }
  245. func createEvidenceReactor(
  246. config *cfg.Config,
  247. dbProvider cfg.DBProvider,
  248. stateDB dbm.DB,
  249. blockStore *store.BlockStore,
  250. peerManager *p2p.PeerManager,
  251. router *p2p.Router,
  252. logger log.Logger,
  253. ) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
  254. evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config})
  255. if err != nil {
  256. return nil, nil, nil, err
  257. }
  258. logger = logger.With("module", "evidence")
  259. reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
  260. evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
  261. if err != nil {
  262. return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err)
  263. }
  264. var (
  265. channels map[p2p.ChannelID]*p2p.Channel
  266. peerUpdates *p2p.PeerUpdates
  267. )
  268. if config.P2P.DisableLegacy {
  269. channels = makeChannelsFromShims(router, evidence.ChannelShims)
  270. peerUpdates = peerManager.Subscribe()
  271. } else {
  272. channels = getChannelsFromShim(reactorShim)
  273. peerUpdates = reactorShim.PeerUpdates
  274. }
  275. evidenceReactor := evidence.NewReactor(
  276. logger,
  277. channels[evidence.EvidenceChannel],
  278. peerUpdates,
  279. evidencePool,
  280. )
  281. return reactorShim, evidenceReactor, evidencePool, nil
  282. }
  283. func createBlockchainReactor(
  284. logger log.Logger,
  285. config *cfg.Config,
  286. state sm.State,
  287. blockExec *sm.BlockExecutor,
  288. blockStore *store.BlockStore,
  289. csReactor *cs.Reactor,
  290. peerManager *p2p.PeerManager,
  291. router *p2p.Router,
  292. blockSync bool,
  293. metrics *cs.Metrics,
  294. ) (*p2p.ReactorShim, service.Service, error) {
  295. logger = logger.With("module", "blockchain")
  296. switch config.BlockSync.Version {
  297. case cfg.BlockSyncV0:
  298. reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
  299. var (
  300. channels map[p2p.ChannelID]*p2p.Channel
  301. peerUpdates *p2p.PeerUpdates
  302. )
  303. if config.P2P.DisableLegacy {
  304. channels = makeChannelsFromShims(router, bcv0.ChannelShims)
  305. peerUpdates = peerManager.Subscribe()
  306. } else {
  307. channels = getChannelsFromShim(reactorShim)
  308. peerUpdates = reactorShim.PeerUpdates
  309. }
  310. reactor, err := bcv0.NewReactor(
  311. logger, state.Copy(), blockExec, blockStore, csReactor,
  312. channels[bcv0.BlockchainChannel], peerUpdates, blockSync,
  313. metrics,
  314. )
  315. if err != nil {
  316. return nil, nil, err
  317. }
  318. return reactorShim, reactor, nil
  319. case cfg.BlockSyncV2:
  320. return nil, nil, errors.New("block sync version v2 is no longer supported. Please use v0")
  321. default:
  322. return nil, nil, fmt.Errorf("unknown block sync version %s", config.BlockSync.Version)
  323. }
  324. }
  325. func createConsensusReactor(
  326. config *cfg.Config,
  327. state sm.State,
  328. blockExec *sm.BlockExecutor,
  329. blockStore sm.BlockStore,
  330. mp mempool.Mempool,
  331. evidencePool *evidence.Pool,
  332. privValidator types.PrivValidator,
  333. csMetrics *cs.Metrics,
  334. waitSync bool,
  335. eventBus *types.EventBus,
  336. peerManager *p2p.PeerManager,
  337. router *p2p.Router,
  338. logger log.Logger,
  339. ) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
  340. consensusState := cs.NewState(
  341. config.Consensus,
  342. state.Copy(),
  343. blockExec,
  344. blockStore,
  345. mp,
  346. evidencePool,
  347. cs.StateMetrics(csMetrics),
  348. )
  349. consensusState.SetLogger(logger)
  350. if privValidator != nil && config.Mode == cfg.ModeValidator {
  351. consensusState.SetPrivValidator(privValidator)
  352. }
  353. reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
  354. var (
  355. channels map[p2p.ChannelID]*p2p.Channel
  356. peerUpdates *p2p.PeerUpdates
  357. )
  358. if config.P2P.DisableLegacy {
  359. channels = makeChannelsFromShims(router, cs.ChannelShims)
  360. peerUpdates = peerManager.Subscribe()
  361. } else {
  362. channels = getChannelsFromShim(reactorShim)
  363. peerUpdates = reactorShim.PeerUpdates
  364. }
  365. reactor := cs.NewReactor(
  366. logger,
  367. consensusState,
  368. channels[cs.StateChannel],
  369. channels[cs.DataChannel],
  370. channels[cs.VoteChannel],
  371. channels[cs.VoteSetBitsChannel],
  372. peerUpdates,
  373. waitSync,
  374. cs.ReactorMetrics(csMetrics),
  375. )
  376. // Services which will be publishing and/or subscribing for messages (events)
  377. // consensusReactor will set it on consensusState and blockExecutor.
  378. reactor.SetEventBus(eventBus)
  379. return reactorShim, reactor, consensusState
  380. }
  381. func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
  382. return p2p.NewMConnTransport(
  383. logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
  384. p2p.MConnTransportOptions{
  385. MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
  386. len(tmstrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
  387. ),
  388. },
  389. )
  390. }
  391. func createPeerManager(
  392. config *cfg.Config,
  393. dbProvider cfg.DBProvider,
  394. p2pLogger log.Logger,
  395. nodeID types.NodeID,
  396. ) (*p2p.PeerManager, error) {
  397. var maxConns uint16
  398. switch {
  399. case config.P2P.MaxConnections > 0:
  400. maxConns = config.P2P.MaxConnections
  401. case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
  402. x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
  403. if x > math.MaxUint16 {
  404. return nil, fmt.Errorf(
  405. "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
  406. config.P2P.MaxNumInboundPeers,
  407. config.P2P.MaxNumOutboundPeers,
  408. math.MaxUint16,
  409. )
  410. }
  411. maxConns = uint16(x)
  412. default:
  413. maxConns = 64
  414. }
  415. privatePeerIDs := make(map[types.NodeID]struct{})
  416. for _, id := range tmstrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
  417. privatePeerIDs[types.NodeID(id)] = struct{}{}
  418. }
  419. options := p2p.PeerManagerOptions{
  420. MaxConnected: maxConns,
  421. MaxConnectedUpgrade: 4,
  422. MaxPeers: 1000,
  423. MinRetryTime: 100 * time.Millisecond,
  424. MaxRetryTime: 8 * time.Hour,
  425. MaxRetryTimePersistent: 5 * time.Minute,
  426. RetryTimeJitter: 3 * time.Second,
  427. PrivatePeers: privatePeerIDs,
  428. }
  429. peers := []p2p.NodeAddress{}
  430. for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
  431. address, err := p2p.ParseNodeAddress(p)
  432. if err != nil {
  433. return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
  434. }
  435. peers = append(peers, address)
  436. options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
  437. }
  438. for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
  439. address, err := p2p.ParseNodeAddress(p)
  440. if err != nil {
  441. return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
  442. }
  443. peers = append(peers, address)
  444. }
  445. peerDB, err := dbProvider(&cfg.DBContext{ID: "peerstore", Config: config})
  446. if err != nil {
  447. return nil, err
  448. }
  449. peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
  450. if err != nil {
  451. return nil, fmt.Errorf("failed to create peer manager: %w", err)
  452. }
  453. for _, peer := range peers {
  454. if _, err := peerManager.Add(peer); err != nil {
  455. return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
  456. }
  457. }
  458. return peerManager, nil
  459. }
  460. func createRouter(
  461. p2pLogger log.Logger,
  462. p2pMetrics *p2p.Metrics,
  463. nodeInfo types.NodeInfo,
  464. privKey crypto.PrivKey,
  465. peerManager *p2p.PeerManager,
  466. transport p2p.Transport,
  467. options p2p.RouterOptions,
  468. ) (*p2p.Router, error) {
  469. return p2p.NewRouter(
  470. p2pLogger,
  471. p2pMetrics,
  472. nodeInfo,
  473. privKey,
  474. peerManager,
  475. []p2p.Transport{transport},
  476. options,
  477. )
  478. }
  479. func createSwitch(
  480. config *cfg.Config,
  481. transport p2p.Transport,
  482. p2pMetrics *p2p.Metrics,
  483. mempoolReactor *p2p.ReactorShim,
  484. bcReactor p2p.Reactor,
  485. stateSyncReactor *p2p.ReactorShim,
  486. consensusReactor *p2p.ReactorShim,
  487. evidenceReactor *p2p.ReactorShim,
  488. proxyApp proxy.AppConns,
  489. nodeInfo types.NodeInfo,
  490. nodeKey types.NodeKey,
  491. p2pLogger log.Logger,
  492. ) *p2p.Switch {
  493. var (
  494. connFilters = []p2p.ConnFilterFunc{}
  495. peerFilters = []p2p.PeerFilterFunc{}
  496. )
  497. if !config.P2P.AllowDuplicateIP {
  498. connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
  499. }
  500. // Filter peers by addr or pubkey with an ABCI query.
  501. // If the query return code is OK, add peer.
  502. if config.FilterPeers {
  503. connFilters = append(
  504. connFilters,
  505. // ABCI query for address filtering.
  506. func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
  507. res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
  508. Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
  509. })
  510. if err != nil {
  511. return err
  512. }
  513. if res.IsErr() {
  514. return fmt.Errorf("error querying abci app: %v", res)
  515. }
  516. return nil
  517. },
  518. )
  519. peerFilters = append(
  520. peerFilters,
  521. // ABCI query for ID filtering.
  522. func(_ p2p.IPeerSet, p p2p.Peer) error {
  523. res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
  524. Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
  525. })
  526. if err != nil {
  527. return err
  528. }
  529. if res.IsErr() {
  530. return fmt.Errorf("error querying abci app: %v", res)
  531. }
  532. return nil
  533. },
  534. )
  535. }
  536. sw := p2p.NewSwitch(
  537. config.P2P,
  538. transport,
  539. p2p.WithMetrics(p2pMetrics),
  540. p2p.SwitchPeerFilters(peerFilters...),
  541. p2p.SwitchConnFilters(connFilters...),
  542. )
  543. sw.SetLogger(p2pLogger)
  544. if config.Mode != cfg.ModeSeed {
  545. sw.AddReactor("MEMPOOL", mempoolReactor)
  546. sw.AddReactor("BLOCKCHAIN", bcReactor)
  547. sw.AddReactor("CONSENSUS", consensusReactor)
  548. sw.AddReactor("EVIDENCE", evidenceReactor)
  549. sw.AddReactor("STATESYNC", stateSyncReactor)
  550. }
  551. sw.SetNodeInfo(nodeInfo)
  552. sw.SetNodeKey(nodeKey)
  553. p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
  554. return sw
  555. }
  556. func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
  557. p2pLogger log.Logger, nodeKey types.NodeKey) (pex.AddrBook, error) {
  558. addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  559. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  560. // Add ourselves to addrbook to prevent dialing ourselves
  561. if config.P2P.ExternalAddress != "" {
  562. addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ExternalAddress))
  563. if err != nil {
  564. return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
  565. }
  566. addrBook.AddOurAddress(addr)
  567. }
  568. if config.P2P.ListenAddress != "" {
  569. addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ListenAddress))
  570. if err != nil {
  571. return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
  572. }
  573. addrBook.AddOurAddress(addr)
  574. }
  575. sw.SetAddrBook(addrBook)
  576. return addrBook, nil
  577. }
  578. func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
  579. sw *p2p.Switch, logger log.Logger) *pex.Reactor {
  580. reactorConfig := &pex.ReactorConfig{
  581. Seeds: tmstrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
  582. SeedMode: config.Mode == cfg.ModeSeed,
  583. // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
  584. // blocks assuming 10s blocks ~ 28 hours.
  585. // TODO (melekes): make it dynamic based on the actual block latencies
  586. // from the live network.
  587. // https://github.com/tendermint/tendermint/issues/3523
  588. SeedDisconnectWaitPeriod: 28 * time.Hour,
  589. PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
  590. }
  591. // TODO persistent peers ? so we can have their DNS addrs saved
  592. pexReactor := pex.NewReactor(addrBook, reactorConfig)
  593. pexReactor.SetLogger(logger.With("module", "pex"))
  594. sw.AddReactor("PEX", pexReactor)
  595. return pexReactor
  596. }
  597. func createPEXReactorV2(
  598. config *cfg.Config,
  599. logger log.Logger,
  600. peerManager *p2p.PeerManager,
  601. router *p2p.Router,
  602. ) (service.Service, error) {
  603. channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
  604. if err != nil {
  605. return nil, err
  606. }
  607. peerUpdates := peerManager.Subscribe()
  608. return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil
  609. }
  610. func makeNodeInfo(
  611. config *cfg.Config,
  612. nodeKey types.NodeKey,
  613. eventSinks []indexer.EventSink,
  614. genDoc *types.GenesisDoc,
  615. state sm.State,
  616. ) (types.NodeInfo, error) {
  617. txIndexerStatus := "off"
  618. if indexer.IndexingEnabled(eventSinks) {
  619. txIndexerStatus = "on"
  620. }
  621. var bcChannel byte
  622. switch config.BlockSync.Version {
  623. case cfg.BlockSyncV0:
  624. bcChannel = byte(bcv0.BlockchainChannel)
  625. case cfg.BlockSyncV2:
  626. bcChannel = bcv2.BlockchainChannel
  627. default:
  628. return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", config.BlockSync.Version)
  629. }
  630. nodeInfo := types.NodeInfo{
  631. ProtocolVersion: types.ProtocolVersion{
  632. P2P: version.P2PProtocol, // global
  633. Block: state.Version.Consensus.Block,
  634. App: state.Version.Consensus.App,
  635. },
  636. NodeID: nodeKey.ID,
  637. Network: genDoc.ChainID,
  638. Version: version.TMVersion,
  639. Channels: []byte{
  640. bcChannel,
  641. byte(cs.StateChannel),
  642. byte(cs.DataChannel),
  643. byte(cs.VoteChannel),
  644. byte(cs.VoteSetBitsChannel),
  645. byte(mempool.MempoolChannel),
  646. byte(evidence.EvidenceChannel),
  647. byte(statesync.SnapshotChannel),
  648. byte(statesync.ChunkChannel),
  649. byte(statesync.LightBlockChannel),
  650. },
  651. Moniker: config.Moniker,
  652. Other: types.NodeInfoOther{
  653. TxIndex: txIndexerStatus,
  654. RPCAddress: config.RPC.ListenAddress,
  655. },
  656. }
  657. if config.P2P.PexReactor {
  658. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  659. }
  660. lAddr := config.P2P.ExternalAddress
  661. if lAddr == "" {
  662. lAddr = config.P2P.ListenAddress
  663. }
  664. nodeInfo.ListenAddr = lAddr
  665. err := nodeInfo.Validate()
  666. return nodeInfo, err
  667. }
  668. func makeSeedNodeInfo(
  669. config *cfg.Config,
  670. nodeKey types.NodeKey,
  671. genDoc *types.GenesisDoc,
  672. state sm.State,
  673. ) (types.NodeInfo, error) {
  674. nodeInfo := types.NodeInfo{
  675. ProtocolVersion: types.ProtocolVersion{
  676. P2P: version.P2PProtocol, // global
  677. Block: state.Version.Consensus.Block,
  678. App: state.Version.Consensus.App,
  679. },
  680. NodeID: nodeKey.ID,
  681. Network: genDoc.ChainID,
  682. Version: version.TMVersion,
  683. Channels: []byte{},
  684. Moniker: config.Moniker,
  685. Other: types.NodeInfoOther{
  686. TxIndex: "off",
  687. RPCAddress: config.RPC.ListenAddress,
  688. },
  689. }
  690. if config.P2P.PexReactor {
  691. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  692. }
  693. lAddr := config.P2P.ExternalAddress
  694. if lAddr == "" {
  695. lAddr = config.P2P.ListenAddress
  696. }
  697. nodeInfo.ListenAddr = lAddr
  698. err := nodeInfo.Validate()
  699. return nodeInfo, err
  700. }