You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

771 lines
20 KiB

  1. package node
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "math"
  8. "net"
  9. _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
  10. "time"
  11. dbm "github.com/tendermint/tm-db"
  12. abci "github.com/tendermint/tendermint/abci/types"
  13. cfg "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tendermint/crypto"
  15. bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0"
  16. bcv2 "github.com/tendermint/tendermint/internal/blocksync/v2"
  17. cs "github.com/tendermint/tendermint/internal/consensus"
  18. "github.com/tendermint/tendermint/internal/evidence"
  19. "github.com/tendermint/tendermint/internal/mempool"
  20. mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
  21. mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1"
  22. "github.com/tendermint/tendermint/internal/p2p"
  23. "github.com/tendermint/tendermint/internal/p2p/pex"
  24. "github.com/tendermint/tendermint/internal/statesync"
  25. "github.com/tendermint/tendermint/libs/log"
  26. "github.com/tendermint/tendermint/libs/service"
  27. tmstrings "github.com/tendermint/tendermint/libs/strings"
  28. protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  29. "github.com/tendermint/tendermint/proxy"
  30. sm "github.com/tendermint/tendermint/state"
  31. "github.com/tendermint/tendermint/state/indexer"
  32. "github.com/tendermint/tendermint/state/indexer/sink"
  33. "github.com/tendermint/tendermint/store"
  34. "github.com/tendermint/tendermint/types"
  35. "github.com/tendermint/tendermint/version"
  36. )
  37. func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
  38. var blockStoreDB dbm.DB
  39. blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
  40. if err != nil {
  41. return
  42. }
  43. blockStore = store.NewBlockStore(blockStoreDB)
  44. stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
  45. return
  46. }
  47. func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
  48. proxyApp := proxy.NewAppConns(clientCreator)
  49. proxyApp.SetLogger(logger.With("module", "proxy"))
  50. if err := proxyApp.Start(); err != nil {
  51. return nil, fmt.Errorf("error starting proxy app connections: %v", err)
  52. }
  53. return proxyApp, nil
  54. }
  55. func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
  56. eventBus := types.NewEventBus()
  57. eventBus.SetLogger(logger.With("module", "events"))
  58. if err := eventBus.Start(); err != nil {
  59. return nil, err
  60. }
  61. return eventBus, nil
  62. }
  63. func createAndStartIndexerService(
  64. config *cfg.Config,
  65. dbProvider cfg.DBProvider,
  66. eventBus *types.EventBus,
  67. logger log.Logger,
  68. chainID string,
  69. ) (*indexer.Service, []indexer.EventSink, error) {
  70. eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
  71. if err != nil {
  72. return nil, nil, err
  73. }
  74. indexerService := indexer.NewIndexerService(eventSinks, eventBus)
  75. indexerService.SetLogger(logger.With("module", "txindex"))
  76. if err := indexerService.Start(); err != nil {
  77. return nil, nil, err
  78. }
  79. return indexerService, eventSinks, nil
  80. }
  81. func doHandshake(
  82. stateStore sm.Store,
  83. state sm.State,
  84. blockStore sm.BlockStore,
  85. genDoc *types.GenesisDoc,
  86. eventBus types.BlockEventPublisher,
  87. proxyApp proxy.AppConns,
  88. consensusLogger log.Logger) error {
  89. handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
  90. handshaker.SetLogger(consensusLogger)
  91. handshaker.SetEventBus(eventBus)
  92. if err := handshaker.Handshake(proxyApp); err != nil {
  93. return fmt.Errorf("error during handshake: %v", err)
  94. }
  95. return nil
  96. }
  97. func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
  98. // Log the version info.
  99. logger.Info("Version info",
  100. "tmVersion", version.TMVersion,
  101. "block", version.BlockProtocol,
  102. "p2p", version.P2PProtocol,
  103. "mode", mode,
  104. )
  105. // If the state and software differ in block version, at least log it.
  106. if state.Version.Consensus.Block != version.BlockProtocol {
  107. logger.Info("Software and state have different block protocols",
  108. "software", version.BlockProtocol,
  109. "state", state.Version.Consensus.Block,
  110. )
  111. }
  112. switch {
  113. case mode == cfg.ModeFull:
  114. consensusLogger.Info("This node is a fullnode")
  115. case mode == cfg.ModeValidator:
  116. addr := pubKey.Address()
  117. // Log whether this node is a validator or an observer
  118. if state.Validators.HasAddress(addr) {
  119. consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
  120. } else {
  121. consensusLogger.Info("This node is a validator (NOT in the active validator set)",
  122. "addr", addr, "pubKey", pubKey.Bytes())
  123. }
  124. }
  125. }
  126. func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
  127. if state.Validators.Size() > 1 {
  128. return false
  129. }
  130. addr, _ := state.Validators.GetByIndex(0)
  131. return pubKey != nil && bytes.Equal(pubKey.Address(), addr)
  132. }
