You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

863 lines
26 KiB

max-bytes PR follow-up (#2318) * ReapMaxTxs: return all txs if max is negative this mirrors ReapMaxBytes behavior See * increase MaxAminoOverheadForBlock tested with: ``` func TestMaxAminoOverheadForBlock(t *testing.T) { maxChainID := "" for i := 0; i < MaxChainIDLen; i++ { maxChainID += "𠜎" } h := Header{ ChainID: maxChainID, Height: 10, Time: time.Now().UTC(), NumTxs: 100, TotalTxs: 200, LastBlockID: makeBlockID(make([]byte, 20), 300, make([]byte, 20)), LastCommitHash: tmhash.Sum([]byte("last_commit_hash")), DataHash: tmhash.Sum([]byte("data_hash")), ValidatorsHash: tmhash.Sum([]byte("validators_hash")), NextValidatorsHash: tmhash.Sum([]byte("next_validators_hash")), ConsensusHash: tmhash.Sum([]byte("consensus_hash")), AppHash: tmhash.Sum([]byte("app_hash")), LastResultsHash: tmhash.Sum([]byte("last_results_hash")), EvidenceHash: tmhash.Sum([]byte("evidence_hash")), ProposerAddress: tmhash.Sum([]byte("proposer_address")), } b := Block{ Header: h, Data: Data{Txs: makeTxs(10000, 100)}, Evidence: EvidenceData{}, LastCommit: &Commit{}, } bz, err := cdc.MarshalBinary(b) require.NoError(t, err) assert.Equal(t, MaxHeaderBytes+MaxAminoOverheadForBlock-2, len(bz)-1000000-20000-1) } ``` * fix MaxYYY constants calculation by using math.MaxInt64 See * pass mempool filter as an option See * fixes after Dev's comments
6 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. _ "net/http/pprof"
  10. "strings"
  11. "time"
  12. ""
  13. ""
  14. ""
  15. abci ""
  16. bc ""
  17. cfg ""
  18. cs ""
  19. ""
  20. ""
  21. cmn ""
  22. dbm ""
  23. ""
  24. mempl ""
  25. ""
  26. ""
  27. ""
  28. ""
  29. rpccore ""
  30. ctypes ""
  31. grpccore ""
  32. ""
  33. sm ""
  34. ""
  35. ""
  36. ""
  37. ""
  38. tmtime ""
  39. ""
  40. )
  41. //------------------------------------------------------------------------------
  42. // DBContext specifies config information for loading a new DB.
  43. type DBContext struct {
  44. ID string
  45. Config *cfg.Config
  46. }
  47. // DBProvider takes a DBContext and returns an instantiated DB.
  48. type DBProvider func(*DBContext) (dbm.DB, error)
  49. // DefaultDBProvider returns a database using the DBBackend and DBDir
  50. // specified in the ctx.Config.
  51. func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
  52. dbType := dbm.DBBackendType(ctx.Config.DBBackend)
  53. return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()), nil
  54. }
  55. // GenesisDocProvider returns a GenesisDoc.
  56. // It allows the GenesisDoc to be pulled from sources other than the
  57. // filesystem, for instance from a distributed key-value store cluster.
  58. type GenesisDocProvider func() (*types.GenesisDoc, error)
  59. // DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
  60. // the GenesisDoc from the config.GenesisFile() on the filesystem.
  61. func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
  62. return func() (*types.GenesisDoc, error) {
  63. return types.GenesisDocFromFile(config.GenesisFile())
  64. }
  65. }
  66. // NodeProvider takes a config and a logger and returns a ready to go Node.
  67. type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
  68. // DefaultNewNode returns a Tendermint node with default settings for the
  69. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
  70. // It implements NodeProvider.
  71. func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
  72. // Generate node PrivKey
  73. nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
  74. if err != nil {
  75. return nil, err
  76. }
  77. return NewNode(config,
  78. privval.LoadOrGenFilePV(config.PrivValidatorFile()),
  79. nodeKey,
  80. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
  81. DefaultGenesisDocProviderFunc(config),
  82. DefaultDBProvider,
  83. DefaultMetricsProvider(config.Instrumentation),
  84. logger,
  85. )
  86. }
  87. // MetricsProvider returns a consensus, p2p and mempool Metrics.
  88. type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
  89. // DefaultMetricsProvider returns Metrics build using Prometheus client library
  90. // if Prometheus is enabled. Otherwise, it returns no-op Metrics.
  91. func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
  92. return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
  93. if config.Prometheus {
  94. return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace),
  95. mempl.PrometheusMetrics(config.Namespace), sm.PrometheusMetrics(config.Namespace)
  96. }
  97. return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
  98. }
  99. }
  100. //------------------------------------------------------------------------------
  101. // Node is the highest level interface to a full Tendermint node.
  102. // It includes all configuration information and running services.
  103. type Node struct {
  104. cmn.BaseService
  105. // config
  106. config *cfg.Config
  107. genesisDoc *types.GenesisDoc // initial validator set
  108. privValidator types.PrivValidator // local node's validator key
  109. // network
  110. transport *p2p.MultiplexTransport
  111. sw *p2p.Switch // p2p connections
  112. addrBook pex.AddrBook // known peers
  113. nodeInfo p2p.NodeInfo
  114. nodeKey *p2p.NodeKey // our node privkey
  115. isListening bool
  116. // services
  117. eventBus *types.EventBus // pub/sub for services
  118. stateDB dbm.DB
  119. blockStore *bc.BlockStore // store the blockchain to disk
  120. bcReactor *bc.BlockchainReactor // for fast-syncing
  121. mempoolReactor *mempl.MempoolReactor // for gossipping transactions
  122. consensusState *cs.ConsensusState // latest consensus state
  123. consensusReactor *cs.ConsensusReactor // for participating in the consensus
  124. evidencePool *evidence.EvidencePool // tracking evidence
  125. proxyApp proxy.AppConns // connection to the application
  126. rpcListeners []net.Listener // rpc servers
  127. txIndexer txindex.TxIndexer
  128. indexerService *txindex.IndexerService
  129. prometheusSrv *http.Server
  130. }
  131. // NewNode returns a new, ready to go, Tendermint Node.
  //
  // It wires the node together in this order: opens the block store and state
  // databases through dbProvider, loads the genesis doc (from the state DB, or
  // from genesisDocProvider when the state DB has none), starts the ABCI proxy
  // connections, performs the handshake with the app, and then builds the
  // mempool, evidence pool, block executor, blockchain and consensus reactors,
  // event bus, tx indexer, p2p transport, switch and address book.
  //
  // Inside this function only the proxy app connections, the optional remote
  // signer client and the optional profile server are started; the event bus,
  // transport, switch and indexer service are started later by OnStart.
  //
  // A non-nil error is returned if a database cannot be opened, the genesis
  // doc cannot be obtained, the state cannot be loaded, the proxy app or the
  // handshake fails, the block protocol version of the loaded state differs
  // from version.BlockProtocol, or the remote signer client cannot be started.
  //
  // NOTE(review): the proxy app connections (and the remote signer client)
  // started here are not stopped on the later error returns of this function —
  // confirm whether callers are expected to clean up.
  132. func NewNode(config *cfg.Config,
  133. privValidator types.PrivValidator,
  134. nodeKey *p2p.NodeKey,
  135. clientCreator proxy.ClientCreator,
  136. genesisDocProvider GenesisDocProvider,
  137. dbProvider DBProvider,
  138. metricsProvider MetricsProvider,
  139. logger log.Logger) (*Node, error) {
  140. // Get BlockStore
  141. blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
  142. if err != nil {
  143. return nil, err
  144. }
  145. blockStore := bc.NewBlockStore(blockStoreDB)
  146. // Get State
  147. stateDB, err := dbProvider(&DBContext{"state", config})
  148. if err != nil {
  149. return nil, err
  150. }
  151. // Get genesis doc
  152. // TODO: move to state package?
  // loadGenesisDoc returns an error when the state DB holds no genesis doc;
  // in that case fall back to the provider and persist what it returns.
  153. genDoc, err := loadGenesisDoc(stateDB)
  154. if err != nil {
  155. genDoc, err = genesisDocProvider()
  156. if err != nil {
  157. return nil, err
  158. }
  159. // save genesis doc to prevent a certain class of user errors (e.g. when it
  160. // was changed, accidentally or not). Also good for audit trail.
  161. saveGenesisDoc(stateDB, genDoc)
  162. }
  163. state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
  164. if err != nil {
  165. return nil, err
  166. }
  167. // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
  168. proxyApp := proxy.NewAppConns(clientCreator)
  169. proxyApp.SetLogger(logger.With("module", "proxy"))
  170. if err := proxyApp.Start(); err != nil {
  171. return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
  172. }
  173. // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
  174. // and replays any blocks as necessary to sync tendermint with the app.
  175. consensusLogger := logger.With("module", "consensus")
  176. handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
  177. handshaker.SetLogger(consensusLogger)
  178. if err := handshaker.Handshake(proxyApp); err != nil {
  179. return nil, fmt.Errorf("Error during handshake: %v", err)
  180. }
  181. // Reload the state. It will have the Version.Consensus.App set by the
  182. // Handshake, and may have other modifications as well (ie. depending on
  183. // what happened during block replay).
  184. state = sm.LoadState(stateDB)
  185. // Ensure the state's block version matches that of the software.
  186. if state.Version.Consensus.Block != version.BlockProtocol {
  187. return nil, fmt.Errorf(
  188. "Block version of the software does not match that of the state.\n"+
  189. "Got version.BlockProtocol=%v, state.Version.Consensus.Block=%v",
  190. version.BlockProtocol,
  191. state.Version.Consensus.Block,
  192. )
  193. }
  194. // If an address is provided, listen on the socket for a
  195. // connection from an external signing process.
  // When this branch is taken, the privValidator argument is replaced by the
  // started TCPVal client.
  196. if config.PrivValidatorListenAddr != "" {
  197. var (
  198. // TODO: persist this key so external signer
  199. // can actually authenticate us
  200. privKey = ed25519.GenPrivKey()
  201. pvsc = privval.NewTCPVal(
  202. logger.With("module", "privval"),
  203. config.PrivValidatorListenAddr,
  204. privKey,
  205. )
  206. )
  207. if err := pvsc.Start(); err != nil {
  208. return nil, fmt.Errorf("Error starting private validator client: %v", err)
  209. }
  210. privValidator = pvsc
  211. }
  212. // Decide whether to fast-sync or not
  213. // We don't fast-sync when the only validator is us.
  // NOTE(review): privValidator is dereferenced here and in the logging below
  // without a nil check, although SetPrivValidator further down is guarded by
  // one — confirm that a nil privValidator is never passed in.
  214. fastSync := config.FastSync
  215. if state.Validators.Size() == 1 {
  216. addr, _ := state.Validators.GetByIndex(0)
  217. if bytes.Equal(privValidator.GetAddress(), addr) {
  218. fastSync = false
  219. }
  220. }
  221. // Log whether this node is a validator or an observer
  222. if state.Validators.HasAddress(privValidator.GetAddress()) {
  223. consensusLogger.Info("This node is a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
  224. } else {
  225. consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
  226. }
  227. csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider()
  228. // Make MempoolReactor
  // The mempool gets a pre-check built from the block's max bytes and the
  // validator set size, and a post-check built from the block's max gas.
  229. mempool := mempl.NewMempool(
  230. config.Mempool,
  231. proxyApp.Mempool(),
  232. state.LastBlockHeight,
  233. mempl.WithMetrics(memplMetrics),
  234. mempl.WithPreCheck(
  235. mempl.PreCheckAminoMaxBytes(
  236. types.MaxDataBytesUnknownEvidence(
  237. state.ConsensusParams.BlockSize.MaxBytes,
  238. state.Validators.Size(),
  239. ),
  240. ),
  241. ),
  242. mempl.WithPostCheck(
  243. mempl.PostCheckMaxGas(state.ConsensusParams.BlockSize.MaxGas),
  244. ),
  245. )
  246. mempoolLogger := logger.With("module", "mempool")
  247. mempool.SetLogger(mempoolLogger)
  248. mempool.InitWAL() // no need to have the mempool wal during tests
  249. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  250. mempoolReactor.SetLogger(mempoolLogger)
  251. if config.Consensus.WaitForTxs() {
  252. mempool.EnableTxsAvailable()
  253. }
  254. // Make Evidence Reactor
  255. evidenceDB, err := dbProvider(&DBContext{"evidence", config})
  256. if err != nil {
  257. return nil, err
  258. }
  259. evidenceLogger := logger.With("module", "evidence")
  260. evidenceStore := evidence.NewEvidenceStore(evidenceDB)
  261. evidencePool := evidence.NewEvidencePool(stateDB, evidenceStore)
  262. evidencePool.SetLogger(evidenceLogger)
  263. evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
  264. evidenceReactor.SetLogger(evidenceLogger)
  265. blockExecLogger := logger.With("module", "state")
  266. // make block executor for consensus and blockchain reactors to execute blocks
  267. blockExec := sm.NewBlockExecutor(
  268. stateDB,
  269. blockExecLogger,
  270. proxyApp.Consensus(),
  271. mempool,
  272. evidencePool,
  273. sm.BlockExecutorWithMetrics(smMetrics),
  274. )
  275. // Make BlockchainReactor
  // Both the blockchain reactor and the consensus state below receive their
  // own copy of the state.
  276. bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
  277. bcReactor.SetLogger(logger.With("module", "blockchain"))
  278. // Make ConsensusReactor
  279. consensusState := cs.NewConsensusState(
  280. config.Consensus,
  281. state.Copy(),
  282. blockExec,
  283. blockStore,
  284. mempool,
  285. evidencePool,
  286. cs.StateMetrics(csMetrics),
  287. )
  288. consensusState.SetLogger(consensusLogger)
  289. if privValidator != nil {
  290. consensusState.SetPrivValidator(privValidator)
  291. }
  292. consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
  293. consensusReactor.SetLogger(consensusLogger)
  294. eventBus := types.NewEventBus()
  295. eventBus.SetLogger(logger.With("module", "events"))
  296. // services which will be publishing and/or subscribing for messages (events)
  297. // consensusReactor will set it on consensusState and blockExecutor
  298. consensusReactor.SetEventBus(eventBus)
  299. // Transaction indexing
  // Only the "kv" indexer is backed by a database; any other value selects
  // the null indexer.
  300. var txIndexer txindex.TxIndexer
  301. switch config.TxIndex.Indexer {
  302. case "kv":
  303. store, err := dbProvider(&DBContext{"tx_index", config})
  304. if err != nil {
  305. return nil, err
  306. }
  // An explicit tag list takes precedence over IndexAllTags.
  307. if config.TxIndex.IndexTags != "" {
  308. txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
  309. } else if config.TxIndex.IndexAllTags {
  310. txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
  311. } else {
  312. txIndexer = kv.NewTxIndex(store)
  313. }
  314. default:
  315. txIndexer = &null.TxIndex{}
  316. }
  317. indexerService := txindex.NewIndexerService(txIndexer, eventBus)
  318. indexerService.SetLogger(logger.With("module", "txindex"))
  319. var (
  320. p2pLogger = logger.With("module", "p2p")
  321. nodeInfo = makeNodeInfo(
  322. config,
  323. nodeKey.ID(),
  324. txIndexer,
  325. genDoc.ChainID,
  326. p2p.ProtocolVersionWithApp(state.Version.Consensus.App),
  327. )
  328. )
  329. // Setup Transport.
  330. var (
  331. transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey)
  332. connFilters = []p2p.ConnFilterFunc{}
  333. peerFilters = []p2p.PeerFilterFunc{}
  334. )
  335. if !config.P2P.AllowDuplicateIP {
  336. connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
  337. }
  338. // Filter peers by addr or pubkey with an ABCI query.
  339. // If the query return code is OK, add peer.
  340. if config.FilterPeers {
  341. connFilters = append(
  342. connFilters,
  343. // ABCI query for address filtering.
  344. func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
  345. res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
  346. Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
  347. })
  348. if err != nil {
  349. return err
  350. }
  351. if res.IsErr() {
  352. return fmt.Errorf("Error querying abci app: %v", res)
  353. }
  354. return nil
  355. },
  356. )
  357. peerFilters = append(
  358. peerFilters,
  359. // ABCI query for ID filtering.
  360. func(_ p2p.IPeerSet, p p2p.Peer) error {
  361. res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
  362. Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
  363. })
  364. if err != nil {
  365. return err
  366. }
  367. if res.IsErr() {
  368. return fmt.Errorf("Error querying abci app: %v", res)
  369. }
  370. return nil
  371. },
  372. )
  373. }
  // Build the conn-filter option and apply it to the transport right away.
  374. p2p.MultiplexTransportConnFilters(connFilters...)(transport)
  375. // Setup Switch.
  376. sw := p2p.NewSwitch(
  377. config.P2P,
  378. transport,
  379. p2p.WithMetrics(p2pMetrics),
  380. p2p.SwitchPeerFilters(peerFilters...),
  381. )
  382. sw.SetLogger(p2pLogger)
  383. sw.AddReactor("MEMPOOL", mempoolReactor)
  384. sw.AddReactor("BLOCKCHAIN", bcReactor)
  385. sw.AddReactor("CONSENSUS", consensusReactor)
  386. sw.AddReactor("EVIDENCE", evidenceReactor)
  387. sw.SetNodeInfo(nodeInfo)
  388. sw.SetNodeKey(nodeKey)
  389. p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
  390. // Optionally, start the pex reactor
  391. //
  392. // TODO:
  393. //
  394. // We need to set Seeds and PersistentPeers on the switch,
  395. // since it needs to be able to use these (and their DNS names)
  396. // even if the PEX is off. We can include the DNS name in the NetAddress,
  397. // but it would still be nice to have a clear list of the current "PersistentPeers"
  398. // somewhere that we can return with net_info.
  399. //
  400. // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
  401. // Note we currently use the addrBook regardless at least for AddOurAddress
  402. addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  403. // Add ourselves to addrbook to prevent dialing ourselves
  404. addrBook.AddOurAddress(nodeInfo.NetAddress())
  405. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  406. if config.P2P.PexReactor {
  407. // TODO persistent peers ? so we can have their DNS addrs saved
  408. pexReactor := pex.NewPEXReactor(addrBook,
  409. &pex.PEXReactorConfig{
  410. Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
  411. SeedMode: config.P2P.SeedMode,
  412. })
  413. pexReactor.SetLogger(p2pLogger)
  414. sw.AddReactor("PEX", pexReactor)
  415. }
  416. sw.SetAddrBook(addrBook)
  417. // run the profile server
  // A nil handler makes net/http serve http.DefaultServeMux. The goroutine
  // lives until ListenAndServe returns; OnStop does not shut this server down.
  418. profileHost := config.ProfListenAddress
  419. if profileHost != "" {
  420. go func() {
  421. logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
  422. }()
  423. }
  424. node := &Node{
  425. config: config,
  426. genesisDoc: genDoc,
  427. privValidator: privValidator,
  428. transport: transport,
  429. sw: sw,
  430. addrBook: addrBook,
  431. nodeInfo: nodeInfo,
  432. nodeKey: nodeKey,
  433. stateDB: stateDB,
  434. blockStore: blockStore,
  435. bcReactor: bcReactor,
  436. mempoolReactor: mempoolReactor,
  437. consensusState: consensusState,
  438. consensusReactor: consensusReactor,
  439. evidencePool: evidencePool,
  440. proxyApp: proxyApp,
  441. txIndexer: txIndexer,
  442. indexerService: indexerService,
  443. eventBus: eventBus,
  444. }
  // The BaseService is created last because it takes the node itself as the
  // service implementation.
  445. node.BaseService = *cmn.NewBaseService(logger, "Node", node)
  446. return node, nil
  447. }
  448. // OnStart starts the Node. It implements cmn.Service.
  449. func (n *Node) OnStart() error {
  450. now := tmtime.Now()
  451. genTime := n.genesisDoc.GenesisTime
  452. if genTime.After(now) {
  453. n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
  454. time.Sleep(genTime.Sub(now))
  455. }
  456. err := n.eventBus.Start()
  457. if err != nil {
  458. return err
  459. }
  460. // Add private IDs to addrbook to block those peers being added
  461. n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
  462. // Start the RPC server before the P2P server
  463. // so we can eg. receive txs for the first block
  464. if n.config.RPC.ListenAddress != "" {
  465. listeners, err := n.startRPC()
  466. if err != nil {
  467. return err
  468. }
  469. n.rpcListeners = listeners
  470. }
  471. if n.config.Instrumentation.Prometheus &&
  472. n.config.Instrumentation.PrometheusListenAddr != "" {
  473. n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
  474. }
  475. // Start the transport.
  476. addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress)
  477. if err != nil {
  478. return err
  479. }
  480. if err := n.transport.Listen(*addr); err != nil {
  481. return err
  482. }
  483. n.isListening = true
  484. // Start the switch (the P2P server).
  485. err = n.sw.Start()
  486. if err != nil {
  487. return err
  488. }
  489. // Always connect to persistent peers
  490. if n.config.P2P.PersistentPeers != "" {
  491. err = n.sw.DialPeersAsync(n.addrBook, splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "), true)
  492. if err != nil {
  493. return err
  494. }
  495. }
  496. // start tx indexer
  497. return n.indexerService.Start()
  498. }
  499. // OnStop stops the Node. It implements cmn.Service.
  500. func (n *Node) OnStop() {
  501. n.BaseService.OnStop()
  502. n.Logger.Info("Stopping Node")
  503. // first stop the non-reactor services
  504. n.eventBus.Stop()
  505. n.indexerService.Stop()
  506. // now stop the reactors
  507. // TODO: gracefully disconnect from peers.
  508. n.sw.Stop()
  509. if err := n.transport.Close(); err != nil {
  510. n.Logger.Error("Error closing transport", "err", err)
  511. }
  512. n.isListening = false
  513. // finally stop the listeners / external services
  514. for _, l := range n.rpcListeners {
  515. n.Logger.Info("Closing rpc listener", "listener", l)
  516. if err := l.Close(); err != nil {
  517. n.Logger.Error("Error closing listener", "listener", l, "err", err)
  518. }
  519. }
  520. if pvsc, ok := n.privValidator.(*privval.TCPVal); ok {
  521. if err := pvsc.Stop(); err != nil {
  522. n.Logger.Error("Error stopping priv validator socket client", "err", err)
  523. }
  524. }
  525. if n.prometheusSrv != nil {
  526. if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
  527. // Error from closing listeners, or context timeout:
  528. n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
  529. }
  530. }
  531. }
  532. // ConfigureRPC sets all variables in rpccore so they will serve
  533. // rpc calls from this node
  534. func (n *Node) ConfigureRPC() {
  535. rpccore.SetStateDB(n.stateDB)
  536. rpccore.SetBlockStore(n.blockStore)
  537. rpccore.SetConsensusState(n.consensusState)
  538. rpccore.SetMempool(n.mempoolReactor.Mempool)
  539. rpccore.SetEvidencePool(n.evidencePool)
  540. rpccore.SetP2PPeers(n.sw)
  541. rpccore.SetP2PTransport(n)
  542. rpccore.SetPubKey(n.privValidator.GetPubKey())
  543. rpccore.SetGenesisDoc(n.genesisDoc)
  544. rpccore.SetAddrBook(n.addrBook)
  545. rpccore.SetProxyAppQuery(n.proxyApp.Query())
  546. rpccore.SetTxIndexer(n.txIndexer)
  547. rpccore.SetConsensusReactor(n.consensusReactor)
  548. rpccore.SetEventBus(n.eventBus)
  549. rpccore.SetLogger(n.Logger.With("module", "rpc"))
  550. }
  551. func (n *Node) startRPC() ([]net.Listener, error) {
  552. n.ConfigureRPC()
  553. listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
  554. coreCodec := amino.NewCodec()
  555. ctypes.RegisterAmino(coreCodec)
  556. if n.config.RPC.Unsafe {
  557. rpccore.AddUnsafeRoutes()
  558. }
  559. // we may expose the rpc over both a unix and tcp socket
  560. listeners := make([]net.Listener, len(listenAddrs))
  561. for i, listenAddr := range listenAddrs {
  562. mux := http.NewServeMux()
  563. rpcLogger := n.Logger.With("module", "rpc-server")
  564. wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus))
  565. wm.SetLogger(rpcLogger.With("protocol", "websocket"))
  566. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  567. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)
  568. listener, err := rpcserver.StartHTTPServer(
  569. listenAddr,
  570. mux,
  571. rpcLogger,
  572. rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections},
  573. )
  574. if err != nil {
  575. return nil, err
  576. }
  577. listeners[i] = listener
  578. }
  579. // we expose a simplified api over grpc for convenience to app devs
  580. grpcListenAddr := n.config.RPC.GRPCListenAddress
  581. if grpcListenAddr != "" {
  582. listener, err := grpccore.StartGRPCServer(
  583. grpcListenAddr,
  584. grpccore.Config{
  585. MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections,
  586. },
  587. )
  588. if err != nil {
  589. return nil, err
  590. }
  591. listeners = append(listeners, listener)
  592. }
  593. return listeners, nil
  594. }
  595. // startPrometheusServer starts a Prometheus HTTP server, listening for metrics
  596. // collectors on addr.
  597. func (n *Node) startPrometheusServer(addr string) *http.Server {
  598. srv := &http.Server{
  599. Addr: addr,
  600. Handler: promhttp.InstrumentMetricHandler(
  601. prometheus.DefaultRegisterer, promhttp.HandlerFor(
  602. prometheus.DefaultGatherer,
  603. promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
  604. ),
  605. ),
  606. }
  607. go func() {
  608. if err := srv.ListenAndServe(); err != http.ErrServerClosed {
  609. // Error starting or closing listener:
  610. n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
  611. }
  612. }()
  613. return srv
  614. }
  615. // Switch returns the Node's Switch.
  616. func (n *Node) Switch() *p2p.Switch {
  617. return n.sw
  618. }
  619. // BlockStore returns the Node's BlockStore.
  620. func (n *Node) BlockStore() *bc.BlockStore {
  621. return n.blockStore
  622. }
  623. // ConsensusState returns the Node's ConsensusState.
  624. func (n *Node) ConsensusState() *cs.ConsensusState {
  625. return n.consensusState
  626. }
  627. // ConsensusReactor returns the Node's ConsensusReactor.
  628. func (n *Node) ConsensusReactor() *cs.ConsensusReactor {
  629. return n.consensusReactor
  630. }
  631. // MempoolReactor returns the Node's MempoolReactor.
  632. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  633. return n.mempoolReactor
  634. }
  635. // EvidencePool returns the Node's EvidencePool.
  636. func (n *Node) EvidencePool() *evidence.EvidencePool {
  637. return n.evidencePool
  638. }
  639. // EventBus returns the Node's EventBus.
  640. func (n *Node) EventBus() *types.EventBus {
  641. return n.eventBus
  642. }
  643. // PrivValidator returns the Node's PrivValidator.
  644. // XXX: for convenience only!
  645. func (n *Node) PrivValidator() types.PrivValidator {
  646. return n.privValidator
  647. }
  648. // GenesisDoc returns the Node's GenesisDoc.
  649. func (n *Node) GenesisDoc() *types.GenesisDoc {
  650. return n.genesisDoc
  651. }
  652. // ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
  653. func (n *Node) ProxyApp() proxy.AppConns {
  654. return n.proxyApp
  655. }
  656. //------------------------------------------------------------------------------
  657. func (n *Node) Listeners() []string {
  658. return []string{
  659. fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
  660. }
  661. }
  662. func (n *Node) IsListening() bool {
  663. return n.isListening
  664. }
  665. // NodeInfo returns the Node's Info from the Switch.
  666. func (n *Node) NodeInfo() p2p.NodeInfo {
  667. return n.nodeInfo
  668. }
  669. func makeNodeInfo(
  670. config *cfg.Config,
  671. nodeID p2p.ID,
  672. txIndexer txindex.TxIndexer,
  673. chainID string,
  674. protocolVersion p2p.ProtocolVersion,
  675. ) p2p.NodeInfo {
  676. txIndexerStatus := "on"
  677. if _, ok := txIndexer.(*null.TxIndex); ok {
  678. txIndexerStatus = "off"
  679. }
  680. nodeInfo := p2p.DefaultNodeInfo{
  681. ProtocolVersion: protocolVersion,
  682. ID_: nodeID,
  683. Network: chainID,
  684. Version: version.TMCoreSemVer,
  685. Channels: []byte{
  686. bc.BlockchainChannel,
  687. cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
  688. mempl.MempoolChannel,
  689. evidence.EvidenceChannel,
  690. },
  691. Moniker: config.Moniker,
  692. Other: p2p.DefaultNodeInfoOther{
  693. TxIndex: txIndexerStatus,
  694. RPCAddress: config.RPC.ListenAddress,
  695. },
  696. }
  697. if config.P2P.PexReactor {
  698. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  699. }
  700. lAddr := config.P2P.ExternalAddress
  701. if lAddr == "" {
  702. lAddr = config.P2P.ListenAddress
  703. }
  704. nodeInfo.ListenAddr = lAddr
  705. return nodeInfo
  706. }
  707. //------------------------------------------------------------------------------
  708. var (
  709. genesisDocKey = []byte("genesisDoc")
  710. )
  711. // panics if failed to unmarshal bytes
  712. func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
  713. bytes := db.Get(genesisDocKey)
  714. if len(bytes) == 0 {
  715. return nil, errors.New("Genesis doc not found")
  716. }
  717. var genDoc *types.GenesisDoc
  718. err := cdc.UnmarshalJSON(bytes, &genDoc)
  719. if err != nil {
  720. cmn.PanicCrisis(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes))
  721. }
  722. return genDoc, nil
  723. }
  724. // panics if failed to marshal the given genesis document
  725. func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
  726. bytes, err := cdc.MarshalJSON(genDoc)
  727. if err != nil {
  728. cmn.PanicCrisis(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
  729. }
  730. db.SetSync(genesisDocKey, bytes)
  731. }
  732. // splitAndTrimEmpty slices s into all subslices separated by sep and returns a
  733. // slice of the string s with all leading and trailing Unicode code points
  734. // contained in cutset removed. If sep is empty, SplitAndTrim splits after each
  735. // UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
  736. // -1. also filter out empty strings, only return non-empty strings.
  737. func splitAndTrimEmpty(s, sep, cutset string) []string {
  738. if s == "" {
  739. return []string{}
  740. }
  741. spl := strings.Split(s, sep)
  742. nonEmptyStrings := make([]string, 0, len(spl))
  743. for i := 0; i < len(spl); i++ {
  744. element := strings.Trim(spl[i], cutset)
  745. if element != "" {
  746. nonEmptyStrings = append(nonEmptyStrings, element)
  747. }
  748. }
  749. return nonEmptyStrings
  750. }