You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

816 lines
22 KiB

  1. package node
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "math"
  8. "net"
  9. _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
  10. "strings"
  11. "time"
  12. dbm "github.com/tendermint/tm-db"
  13. abci "github.com/tendermint/tendermint/abci/types"
  14. cfg "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/crypto"
  16. bcv0 "github.com/tendermint/tendermint/internal/blockchain/v0"
  17. bcv2 "github.com/tendermint/tendermint/internal/blockchain/v2"
  18. cs "github.com/tendermint/tendermint/internal/consensus"
  19. "github.com/tendermint/tendermint/internal/evidence"
  20. "github.com/tendermint/tendermint/internal/mempool"
  21. mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
  22. mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1"
  23. "github.com/tendermint/tendermint/internal/p2p"
  24. "github.com/tendermint/tendermint/internal/p2p/pex"
  25. "github.com/tendermint/tendermint/internal/statesync"
  26. "github.com/tendermint/tendermint/libs/log"
  27. "github.com/tendermint/tendermint/libs/service"
  28. tmstrings "github.com/tendermint/tendermint/libs/strings"
  29. protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  30. "github.com/tendermint/tendermint/proxy"
  31. sm "github.com/tendermint/tendermint/state"
  32. "github.com/tendermint/tendermint/state/indexer"
  33. kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
  34. null "github.com/tendermint/tendermint/state/indexer/sink/null"
  35. psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
  36. "github.com/tendermint/tendermint/store"
  37. "github.com/tendermint/tendermint/types"
  38. "github.com/tendermint/tendermint/version"
  39. )
  40. func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
  41. var blockStoreDB dbm.DB
  42. blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
  43. if err != nil {
  44. return
  45. }
  46. blockStore = store.NewBlockStore(blockStoreDB)
  47. stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
  48. return
  49. }
  50. func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
  51. proxyApp := proxy.NewAppConns(clientCreator)
  52. proxyApp.SetLogger(logger.With("module", "proxy"))
  53. if err := proxyApp.Start(); err != nil {
  54. return nil, fmt.Errorf("error starting proxy app connections: %v", err)
  55. }
  56. return proxyApp, nil
  57. }
  58. func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
  59. eventBus := types.NewEventBus()
  60. eventBus.SetLogger(logger.With("module", "events"))
  61. if err := eventBus.Start(); err != nil {
  62. return nil, err
  63. }
  64. return eventBus, nil
  65. }
  66. func createAndStartIndexerService(
  67. config *cfg.Config,
  68. dbProvider cfg.DBProvider,
  69. eventBus *types.EventBus,
  70. logger log.Logger,
  71. chainID string,
  72. ) (*indexer.Service, []indexer.EventSink, error) {
  73. eventSinks := []indexer.EventSink{}
  74. // Check duplicated sinks.
  75. sinks := map[string]bool{}
  76. for _, s := range config.TxIndex.Indexer {
  77. sl := strings.ToLower(s)
  78. if sinks[sl] {
  79. return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
  80. }
  81. sinks[sl] = true
  82. }
  83. loop:
  84. for k := range sinks {
  85. switch k {
  86. case string(indexer.NULL):
  87. // when we see null in the config, the eventsinks will be reset with the nullEventSink.
  88. eventSinks = []indexer.EventSink{null.NewEventSink()}
  89. break loop
  90. case string(indexer.KV):
  91. store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
  92. if err != nil {
  93. return nil, nil, err
  94. }
  95. eventSinks = append(eventSinks, kv.NewEventSink(store))
  96. case string(indexer.PSQL):
  97. conn := config.TxIndex.PsqlConn
  98. if conn == "" {
  99. return nil, nil, errors.New("the psql connection settings cannot be empty")
  100. }
  101. es, _, err := psql.NewEventSink(conn, chainID)
  102. if err != nil {
  103. return nil, nil, err
  104. }
  105. eventSinks = append(eventSinks, es)
  106. default:
  107. return nil, nil, errors.New("unsupported event sink type")
  108. }
  109. }
  110. if len(eventSinks) == 0 {
  111. eventSinks = []indexer.EventSink{null.NewEventSink()}
  112. }
  113. indexerService := indexer.NewIndexerService(eventSinks, eventBus)
  114. indexerService.SetLogger(logger.With("module", "txindex"))
  115. if err := indexerService.Start(); err != nil {
  116. return nil, nil, err
  117. }
  118. return indexerService, eventSinks, nil
  119. }
  120. func doHandshake(
  121. stateStore sm.Store,
  122. state sm.State,
  123. blockStore sm.BlockStore,
  124. genDoc *types.GenesisDoc,
  125. eventBus types.BlockEventPublisher,
  126. proxyApp proxy.AppConns,
  127. consensusLogger log.Logger) error {
  128. handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
  129. handshaker.SetLogger(consensusLogger)
  130. handshaker.SetEventBus(eventBus)
  131. if err := handshaker.Handshake(proxyApp); err != nil {
  132. return fmt.Errorf("error during handshake: %v", err)
  133. }
  134. return nil
  135. }
  136. func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
  137. // Log the version info.
  138. logger.Info("Version info",
  139. "tmVersion", version.TMVersion,
  140. "block", version.BlockProtocol,
  141. "p2p", version.P2PProtocol,
  142. "mode", mode,
  143. )
  144. // If the state and software differ in block version, at least log it.
  145. if state.Version.Consensus.Block != version.BlockProtocol {
  146. logger.Info("Software and state have different block protocols",
  147. "software", version.BlockProtocol,
  148. "state", state.Version.Consensus.Block,
  149. )
  150. }
  151. switch {
  152. case mode == cfg.ModeFull:
  153. consensusLogger.Info("This node is a fullnode")
  154. case mode == cfg.ModeValidator:
  155. addr := pubKey.Address()
  156. // Log whether this node is a validator or an observer
  157. if state.Validators.HasAddress(addr) {
  158. consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
  159. } else {
  160. consensusLogger.Info("This node is a validator (NOT in the active validator set)",
  161. "addr", addr, "pubKey", pubKey.Bytes())
  162. }
  163. }
  164. }
  165. func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
  166. if state.Validators.Size() > 1 {
  167. return false
  168. }
  169. addr, _ := state.Validators.GetByIndex(0)
  170. return pubKey != nil && bytes.Equal(pubKey.Address(), addr)
  171. }
  172. func createMempoolReactor(
  173. config *cfg.Config,
  174. proxyApp proxy.AppConns,
  175. state sm.State,
  176. memplMetrics *mempool.Metrics,
  177. peerManager *p2p.PeerManager,
  178. router *p2p.Router,
  179. logger log.Logger,
  180. ) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) {
  181. logger = logger.With("module", "mempool", "version", config.Mempool.Version)
  182. channelShims := mempoolv0.GetChannelShims(config.Mempool)
  183. reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
  184. var (
  185. channels map[p2p.ChannelID]*p2p.Channel
  186. peerUpdates *p2p.PeerUpdates
  187. )
  188. if config.P2P.DisableLegacy {
  189. channels = makeChannelsFromShims(router, channelShims)
  190. peerUpdates = peerManager.Subscribe()
  191. } else {
  192. channels = getChannelsFromShim(reactorShim)
  193. peerUpdates = reactorShim.PeerUpdates
  194. }
  195. switch config.Mempool.Version {
  196. case cfg.MempoolV0:
  197. mp := mempoolv0.NewCListMempool(
  198. config.Mempool,
  199. proxyApp.Mempool(),
  200. state.LastBlockHeight,
  201. mempoolv0.WithMetrics(memplMetrics),
  202. mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
  203. mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
  204. )
  205. mp.SetLogger(logger)
  206. reactor := mempoolv0.NewReactor(
  207. logger,
  208. config.Mempool,
  209. peerManager,
  210. mp,
  211. channels[mempool.MempoolChannel],
  212. peerUpdates,
  213. )
  214. if config.Consensus.WaitForTxs() {
  215. mp.EnableTxsAvailable()
  216. }
  217. return reactorShim, reactor, mp, nil
  218. case cfg.MempoolV1:
  219. mp := mempoolv1.NewTxMempool(
  220. logger,
  221. config.Mempool,
  222. proxyApp.Mempool(),
  223. state.LastBlockHeight,
  224. mempoolv1.WithMetrics(memplMetrics),
  225. mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
  226. mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
  227. )
  228. reactor := mempoolv1.NewReactor(
  229. logger,
  230. config.Mempool,
  231. peerManager,
  232. mp,
  233. channels[mempool.MempoolChannel],
  234. peerUpdates,
  235. )
  236. if config.Consensus.WaitForTxs() {
  237. mp.EnableTxsAvailable()
  238. }
  239. return reactorShim, reactor, mp, nil
  240. default:
  241. return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", config.Mempool.Version)
  242. }
  243. }
  244. func createEvidenceReactor(
  245. config *cfg.Config,
  246. dbProvider cfg.DBProvider,
  247. stateDB dbm.DB,
  248. blockStore *store.BlockStore,
  249. peerManager *p2p.PeerManager,
  250. router *p2p.Router,
  251. logger log.Logger,
  252. ) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
  253. evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config})
  254. if err != nil {
  255. return nil, nil, nil, err
  256. }
  257. logger = logger.With("module", "evidence")
  258. reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
  259. evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
  260. if err != nil {
  261. return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err)
  262. }
  263. var (
  264. channels map[p2p.ChannelID]*p2p.Channel
  265. peerUpdates *p2p.PeerUpdates
  266. )
  267. if config.P2P.DisableLegacy {
  268. channels = makeChannelsFromShims(router, evidence.ChannelShims)
  269. peerUpdates = peerManager.Subscribe()
  270. } else {
  271. channels = getChannelsFromShim(reactorShim)
  272. peerUpdates = reactorShim.PeerUpdates
  273. }
  274. evidenceReactor := evidence.NewReactor(
  275. logger,
  276. channels[evidence.EvidenceChannel],
  277. peerUpdates,
  278. evidencePool,
  279. )
  280. return reactorShim, evidenceReactor, evidencePool, nil
  281. }
  282. func createBlockchainReactor(
  283. logger log.Logger,
  284. config *cfg.Config,
  285. state sm.State,
  286. blockExec *sm.BlockExecutor,
  287. blockStore *store.BlockStore,
  288. csReactor *cs.Reactor,
  289. peerManager *p2p.PeerManager,
  290. router *p2p.Router,
  291. fastSync bool,
  292. metrics *cs.Metrics,
  293. ) (*p2p.ReactorShim, service.Service, error) {
  294. logger = logger.With("module", "blockchain")
  295. switch config.FastSync.Version {
  296. case cfg.BlockchainV0:
  297. reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
  298. var (
  299. channels map[p2p.ChannelID]*p2p.Channel
  300. peerUpdates *p2p.PeerUpdates
  301. )
  302. if config.P2P.DisableLegacy {
  303. channels = makeChannelsFromShims(router, bcv0.ChannelShims)
  304. peerUpdates = peerManager.Subscribe()
  305. } else {
  306. channels = getChannelsFromShim(reactorShim)
  307. peerUpdates = reactorShim.PeerUpdates
  308. }
  309. reactor, err := bcv0.NewReactor(
  310. logger, state.Copy(), blockExec, blockStore, csReactor,
  311. channels[bcv0.BlockchainChannel], peerUpdates, fastSync,
  312. metrics,
  313. )
  314. if err != nil {
  315. return nil, nil, err
  316. }
  317. return reactorShim, reactor, nil
  318. case cfg.BlockchainV2:
  319. reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, metrics)
  320. reactor.SetLogger(logger)
  321. return nil, reactor, nil
  322. default:
  323. return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
  324. }
  325. }
  326. func createConsensusReactor(
  327. config *cfg.Config,
  328. state sm.State,
  329. blockExec *sm.BlockExecutor,
  330. blockStore sm.BlockStore,
  331. mp mempool.Mempool,
  332. evidencePool *evidence.Pool,
  333. privValidator types.PrivValidator,
  334. csMetrics *cs.Metrics,
  335. waitSync bool,
  336. eventBus *types.EventBus,
  337. peerManager *p2p.PeerManager,
  338. router *p2p.Router,
  339. logger log.Logger,
  340. ) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
  341. consensusState := cs.NewState(
  342. config.Consensus,
  343. state.Copy(),
  344. blockExec,
  345. blockStore,
  346. mp,
  347. evidencePool,
  348. cs.StateMetrics(csMetrics),
  349. )
  350. consensusState.SetLogger(logger)
  351. if privValidator != nil && config.Mode == cfg.ModeValidator {
  352. consensusState.SetPrivValidator(privValidator)
  353. }
  354. reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
  355. var (
  356. channels map[p2p.ChannelID]*p2p.Channel
  357. peerUpdates *p2p.PeerUpdates
  358. )
  359. if config.P2P.DisableLegacy {
  360. channels = makeChannelsFromShims(router, cs.ChannelShims)
  361. peerUpdates = peerManager.Subscribe()
  362. } else {
  363. channels = getChannelsFromShim(reactorShim)
  364. peerUpdates = reactorShim.PeerUpdates
  365. }
  366. reactor := cs.NewReactor(
  367. logger,
  368. consensusState,
  369. channels[cs.StateChannel],
  370. channels[cs.DataChannel],
  371. channels[cs.VoteChannel],
  372. channels[cs.VoteSetBitsChannel],
  373. peerUpdates,
  374. waitSync,
  375. cs.ReactorMetrics(csMetrics),
  376. )
  377. // Services which will be publishing and/or subscribing for messages (events)
  378. // consensusReactor will set it on consensusState and blockExecutor.
  379. reactor.SetEventBus(eventBus)
  380. return reactorShim, reactor, consensusState
  381. }
  382. func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
  383. return p2p.NewMConnTransport(
  384. logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
  385. p2p.MConnTransportOptions{
  386. MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
  387. len(tmstrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
  388. ),
  389. },
  390. )
  391. }
  392. func createPeerManager(
  393. config *cfg.Config,
  394. dbProvider cfg.DBProvider,
  395. p2pLogger log.Logger,
  396. nodeID types.NodeID,
  397. ) (*p2p.PeerManager, error) {
  398. var maxConns uint16
  399. switch {
  400. case config.P2P.MaxConnections > 0:
  401. maxConns = config.P2P.MaxConnections
  402. case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
  403. x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
  404. if x > math.MaxUint16 {
  405. return nil, fmt.Errorf(
  406. "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
  407. config.P2P.MaxNumInboundPeers,
  408. config.P2P.MaxNumOutboundPeers,
  409. math.MaxUint16,
  410. )
  411. }
  412. maxConns = uint16(x)
  413. default:
  414. maxConns = 64
  415. }
  416. privatePeerIDs := make(map[types.NodeID]struct{})
  417. for _, id := range tmstrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
  418. privatePeerIDs[types.NodeID(id)] = struct{}{}
  419. }
  420. options := p2p.PeerManagerOptions{
  421. MaxConnected: maxConns,
  422. MaxConnectedUpgrade: 4,
  423. MaxPeers: 1000,
  424. MinRetryTime: 100 * time.Millisecond,
  425. MaxRetryTime: 8 * time.Hour,
  426. MaxRetryTimePersistent: 5 * time.Minute,
  427. RetryTimeJitter: 3 * time.Second,
  428. PrivatePeers: privatePeerIDs,
  429. }
  430. peers := []p2p.NodeAddress{}
  431. for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
  432. address, err := p2p.ParseNodeAddress(p)
  433. if err != nil {
  434. return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
  435. }
  436. peers = append(peers, address)
  437. options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
  438. }
  439. for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
  440. address, err := p2p.ParseNodeAddress(p)
  441. if err != nil {
  442. return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
  443. }
  444. peers = append(peers, address)
  445. }
  446. peerDB, err := dbProvider(&cfg.DBContext{ID: "peerstore", Config: config})
  447. if err != nil {
  448. return nil, err
  449. }
  450. peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
  451. if err != nil {
  452. return nil, fmt.Errorf("failed to create peer manager: %w", err)
  453. }
  454. for _, peer := range peers {
  455. if _, err := peerManager.Add(peer); err != nil {
  456. return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
  457. }
  458. }
  459. return peerManager, nil
  460. }
  461. func createRouter(
  462. p2pLogger log.Logger,
  463. p2pMetrics *p2p.Metrics,
  464. nodeInfo types.NodeInfo,
  465. privKey crypto.PrivKey,
  466. peerManager *p2p.PeerManager,
  467. transport p2p.Transport,
  468. options p2p.RouterOptions,
  469. ) (*p2p.Router, error) {
  470. return p2p.NewRouter(
  471. p2pLogger,
  472. p2pMetrics,
  473. nodeInfo,
  474. privKey,
  475. peerManager,
  476. []p2p.Transport{transport},
  477. options,
  478. )
  479. }
  480. func createSwitch(
  481. config *cfg.Config,
  482. transport p2p.Transport,
  483. p2pMetrics *p2p.Metrics,
  484. mempoolReactor *p2p.ReactorShim,
  485. bcReactor p2p.Reactor,
  486. stateSyncReactor *p2p.ReactorShim,
  487. consensusReactor *p2p.ReactorShim,
  488. evidenceReactor *p2p.ReactorShim,
  489. proxyApp proxy.AppConns,
  490. nodeInfo types.NodeInfo,
  491. nodeKey types.NodeKey,
  492. p2pLogger log.Logger,
  493. ) *p2p.Switch {
  494. var (
  495. connFilters = []p2p.ConnFilterFunc{}
  496. peerFilters = []p2p.PeerFilterFunc{}
  497. )
  498. if !config.P2P.AllowDuplicateIP {
  499. connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
  500. }
  501. // Filter peers by addr or pubkey with an ABCI query.
  502. // If the query return code is OK, add peer.
  503. if config.FilterPeers {
  504. connFilters = append(
  505. connFilters,
  506. // ABCI query for address filtering.
  507. func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
  508. res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
  509. Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
  510. })
  511. if err != nil {
  512. return err
  513. }
  514. if res.IsErr() {
  515. return fmt.Errorf("error querying abci app: %v", res)
  516. }
  517. return nil
  518. },
  519. )
  520. peerFilters = append(
  521. peerFilters,
  522. // ABCI query for ID filtering.
  523. func(_ p2p.IPeerSet, p p2p.Peer) error {
  524. res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
  525. Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
  526. })
  527. if err != nil {
  528. return err
  529. }
  530. if res.IsErr() {
  531. return fmt.Errorf("error querying abci app: %v", res)
  532. }
  533. return nil
  534. },
  535. )
  536. }
  537. sw := p2p.NewSwitch(
  538. config.P2P,
  539. transport,
  540. p2p.WithMetrics(p2pMetrics),
  541. p2p.SwitchPeerFilters(peerFilters...),
  542. p2p.SwitchConnFilters(connFilters...),
  543. )
  544. sw.SetLogger(p2pLogger)
  545. if config.Mode != cfg.ModeSeed {
  546. sw.AddReactor("MEMPOOL", mempoolReactor)
  547. sw.AddReactor("BLOCKCHAIN", bcReactor)
  548. sw.AddReactor("CONSENSUS", consensusReactor)
  549. sw.AddReactor("EVIDENCE", evidenceReactor)
  550. sw.AddReactor("STATESYNC", stateSyncReactor)
  551. }
  552. sw.SetNodeInfo(nodeInfo)
  553. sw.SetNodeKey(nodeKey)
  554. p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
  555. return sw
  556. }
  557. func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
  558. p2pLogger log.Logger, nodeKey types.NodeKey) (pex.AddrBook, error) {
  559. addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  560. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  561. // Add ourselves to addrbook to prevent dialing ourselves
  562. if config.P2P.ExternalAddress != "" {
  563. addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ExternalAddress))
  564. if err != nil {
  565. return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
  566. }
  567. addrBook.AddOurAddress(addr)
  568. }
  569. if config.P2P.ListenAddress != "" {
  570. addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ListenAddress))
  571. if err != nil {
  572. return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
  573. }
  574. addrBook.AddOurAddress(addr)
  575. }
  576. sw.SetAddrBook(addrBook)
  577. return addrBook, nil
  578. }
  579. func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
  580. sw *p2p.Switch, logger log.Logger) *pex.Reactor {
  581. reactorConfig := &pex.ReactorConfig{
  582. Seeds: tmstrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
  583. SeedMode: config.Mode == cfg.ModeSeed,
  584. // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
  585. // blocks assuming 10s blocks ~ 28 hours.
  586. // TODO (melekes): make it dynamic based on the actual block latencies
  587. // from the live network.
  588. // https://github.com/tendermint/tendermint/issues/3523
  589. SeedDisconnectWaitPeriod: 28 * time.Hour,
  590. PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
  591. }
  592. // TODO persistent peers ? so we can have their DNS addrs saved
  593. pexReactor := pex.NewReactor(addrBook, reactorConfig)
  594. pexReactor.SetLogger(logger.With("module", "pex"))
  595. sw.AddReactor("PEX", pexReactor)
  596. return pexReactor
  597. }
  598. func createPEXReactorV2(
  599. config *cfg.Config,
  600. logger log.Logger,
  601. peerManager *p2p.PeerManager,
  602. router *p2p.Router,
  603. ) (*pex.ReactorV2, error) {
  604. channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
  605. if err != nil {
  606. return nil, err
  607. }
  608. peerUpdates := peerManager.Subscribe()
  609. return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil
  610. }
  611. func makeNodeInfo(
  612. config *cfg.Config,
  613. nodeKey types.NodeKey,
  614. eventSinks []indexer.EventSink,
  615. genDoc *types.GenesisDoc,
  616. state sm.State,
  617. ) (types.NodeInfo, error) {
  618. txIndexerStatus := "off"
  619. if indexer.IndexingEnabled(eventSinks) {
  620. txIndexerStatus = "on"
  621. }
  622. var bcChannel byte
  623. switch config.FastSync.Version {
  624. case cfg.BlockchainV0:
  625. bcChannel = byte(bcv0.BlockchainChannel)
  626. case cfg.BlockchainV2:
  627. bcChannel = bcv2.BlockchainChannel
  628. default:
  629. return types.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
  630. }
  631. nodeInfo := types.NodeInfo{
  632. ProtocolVersion: types.ProtocolVersion{
  633. P2P: version.P2PProtocol, // global
  634. Block: state.Version.Consensus.Block,
  635. App: state.Version.Consensus.App,
  636. },
  637. NodeID: nodeKey.ID,
  638. Network: genDoc.ChainID,
  639. Version: version.TMVersion,
  640. Channels: []byte{
  641. bcChannel,
  642. byte(cs.StateChannel),
  643. byte(cs.DataChannel),
  644. byte(cs.VoteChannel),
  645. byte(cs.VoteSetBitsChannel),
  646. byte(mempool.MempoolChannel),
  647. byte(evidence.EvidenceChannel),
  648. byte(statesync.SnapshotChannel),
  649. byte(statesync.ChunkChannel),
  650. byte(statesync.LightBlockChannel),
  651. },
  652. Moniker: config.Moniker,
  653. Other: types.NodeInfoOther{
  654. TxIndex: txIndexerStatus,
  655. RPCAddress: config.RPC.ListenAddress,
  656. },
  657. }
  658. if config.P2P.PexReactor {
  659. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  660. }
  661. lAddr := config.P2P.ExternalAddress
  662. if lAddr == "" {
  663. lAddr = config.P2P.ListenAddress
  664. }
  665. nodeInfo.ListenAddr = lAddr
  666. err := nodeInfo.Validate()
  667. return nodeInfo, err
  668. }
  669. func makeSeedNodeInfo(
  670. config *cfg.Config,
  671. nodeKey types.NodeKey,
  672. genDoc *types.GenesisDoc,
  673. state sm.State,
  674. ) (types.NodeInfo, error) {
  675. nodeInfo := types.NodeInfo{
  676. ProtocolVersion: types.ProtocolVersion{
  677. P2P: version.P2PProtocol, // global
  678. Block: state.Version.Consensus.Block,
  679. App: state.Version.Consensus.App,
  680. },
  681. NodeID: nodeKey.ID,
  682. Network: genDoc.ChainID,
  683. Version: version.TMVersion,
  684. Channels: []byte{},
  685. Moniker: config.Moniker,
  686. Other: types.NodeInfoOther{
  687. TxIndex: "off",
  688. RPCAddress: config.RPC.ListenAddress,
  689. },
  690. }
  691. if config.P2P.PexReactor {
  692. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  693. }
  694. lAddr := config.P2P.ExternalAddress
  695. if lAddr == "" {
  696. lAddr = config.P2P.ListenAddress
  697. }
  698. nodeInfo.ListenAddr = lAddr
  699. err := nodeInfo.Validate()
  700. return nodeInfo, err
  701. }