package node

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
	"strings"
	"time"

	dbm "github.com/tendermint/tm-db"

	abci "github.com/tendermint/tendermint/abci/types"
	bcv0 "github.com/tendermint/tendermint/blockchain/v0"
	bcv2 "github.com/tendermint/tendermint/blockchain/v2"
	cfg "github.com/tendermint/tendermint/config"
	cs "github.com/tendermint/tendermint/consensus"
	"github.com/tendermint/tendermint/crypto"
	"github.com/tendermint/tendermint/evidence"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	tmStrings "github.com/tendermint/tendermint/libs/strings"
	mempl "github.com/tendermint/tendermint/mempool"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/pex"
	protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/state/indexer"
	kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
	null "github.com/tendermint/tendermint/state/indexer/sink/null"
	psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
	"github.com/tendermint/tendermint/statesync"
	"github.com/tendermint/tendermint/store"
	"github.com/tendermint/tendermint/types"
	"github.com/tendermint/tendermint/version"
)
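
// initDBs opens the blockstore and state databases using the given
// DBProvider and wraps the blockstore database in a *store.BlockStore.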
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
	var blockStoreDB dbm.DB
	blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
	if err != nil {
		return
	}

	blockStore = store.NewBlockStore(blockStoreDB)

	stateDB, err = dbProvider(&DBContext{"state", config})
	return
}
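
// createAndStartProxyAppConns creates the proxy connections to the ABCI
// application and starts them.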
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
	proxyApp := proxy.NewAppConns(clientCreator)
	proxyApp.SetLogger(logger.With("module", "proxy"))
	if err := proxyApp.Start(); err != nil {
		return nil, fmt.Errorf("error starting proxy app connections: %v", err)
	}
	return proxyApp, nil
}
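
// createAndStartEventBus creates and starts the event bus on which the node
// publishes block and transaction events.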
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
	eventBus := types.NewEventBus()
	eventBus.SetLogger(logger.With("module", "events"))
	if err := eventBus.Start(); err != nil {
		return nil, err
	}
	return eventBus, nil
}
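
// createAndStartIndexerService configures the event sinks named in the
// tx-index config (null, kv, or psql), rejecting duplicates, and starts an
// indexer service subscribed to the event bus. If no sink is configured,
// the null sink is used.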
func createAndStartIndexerService(
	config *cfg.Config,
	dbProvider DBProvider,
	eventBus *types.EventBus,
	logger log.Logger,
	chainID string,
) (*indexer.Service, []indexer.EventSink, error) {
	eventSinks := []indexer.EventSink{}

	// Check for duplicated sinks.
	sinks := map[string]bool{}
	for _, s := range config.TxIndex.Indexer {
		sl := strings.ToLower(s)
		if sinks[sl] {
			return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
		}
		sinks[sl] = true
	}

loop:
	for k := range sinks {
		switch k {
		case string(indexer.NULL):
			// If the null sink is configured, discard all other sinks and use
			// only the null event sink.
			eventSinks = []indexer.EventSink{null.NewEventSink()}
			break loop

		case string(indexer.KV):
			store, err := dbProvider(&DBContext{"tx_index", config})
			if err != nil {
				return nil, nil, err
			}
			eventSinks = append(eventSinks, kv.NewEventSink(store))

		case string(indexer.PSQL):
			conn := config.TxIndex.PsqlConn
			if conn == "" {
				return nil, nil, errors.New("the psql connection settings cannot be empty")
			}
			es, _, err := psql.NewEventSink(conn, chainID)
			if err != nil {
				return nil, nil, err
			}
			eventSinks = append(eventSinks, es)

		default:
			return nil, nil, errors.New("unsupported event sink type")
		}
	}

	if len(eventSinks) == 0 {
		eventSinks = []indexer.EventSink{null.NewEventSink()}
	}

	indexerService := indexer.NewIndexerService(eventSinks, eventBus)
	indexerService.SetLogger(logger.With("module", "txindex"))

	if err := indexerService.Start(); err != nil {
		return nil, nil, err
	}

	return indexerService, eventSinks, nil
}
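
// doHandshake performs the consensus handshake with the application over the
// ABCI proxy, replaying any blocks the application is missing so it catches
// up to the latest state.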
func doHandshake(
	stateStore sm.Store,
	state sm.State,
	blockStore sm.BlockStore,
	genDoc *types.GenesisDoc,
	eventBus types.BlockEventPublisher,
	proxyApp proxy.AppConns,
	consensusLogger log.Logger) error {

	handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
	handshaker.SetLogger(consensusLogger)
	handshaker.SetEventBus(eventBus)
	if err := handshaker.Handshake(proxyApp); err != nil {
		return fmt.Errorf("error during handshake: %v", err)
	}
	return nil
}
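
// logNodeStartupInfo logs the version info and node mode at startup and, in
// validator mode, whether this node is part of the active validator set.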
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
	// Log the version info.
	logger.Info("Version info",
		"tmVersion", version.TMVersion,
		"block", version.BlockProtocol,
		"p2p", version.P2PProtocol,
		"mode", mode,
	)

	// If the state and software differ in block version, at least log it.
	if state.Version.Consensus.Block != version.BlockProtocol {
		logger.Info("Software and state have different block protocols",
			"software", version.BlockProtocol,
			"state", state.Version.Consensus.Block,
		)
	}

	switch {
	case mode == cfg.ModeFull:
		consensusLogger.Info("This node is a fullnode")
	case mode == cfg.ModeValidator:
		addr := pubKey.Address()
		// Log whether this node is a validator or an observer.
		if state.Validators.HasAddress(addr) {
			consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
		} else {
			consensusLogger.Info("This node is a validator (NOT in the active validator set)",
				"addr", addr, "pubKey", pubKey.Bytes())
		}
	}
}
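
// onlyValidatorIsUs reports whether the validator set contains exactly one
// validator and that validator's address matches the given public key.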
func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
	if state.Validators.Size() > 1 {
		return false
	}

	addr, _ := state.Validators.GetByIndex(0)
	return pubKey != nil && bytes.Equal(pubKey.Address(), addr)
}
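
// createMempoolReactor creates the CList mempool and its reactor. Channels
// and peer updates come from the new p2p router when the legacy stack is
// disabled, and from the legacy reactor shim otherwise.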
func createMempoolReactor(
	config *cfg.Config,
	proxyApp proxy.AppConns,
	state sm.State,
	memplMetrics *mempl.Metrics,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) {

	logger = logger.With("module", "mempool")
	mempool := mempl.NewCListMempool(
		config.Mempool,
		proxyApp.Mempool(),
		state.LastBlockHeight,
		mempl.WithMetrics(memplMetrics),
		mempl.WithPreCheck(sm.TxPreCheck(state)),
		mempl.WithPostCheck(sm.TxPostCheck(state)),
	)

	mempool.SetLogger(logger)

	channelShims := mempl.GetChannelShims(config.Mempool)
	reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)

	var (
		channels    map[p2p.ChannelID]*p2p.Channel
		peerUpdates *p2p.PeerUpdates
	)

	if config.P2P.DisableLegacy {
		channels = makeChannelsFromShims(router, channelShims)
		peerUpdates = peerManager.Subscribe()
	} else {
		channels = getChannelsFromShim(reactorShim)
		peerUpdates = reactorShim.PeerUpdates
	}

	reactor := mempl.NewReactor(
		logger,
		config.Mempool,
		peerManager,
		mempool,
		channels[mempl.MempoolChannel],
		peerUpdates,
	)

	if config.Consensus.WaitForTxs() {
		mempool.EnableTxsAvailable()
	}

	return reactorShim, reactor, mempool
}
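
// createEvidenceReactor opens the evidence database and creates the evidence
// pool and its reactor, wired to either the router or the legacy reactor
// shim depending on the p2p config.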
func createEvidenceReactor(
	config *cfg.Config,
	dbProvider DBProvider,
	stateDB dbm.DB,
	blockStore *store.BlockStore,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {

	evidenceDB, err := dbProvider(&DBContext{"evidence", config})
	if err != nil {
		return nil, nil, nil, err
	}

	logger = logger.With("module", "evidence")
	reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)

	evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
	if err != nil {
		return nil, nil, nil, err
	}

	var (
		channels    map[p2p.ChannelID]*p2p.Channel
		peerUpdates *p2p.PeerUpdates
	)

	if config.P2P.DisableLegacy {
		channels = makeChannelsFromShims(router, evidence.ChannelShims)
		peerUpdates = peerManager.Subscribe()
	} else {
		channels = getChannelsFromShim(reactorShim)
		peerUpdates = reactorShim.PeerUpdates
	}

	evidenceReactor := evidence.NewReactor(
		logger,
		channels[evidence.EvidenceChannel],
		peerUpdates,
		evidencePool,
	)

	return reactorShim, evidenceReactor, evidencePool, nil
}
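
// createBlockchainReactor returns the fast-sync reactor for the configured
// blockchain version. Only v0 uses a reactor shim; for v2 the shim returned
// is nil.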
func createBlockchainReactor(
	logger log.Logger,
	config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore *store.BlockStore,
	csReactor *cs.Reactor,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	fastSync bool,
) (*p2p.ReactorShim, service.Service, error) {

	logger = logger.With("module", "blockchain")

	switch config.FastSync.Version {
	case cfg.BlockchainV0:
		reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)

		var (
			channels    map[p2p.ChannelID]*p2p.Channel
			peerUpdates *p2p.PeerUpdates
		)

		if config.P2P.DisableLegacy {
			channels = makeChannelsFromShims(router, bcv0.ChannelShims)
			peerUpdates = peerManager.Subscribe()
		} else {
			channels = getChannelsFromShim(reactorShim)
			peerUpdates = reactorShim.PeerUpdates
		}

		reactor, err := bcv0.NewReactor(
			logger, state.Copy(), blockExec, blockStore, csReactor,
			channels[bcv0.BlockchainChannel], peerUpdates, fastSync,
		)
		if err != nil {
			return nil, nil, err
		}

		return reactorShim, reactor, nil

	case cfg.BlockchainV2:
		reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
		reactor.SetLogger(logger)

		return nil, reactor, nil

	default:
		return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
	}
}
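
// createConsensusReactor creates the consensus state and its reactor. The
// private validator is only attached when the node runs in validator mode.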
func createConsensusReactor(
	config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	mempool *mempl.CListMempool,
	evidencePool *evidence.Pool,
	privValidator types.PrivValidator,
	csMetrics *cs.Metrics,
	waitSync bool,
	eventBus *types.EventBus,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {

	consensusState := cs.NewState(
		config.Consensus,
		state.Copy(),
		blockExec,
		blockStore,
		mempool,
		evidencePool,
		cs.StateMetrics(csMetrics),
	)

	consensusState.SetLogger(logger)
	if privValidator != nil && config.Mode == cfg.ModeValidator {
		consensusState.SetPrivValidator(privValidator)
	}

	reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)

	var (
		channels    map[p2p.ChannelID]*p2p.Channel
		peerUpdates *p2p.PeerUpdates
	)

	if config.P2P.DisableLegacy {
		channels = makeChannelsFromShims(router, cs.ChannelShims)
		peerUpdates = peerManager.Subscribe()
	} else {
		channels = getChannelsFromShim(reactorShim)
		peerUpdates = reactorShim.PeerUpdates
	}

	reactor := cs.NewReactor(
		logger,
		consensusState,
		channels[cs.StateChannel],
		channels[cs.DataChannel],
		channels[cs.VoteChannel],
		channels[cs.VoteSetBitsChannel],
		peerUpdates,
		waitSync,
		cs.ReactorMetrics(csMetrics),
	)

	// The consensus reactor sets the event bus on consensusState and the block
	// executor, so services can publish and subscribe to messages (events).
	reactor.SetEventBus(eventBus)

	return reactorShim, reactor, consensusState
}
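
// createTransport creates the MConn transport, capping accepted connections
// at the configured number of inbound peers plus the number of unconditional
// peer IDs.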
func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
	return p2p.NewMConnTransport(
		logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
		p2p.MConnTransportOptions{
			MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
				len(tmStrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
			),
		},
	)
}
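
// createPeerManager creates the p2p peer manager, deriving the connection
// limit from the config (defaulting to 64), registering private peers, and
// adding the configured persistent and bootstrap peers.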
func createPeerManager(
	config *cfg.Config,
	dbProvider DBProvider,
	p2pLogger log.Logger,
	nodeID p2p.NodeID,
) (*p2p.PeerManager, error) {

	var maxConns uint16

	switch {
	case config.P2P.MaxConnections > 0:
		maxConns = config.P2P.MaxConnections

	case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
		x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
		if x > math.MaxUint16 {
			return nil, fmt.Errorf(
				"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
				config.P2P.MaxNumInboundPeers,
				config.P2P.MaxNumOutboundPeers,
				math.MaxUint16,
			)
		}

		maxConns = uint16(x)

	default:
		maxConns = 64
	}

	privatePeerIDs := make(map[p2p.NodeID]struct{})
	for _, id := range tmStrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
		privatePeerIDs[p2p.NodeID(id)] = struct{}{}
	}

	options := p2p.PeerManagerOptions{
		MaxConnected:           maxConns,
		MaxConnectedUpgrade:    4,
		MaxPeers:               1000,
		MinRetryTime:           100 * time.Millisecond,
		MaxRetryTime:           8 * time.Hour,
		MaxRetryTimePersistent: 5 * time.Minute,
		RetryTimeJitter:        3 * time.Second,
		PrivatePeers:           privatePeerIDs,
	}

	peers := []p2p.NodeAddress{}
	for _, p := range tmStrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
		address, err := p2p.ParseNodeAddress(p)
		if err != nil {
			return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
		}

		peers = append(peers, address)
		options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
	}

	for _, p := range tmStrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
		address, err := p2p.ParseNodeAddress(p)
		if err != nil {
			return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
		}
		peers = append(peers, address)
	}

	peerDB, err := dbProvider(&DBContext{"peerstore", config})
	if err != nil {
		return nil, err
	}

	peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
	if err != nil {
		return nil, fmt.Errorf("failed to create peer manager: %w", err)
	}

	for _, peer := range peers {
		if _, err := peerManager.Add(peer); err != nil {
			return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
		}
	}

	return peerManager, nil
}
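
// createRouter creates the p2p message router over the given transport.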
func createRouter(
	p2pLogger log.Logger,
	p2pMetrics *p2p.Metrics,
	nodeInfo p2p.NodeInfo,
	privKey crypto.PrivKey,
	peerManager *p2p.PeerManager,
	transport p2p.Transport,
	options p2p.RouterOptions,
) (*p2p.Router, error) {

	return p2p.NewRouter(
		p2pLogger,
		p2pMetrics,
		nodeInfo,
		privKey,
		peerManager,
		[]p2p.Transport{transport},
		options,
	)
}
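
// createSwitch creates the legacy p2p switch, installing duplicate-IP and
// optional ABCI-backed peer filters according to the config and registering
// the reactors unless the node runs in seed mode.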
func createSwitch(
	config *cfg.Config,
	transport p2p.Transport,
	p2pMetrics *p2p.Metrics,
	mempoolReactor *p2p.ReactorShim,
	bcReactor p2p.Reactor,
	stateSyncReactor *p2p.ReactorShim,
	consensusReactor *p2p.ReactorShim,
	evidenceReactor *p2p.ReactorShim,
	proxyApp proxy.AppConns,
	nodeInfo p2p.NodeInfo,
	nodeKey p2p.NodeKey,
	p2pLogger log.Logger,
) *p2p.Switch {

	var (
		connFilters = []p2p.ConnFilterFunc{}
		peerFilters = []p2p.PeerFilterFunc{}
	)

	if !config.P2P.AllowDuplicateIP {
		connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
	}

	// Filter peers by addr or pubkey with an ABCI query.
	// If the query's return code is OK, the peer is added.
	if config.FilterPeers {
		connFilters = append(
			connFilters,
			// ABCI query for address filtering.
			func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
				res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
				})
				if err != nil {
					return err
				}
				if res.IsErr() {
					return fmt.Errorf("error querying abci app: %v", res)
				}

				return nil
			},
		)

		peerFilters = append(
			peerFilters,
			// ABCI query for ID filtering.
			func(_ p2p.IPeerSet, p p2p.Peer) error {
				res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
				})
				if err != nil {
					return err
				}
				if res.IsErr() {
					return fmt.Errorf("error querying abci app: %v", res)
				}

				return nil
			},
		)
	}

	sw := p2p.NewSwitch(
		config.P2P,
		transport,
		p2p.WithMetrics(p2pMetrics),
		p2p.SwitchPeerFilters(peerFilters...),
		p2p.SwitchConnFilters(connFilters...),
	)

	sw.SetLogger(p2pLogger)
	if config.Mode != cfg.ModeSeed {
		sw.AddReactor("MEMPOOL", mempoolReactor)
		sw.AddReactor("BLOCKCHAIN", bcReactor)
		sw.AddReactor("CONSENSUS", consensusReactor)
		sw.AddReactor("EVIDENCE", evidenceReactor)
		sw.AddReactor("STATESYNC", stateSyncReactor)
	}

	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)

	p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
	return sw
}
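
// createAddrBookAndSetOnSwitch creates the peer address book, records our
// own external and listen addresses so the node does not dial itself, and
// sets the book on the switch.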
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
	p2pLogger log.Logger, nodeKey p2p.NodeKey) (pex.AddrBook, error) {

	addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
	addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))

	// Add ourselves to the addrbook to prevent dialing ourselves.
	if config.P2P.ExternalAddress != "" {
		addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ExternalAddress))
		if err != nil {
			return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
		}
		addrBook.AddOurAddress(addr)
	}

	if config.P2P.ListenAddress != "" {
		addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ListenAddress))
		if err != nil {
			return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
		}
		addrBook.AddOurAddress(addr)
	}

	sw.SetAddrBook(addrBook)

	return addrBook, nil
}
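
// createPEXReactorAndAddToSwitch creates the legacy PEX reactor and registers
// it with the switch.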
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
	sw *p2p.Switch, logger log.Logger) *pex.Reactor {

	reactorConfig := &pex.ReactorConfig{
		Seeds:    tmStrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
		SeedMode: config.Mode == cfg.ModeSeed,
		// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer is
		// 10000 blocks; assuming 10s blocks, that is ~28 hours.
		// TODO (melekes): make it dynamic based on the actual block latencies
		// from the live network.
		// https://github.com/tendermint/tendermint/issues/3523
		SeedDisconnectWaitPeriod:     28 * time.Hour,
		PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
	}
	// TODO persistent peers ? so we can have their DNS addrs saved
	pexReactor := pex.NewReactor(addrBook, reactorConfig)
	pexReactor.SetLogger(logger.With("module", "pex"))
	sw.AddReactor("PEX", pexReactor)
	return pexReactor
}
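
// createPEXReactorV2 creates the PEX reactor for the new p2p stack, opening
// its channel on the router and subscribing to peer updates.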
func createPEXReactorV2(
	config *cfg.Config,
	logger log.Logger,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
) (*pex.ReactorV2, error) {

	channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096)
	if err != nil {
		return nil, err
	}

	peerUpdates := peerManager.Subscribe()
	return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil
}
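
// makeNodeInfo assembles the NodeInfo advertised to peers: protocol versions,
// the channels this node serves, its moniker, and the tx indexer status.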
func makeNodeInfo(
	config *cfg.Config,
	nodeKey p2p.NodeKey,
	eventSinks []indexer.EventSink,
	genDoc *types.GenesisDoc,
	state sm.State,
) (p2p.NodeInfo, error) {
	txIndexerStatus := "off"

	if indexer.IndexingEnabled(eventSinks) {
		txIndexerStatus = "on"
	}

	var bcChannel byte
	switch config.FastSync.Version {
	case cfg.BlockchainV0:
		bcChannel = byte(bcv0.BlockchainChannel)

	case cfg.BlockchainV2:
		bcChannel = bcv2.BlockchainChannel

	default:
		return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
	}

	nodeInfo := p2p.NodeInfo{
		ProtocolVersion: p2p.NewProtocolVersion(
			version.P2PProtocol, // global
			state.Version.Consensus.Block,
			state.Version.Consensus.App,
		),
		NodeID:  nodeKey.ID,
		Network: genDoc.ChainID,
		Version: version.TMVersion,
		Channels: []byte{
			bcChannel,
			byte(cs.StateChannel),
			byte(cs.DataChannel),
			byte(cs.VoteChannel),
			byte(cs.VoteSetBitsChannel),
			byte(mempl.MempoolChannel),
			byte(evidence.EvidenceChannel),
			byte(statesync.SnapshotChannel),
			byte(statesync.ChunkChannel),
		},
		Moniker: config.Moniker,
		Other: p2p.NodeInfoOther{
			TxIndex:    txIndexerStatus,
			RPCAddress: config.RPC.ListenAddress,
		},
	}

	if config.P2P.PexReactor {
		nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
	}

	lAddr := config.P2P.ExternalAddress
	if lAddr == "" {
		lAddr = config.P2P.ListenAddress
	}

	nodeInfo.ListenAddr = lAddr

	err := nodeInfo.Validate()
	return nodeInfo, err
}
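
// makeSeedNodeInfo assembles the NodeInfo for a seed node, which advertises
// no reactor channels other than PEX when it is enabled.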
func makeSeedNodeInfo(
	config *cfg.Config,
	nodeKey p2p.NodeKey,
	genDoc *types.GenesisDoc,
	state sm.State,
) (p2p.NodeInfo, error) {
	nodeInfo := p2p.NodeInfo{
		ProtocolVersion: p2p.NewProtocolVersion(
			version.P2PProtocol, // global
			state.Version.Consensus.Block,
			state.Version.Consensus.App,
		),
		NodeID:   nodeKey.ID,
		Network:  genDoc.ChainID,
		Version:  version.TMVersion,
		Channels: []byte{},
		Moniker:  config.Moniker,
		Other: p2p.NodeInfoOther{
			TxIndex:    "off",
			RPCAddress: config.RPC.ListenAddress,
		},
	}

	if config.P2P.PexReactor {
		nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
	}

	lAddr := config.P2P.ExternalAddress
	if lAddr == "" {
		lAddr = config.P2P.ListenAddress
	}

	nodeInfo.ListenAddr = lAddr

	err := nodeInfo.Validate()
	return nodeInfo, err
}