// createMempoolReactor builds the mempool and its reactor for the configured
// mempool version (v0 or v1), wiring channels and peer updates either from
// the legacy reactor shim or from the new p2p router, depending on
// config.P2P.UseLegacy.
//
// It returns the shim (consumed by the legacy switch), the reactor service,
// the mempool itself, and an error for an unknown mempool version.
func createMempoolReactor(
	config *cfg.Config,
	proxyApp proxy.AppConns,
	state sm.State,
	memplMetrics *mempool.Metrics,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) {
	logger = logger.With("module", "mempool", "version", config.Mempool.Version)

	// NOTE(review): channel shims come from the v0 package regardless of the
	// configured mempool version — presumably both versions share the same
	// channel layout; confirm before changing.
	channelShims := mempoolv0.GetChannelShims(config.Mempool)
	reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)

	var (
		channels    map[p2p.ChannelID]*p2p.Channel
		peerUpdates *p2p.PeerUpdates
	)

	// Legacy stack: channels and peer updates come from the shim.
	// New stack: open channels on the router and subscribe to the manager.
	if config.P2P.UseLegacy {
		channels = getChannelsFromShim(reactorShim)
		peerUpdates = reactorShim.PeerUpdates
	} else {
		channels = makeChannelsFromShims(router, channelShims)
		peerUpdates = peerManager.Subscribe()
	}

	switch config.Mempool.Version {
	case cfg.MempoolV0:
		mp := mempoolv0.NewCListMempool(
			config.Mempool,
			proxyApp.Mempool(),
			state.LastBlockHeight,
			mempoolv0.WithMetrics(memplMetrics),
			mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
		)

		mp.SetLogger(logger)

		reactor := mempoolv0.NewReactor(
			logger,
			config.Mempool,
			peerManager,
			mp,
			channels[mempool.MempoolChannel],
			peerUpdates,
		)

		// When consensus waits for transactions, the mempool must signal
		// availability so empty blocks are not proposed needlessly.
		if config.Consensus.WaitForTxs() {
			mp.EnableTxsAvailable()
		}

		return reactorShim, reactor, mp, nil

	case cfg.MempoolV1:
		// v1 takes the logger in its constructor; no separate SetLogger call.
		mp := mempoolv1.NewTxMempool(
			logger,
			config.Mempool,
			proxyApp.Mempool(),
			state.LastBlockHeight,
			mempoolv1.WithMetrics(memplMetrics),
			mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
		)

		reactor := mempoolv1.NewReactor(
			logger,
			config.Mempool,
			peerManager,
			mp,
			channels[mempool.MempoolChannel],
			peerUpdates,
		)

		if config.Consensus.WaitForTxs() {
			mp.EnableTxsAvailable()
		}

		return reactorShim, reactor, mp, nil

	default:
		return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", config.Mempool.Version)
	}
}
  205. func createEvidenceReactor(
  206. config *cfg.Config,
  207. dbProvider cfg.DBProvider,
  208. stateDB dbm.DB,
  209. blockStore *store.BlockStore,
  210. peerManager *p2p.PeerManager,
  211. router *p2p.Router,
  212. logger log.Logger,
  213. ) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
  214. evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config})
  215. if err != nil {
  216. return nil, nil, nil, err
  217. }
  218. logger = logger.With("module", "evidence")
  219. reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
  220. evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
  221. if err != nil {
  222. return nil, nil, nil, fmt.Errorf("creating evidence pool: %w", err)
  223. }
  224. var (
  225. channels map[p2p.ChannelID]*p2p.Channel
  226. peerUpdates *p2p.PeerUpdates
  227. )
  228. if config.P2P.UseLegacy {
  229. channels = getChannelsFromShim(reactorShim)
  230. peerUpdates = reactorShim.PeerUpdates
  231. } else {
  232. channels = makeChannelsFromShims(router, evidence.ChannelShims)
  233. peerUpdates = peerManager.Subscribe()
  234. }
  235. evidenceReactor := evidence.NewReactor(
  236. logger,
  237. channels[evidence.EvidenceChannel],
  238. peerUpdates,
  239. evidencePool,
  240. )
  241. return reactorShim, evidenceReactor, evidencePool, nil
  242. }
  243. func createBlockchainReactor(
  244. logger log.Logger,
  245. config *cfg.Config,
  246. state sm.State,
  247. blockExec *sm.BlockExecutor,
  248. blockStore *store.BlockStore,
  249. csReactor *cs.Reactor,
  250. peerManager *p2p.PeerManager,
  251. router *p2p.Router,
  252. blockSync bool,
  253. metrics *cs.Metrics,
  254. ) (*p2p.ReactorShim, service.Service, error) {
  255. logger = logger.With("module", "blockchain")
  256. switch config.BlockSync.Version {
  257. case cfg.BlockSyncV0:
  258. reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
  259. var (
  260. channels map[p2p.ChannelID]*p2p.Channel
  261. peerUpdates *p2p.PeerUpdates
  262. )
  263. if config.P2P.UseLegacy {
  264. channels = getChannelsFromShim(reactorShim)
  265. peerUpdates = reactorShim.PeerUpdates
  266. } else {
  267. channels = makeChannelsFromShims(router, bcv0.ChannelShims)
  268. peerUpdates = peerManager.Subscribe()
  269. }
  270. reactor, err := bcv0.NewReactor(
  271. logger, state.Copy(), blockExec, blockStore, csReactor,
  272. channels[bcv0.BlockSyncChannel], peerUpdates, blockSync,
  273. metrics,
  274. )
  275. if err != nil {
  276. return nil, nil, err
  277. }
  278. return reactorShim, reactor, nil
  279. case cfg.BlockSyncV2:
  280. return nil, nil, errors.New("block sync version v2 is no longer supported. Please use v0")
  281. default:
  282. return nil, nil, fmt.Errorf("unknown block sync version %s", config.BlockSync.Version)
  283. }
  284. }
// createConsensusReactor builds the consensus state machine and its reactor,
// wiring channels and peer updates either from the legacy reactor shim or
// the new p2p router (per config.P2P.UseLegacy). The private validator is
// attached only when the node runs in validator mode.
func createConsensusReactor(
	config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	mp mempool.Mempool,
	evidencePool *evidence.Pool,
	privValidator types.PrivValidator,
	csMetrics *cs.Metrics,
	waitSync bool,
	eventBus *types.EventBus,
	peerManager *p2p.PeerManager,
	router *p2p.Router,
	logger log.Logger,
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
	consensusState := cs.NewState(
		config.Consensus,
		state.Copy(),
		blockExec,
		blockStore,
		mp,
		evidencePool,
		cs.StateMetrics(csMetrics),
	)
	consensusState.SetLogger(logger)

	// Only sign/propose when we actually hold a key and run as a validator.
	if privValidator != nil && config.Mode == cfg.ModeValidator {
		consensusState.SetPrivValidator(privValidator)
	}

	reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)

	var (
		channels    map[p2p.ChannelID]*p2p.Channel
		peerUpdates *p2p.PeerUpdates
	)

	// Legacy stack pulls channels/peer updates from the shim; the new stack
	// opens channels on the router and subscribes to the peer manager.
	if config.P2P.UseLegacy {
		channels = getChannelsFromShim(reactorShim)
		peerUpdates = reactorShim.PeerUpdates
	} else {
		channels = makeChannelsFromShims(router, cs.ChannelShims)
		peerUpdates = peerManager.Subscribe()
	}

	reactor := cs.NewReactor(
		logger,
		consensusState,
		channels[cs.StateChannel],
		channels[cs.DataChannel],
		channels[cs.VoteChannel],
		channels[cs.VoteSetBitsChannel],
		peerUpdates,
		waitSync,
		cs.ReactorMetrics(csMetrics),
	)

	// Services which will be publishing and/or subscribing for messages (events)
	// consensusReactor will set it on consensusState and blockExecutor.
	reactor.SetEventBus(eventBus)

	return reactorShim, reactor, consensusState
}
  341. func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
  342. return p2p.NewMConnTransport(
  343. logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
  344. p2p.MConnTransportOptions{
  345. MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
  346. len(tmstrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
  347. ),
  348. },
  349. )
  350. }
  351. func createPeerManager(
  352. config *cfg.Config,
  353. dbProvider cfg.DBProvider,
  354. p2pLogger log.Logger,
  355. nodeID types.NodeID,
  356. ) (*p2p.PeerManager, error) {
  357. var maxConns uint16
  358. switch {
  359. case config.P2P.MaxConnections > 0:
  360. maxConns = config.P2P.MaxConnections
  361. case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
  362. x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
  363. if x > math.MaxUint16 {
  364. return nil, fmt.Errorf(
  365. "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
  366. config.P2P.MaxNumInboundPeers,
  367. config.P2P.MaxNumOutboundPeers,
  368. math.MaxUint16,
  369. )
  370. }
  371. maxConns = uint16(x)
  372. default:
  373. maxConns = 64
  374. }
  375. privatePeerIDs := make(map[types.NodeID]struct{})
  376. for _, id := range tmstrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
  377. privatePeerIDs[types.NodeID(id)] = struct{}{}
  378. }
  379. options := p2p.PeerManagerOptions{
  380. MaxConnected: maxConns,
  381. MaxConnectedUpgrade: 4,
  382. MaxPeers: 1000,
  383. MinRetryTime: 100 * time.Millisecond,
  384. MaxRetryTime: 8 * time.Hour,
  385. MaxRetryTimePersistent: 5 * time.Minute,
  386. RetryTimeJitter: 3 * time.Second,
  387. PrivatePeers: privatePeerIDs,
  388. }
  389. peers := []p2p.NodeAddress{}
  390. for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
  391. address, err := p2p.ParseNodeAddress(p)
  392. if err != nil {
  393. return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
  394. }
  395. peers = append(peers, address)
  396. options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
  397. }
  398. for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
  399. address, err := p2p.ParseNodeAddress(p)
  400. if err != nil {
  401. return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
  402. }
  403. peers = append(peers, address)
  404. }
  405. peerDB, err := dbProvider(&cfg.DBContext{ID: "peerstore", Config: config})
  406. if err != nil {
  407. return nil, err
  408. }
  409. peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
  410. if err != nil {
  411. return nil, fmt.Errorf("failed to create peer manager: %w", err)
  412. }
  413. for _, peer := range peers {
  414. if _, err := peerManager.Add(peer); err != nil {
  415. return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
  416. }
  417. }
  418. return peerManager, nil
  419. }
  420. func createRouter(
  421. p2pLogger log.Logger,
  422. p2pMetrics *p2p.Metrics,
  423. nodeInfo types.NodeInfo,
  424. privKey crypto.PrivKey,
  425. peerManager *p2p.PeerManager,
  426. transport p2p.Transport,
  427. options p2p.RouterOptions,
  428. ) (*p2p.Router, error) {
  429. return p2p.NewRouter(
  430. p2pLogger,
  431. p2pMetrics,
  432. nodeInfo,
  433. privKey,
  434. peerManager,
  435. []p2p.Transport{transport},
  436. options,
  437. )
  438. }
// createSwitch builds the legacy p2p switch, installing connection/peer
// filters (optionally backed by ABCI queries) and registering all reactors
// except in seed mode.
func createSwitch(
	config *cfg.Config,
	transport p2p.Transport,
	p2pMetrics *p2p.Metrics,
	mempoolReactor *p2p.ReactorShim,
	bcReactor p2p.Reactor,
	stateSyncReactor *p2p.ReactorShim,
	consensusReactor *p2p.ReactorShim,
	evidenceReactor *p2p.ReactorShim,
	proxyApp proxy.AppConns,
	nodeInfo types.NodeInfo,
	nodeKey types.NodeKey,
	p2pLogger log.Logger,
) *p2p.Switch {
	var (
		connFilters = []p2p.ConnFilterFunc{}
		peerFilters = []p2p.PeerFilterFunc{}
	)

	// Reject connections from IPs we are already connected to, unless
	// duplicates are explicitly allowed.
	if !config.P2P.AllowDuplicateIP {
		connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
	}

	// Filter peers by addr or pubkey with an ABCI query.
	// If the query return code is OK, add peer.
	if config.FilterPeers {
		connFilters = append(
			connFilters,
			// ABCI query for address filtering.
			func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
				res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
				})
				if err != nil {
					return err
				}
				// A non-OK response code means the app rejected this address.
				if res.IsErr() {
					return fmt.Errorf("error querying abci app: %v", res)
				}

				return nil
			},
		)

		peerFilters = append(
			peerFilters,
			// ABCI query for ID filtering.
			func(_ p2p.IPeerSet, p p2p.Peer) error {
				res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
				})
				if err != nil {
					return err
				}
				// A non-OK response code means the app rejected this peer ID.
				if res.IsErr() {
					return fmt.Errorf("error querying abci app: %v", res)
				}

				return nil
			},
		)
	}

	sw := p2p.NewSwitch(
		config.P2P,
		transport,
		p2p.WithMetrics(p2pMetrics),
		p2p.SwitchPeerFilters(peerFilters...),
		p2p.SwitchConnFilters(connFilters...),
	)

	sw.SetLogger(p2pLogger)

	// Seed nodes only gossip addresses; they run no application reactors.
	if config.Mode != cfg.ModeSeed {
		sw.AddReactor("MEMPOOL", mempoolReactor)
		sw.AddReactor("BLOCKCHAIN", bcReactor)
		sw.AddReactor("CONSENSUS", consensusReactor)
		sw.AddReactor("EVIDENCE", evidenceReactor)
		sw.AddReactor("STATESYNC", stateSyncReactor)
	}

	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)

	p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
	return sw
}
  516. func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
  517. p2pLogger log.Logger, nodeKey types.NodeKey) (pex.AddrBook, error) {
  518. addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  519. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  520. // Add ourselves to addrbook to prevent dialing ourselves
  521. if config.P2P.ExternalAddress != "" {
  522. addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ExternalAddress))
  523. if err != nil {
  524. return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
  525. }
  526. addrBook.AddOurAddress(addr)
  527. }
  528. if config.P2P.ListenAddress != "" {
  529. addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ListenAddress))
  530. if err != nil {
  531. return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
  532. }
  533. addrBook.AddOurAddress(addr)
  534. }
  535. sw.SetAddrBook(addrBook)
  536. return addrBook, nil
  537. }
  538. func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
  539. sw *p2p.Switch, logger log.Logger) *pex.Reactor {
  540. reactorConfig := &pex.ReactorConfig{
  541. Seeds: tmstrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
  542. SeedMode: config.Mode == cfg.ModeSeed,
  543. // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
  544. // blocks assuming 10s blocks ~ 28 hours.
  545. // TODO (melekes): make it dynamic based on the actual block latencies
  546. // from the live network.
  547. // https://github.com/tendermint/tendermint/issues/3523
  548. SeedDisconnectWaitPeriod: 28 * time.Hour,
  549. PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
  550. }
  551. // TODO persistent peers ? so we can have their DNS addrs saved
  552. pexReactor := pex.NewReactor(addrBook, reactorConfig)
  553. pexReactor.SetLogger(logger.With("module", "pex"))
  554. sw.AddReactor("PEX", pexReactor)
  555. return pexReactor
  556. }
  557. func createPEXReactorV2(
  558. config *cfg.Config,
  559. logger log.Logger,
  560. peerManager *p2p.PeerManager,
  561. router *p2p.Router,
  562. ) (service.Service, error) {
  563. channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
  564. if err != nil {
  565. return nil, err
  566. }
  567. peerUpdates := peerManager.Subscribe()
  568. return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil
  569. }
  570. func makeNodeInfo(
  571. config *cfg.Config,
  572. nodeKey types.NodeKey,
  573. eventSinks []indexer.EventSink,
  574. genDoc *types.GenesisDoc,
  575. state sm.State,
  576. ) (types.NodeInfo, error) {
  577. txIndexerStatus := "off"
  578. if indexer.IndexingEnabled(eventSinks) {
  579. txIndexerStatus = "on"
  580. }
  581. var bcChannel byte
  582. switch config.BlockSync.Version {
  583. case cfg.BlockSyncV0:
  584. bcChannel = byte(bcv0.BlockSyncChannel)
  585. case cfg.BlockSyncV2:
  586. bcChannel = bcv2.BlockchainChannel
  587. default:
  588. return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", config.BlockSync.Version)
  589. }
  590. nodeInfo := types.NodeInfo{
  591. ProtocolVersion: types.ProtocolVersion{
  592. P2P: version.P2PProtocol, // global
  593. Block: state.Version.Consensus.Block,
  594. App: state.Version.Consensus.App,
  595. },
  596. NodeID: nodeKey.ID,
  597. Network: genDoc.ChainID,
  598. Version: version.TMVersion,
  599. Channels: []byte{
  600. bcChannel,
  601. byte(cs.StateChannel),
  602. byte(cs.DataChannel),
  603. byte(cs.VoteChannel),
  604. byte(cs.VoteSetBitsChannel),
  605. byte(mempool.MempoolChannel),
  606. byte(evidence.EvidenceChannel),
  607. byte(statesync.SnapshotChannel),
  608. byte(statesync.ChunkChannel),
  609. byte(statesync.LightBlockChannel),
  610. byte(statesync.ParamsChannel),
  611. },
  612. Moniker: config.Moniker,
  613. Other: types.NodeInfoOther{
  614. TxIndex: txIndexerStatus,
  615. RPCAddress: config.RPC.ListenAddress,
  616. },
  617. }
  618. if config.P2P.PexReactor {
  619. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  620. }
  621. lAddr := config.P2P.ExternalAddress
  622. if lAddr == "" {
  623. lAddr = config.P2P.ListenAddress
  624. }
  625. nodeInfo.ListenAddr = lAddr
  626. err := nodeInfo.Validate()
  627. return nodeInfo, err
  628. }
  629. func makeSeedNodeInfo(
  630. config *cfg.Config,
  631. nodeKey types.NodeKey,
  632. genDoc *types.GenesisDoc,
  633. state sm.State,
  634. ) (types.NodeInfo, error) {
  635. nodeInfo := types.NodeInfo{
  636. ProtocolVersion: types.ProtocolVersion{
  637. P2P: version.P2PProtocol, // global
  638. Block: state.Version.Consensus.Block,
  639. App: state.Version.Consensus.App,
  640. },
  641. NodeID: nodeKey.ID,
  642. Network: genDoc.ChainID,
  643. Version: version.TMVersion,
  644. Channels: []byte{},
  645. Moniker: config.Moniker,
  646. Other: types.NodeInfoOther{
  647. TxIndex: "off",
  648. RPCAddress: config.RPC.ListenAddress,
  649. },
  650. }
  651. if config.P2P.PexReactor {
  652. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  653. }
  654. lAddr := config.P2P.ExternalAddress
  655. if lAddr == "" {
  656. lAddr = config.P2P.ListenAddress
  657. }
  658. nodeInfo.ListenAddr = lAddr
  659. err := nodeInfo.Validate()
  660. return nodeInfo, err
  661. }