You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1551 lines
47 KiB

p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/prometheus/client_golang/prometheus"
  14. "github.com/prometheus/client_golang/prometheus/promhttp"
  15. "github.com/rs/cors"
  16. dbm "github.com/tendermint/tm-db"
  17. abci "github.com/tendermint/tendermint/abci/types"
  18. bcv0 "github.com/tendermint/tendermint/blockchain/v0"
  19. bcv2 "github.com/tendermint/tendermint/blockchain/v2"
  20. cfg "github.com/tendermint/tendermint/config"
  21. "github.com/tendermint/tendermint/consensus"
  22. "github.com/tendermint/tendermint/crypto"
  23. "github.com/tendermint/tendermint/evidence"
  24. tmjson "github.com/tendermint/tendermint/libs/json"
  25. "github.com/tendermint/tendermint/libs/log"
  26. tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
  27. "github.com/tendermint/tendermint/libs/service"
  28. "github.com/tendermint/tendermint/light"
  29. mempl "github.com/tendermint/tendermint/mempool"
  30. "github.com/tendermint/tendermint/p2p"
  31. "github.com/tendermint/tendermint/p2p/pex"
  32. "github.com/tendermint/tendermint/privval"
  33. "github.com/tendermint/tendermint/proxy"
  34. rpccore "github.com/tendermint/tendermint/rpc/core"
  35. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  36. rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
  37. sm "github.com/tendermint/tendermint/state"
  38. "github.com/tendermint/tendermint/state/txindex"
  39. "github.com/tendermint/tendermint/state/txindex/kv"
  40. "github.com/tendermint/tendermint/state/txindex/null"
  41. "github.com/tendermint/tendermint/statesync"
  42. "github.com/tendermint/tendermint/store"
  43. cs "github.com/tendermint/tendermint/test/maverick/consensus"
  44. "github.com/tendermint/tendermint/types"
  45. tmtime "github.com/tendermint/tendermint/types/time"
  46. "github.com/tendermint/tendermint/version"
  47. )
  48. //------------------------------------------------------------------------------
  49. // ParseMisbehaviors is a util function that converts a comma separated string into
  50. // a map of misbehaviors to be executed by the maverick node
  51. func ParseMisbehaviors(str string) (map[int64]cs.Misbehavior, error) {
  52. // check if string is empty in which case we run a normal node
  53. var misbehaviors = make(map[int64]cs.Misbehavior)
  54. if str == "" {
  55. return misbehaviors, nil
  56. }
  57. strs := strings.Split(str, ",")
  58. if len(strs)%2 != 0 {
  59. return misbehaviors, errors.New("missing either height or misbehavior name in the misbehavior flag")
  60. }
  61. OUTER_LOOP:
  62. for i := 0; i < len(strs); i += 2 {
  63. height, err := strconv.ParseInt(strs[i+1], 10, 64)
  64. if err != nil {
  65. return misbehaviors, fmt.Errorf("failed to parse misbehavior height: %w", err)
  66. }
  67. for key, misbehavior := range cs.MisbehaviorList {
  68. if key == strs[i] {
  69. misbehaviors[height] = misbehavior
  70. continue OUTER_LOOP
  71. }
  72. }
  73. return misbehaviors, fmt.Errorf("received unknown misbehavior: %s. Did you forget to add it?", strs[i])
  74. }
  75. return misbehaviors, nil
  76. }
  77. // DBContext specifies config information for loading a new DB.
  78. type DBContext struct {
  79. ID string
  80. Config *cfg.Config
  81. }
  82. // DBProvider takes a DBContext and returns an instantiated DB.
  83. type DBProvider func(*DBContext) (dbm.DB, error)
  84. // DefaultDBProvider returns a database using the DBBackend and DBDir
  85. // specified in the ctx.Config.
  86. func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
  87. dbType := dbm.BackendType(ctx.Config.DBBackend)
  88. return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
  89. }
  90. // GenesisDocProvider returns a GenesisDoc.
  91. // It allows the GenesisDoc to be pulled from sources other than the
  92. // filesystem, for instance from a distributed key-value store cluster.
  93. type GenesisDocProvider func() (*types.GenesisDoc, error)
  94. // DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
  95. // the GenesisDoc from the config.GenesisFile() on the filesystem.
  96. func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
  97. return func() (*types.GenesisDoc, error) {
  98. return types.GenesisDocFromFile(config.GenesisFile())
  99. }
  100. }
  101. // Provider takes a config and a logger and returns a ready to go Node.
  102. type Provider func(*cfg.Config, log.Logger) (*Node, error)
  103. // DefaultNewNode returns a Tendermint node with default settings for the
  104. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
  105. // It implements NodeProvider.
  106. func DefaultNewNode(config *cfg.Config, logger log.Logger, misbehaviors map[int64]cs.Misbehavior) (*Node, error) {
  107. nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
  108. if err != nil {
  109. return nil, fmt.Errorf("failed to load or gen node key %s, err: %w", config.NodeKeyFile(), err)
  110. }
  111. return NewNode(config,
  112. LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
  113. nodeKey,
  114. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
  115. DefaultGenesisDocProviderFunc(config),
  116. DefaultDBProvider,
  117. DefaultMetricsProvider(config.Instrumentation),
  118. logger,
  119. misbehaviors,
  120. )
  121. }
  122. // MetricsProvider returns a consensus, p2p and mempool Metrics.
  123. type MetricsProvider func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
  124. // DefaultMetricsProvider returns Metrics build using Prometheus client library
  125. // if Prometheus is enabled. Otherwise, it returns no-op Metrics.
  126. func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
  127. return func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
  128. if config.Prometheus {
  129. return consensus.PrometheusMetrics(config.Namespace, "chain_id", chainID),
  130. p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
  131. mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
  132. sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
  133. }
  134. return consensus.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
  135. }
  136. }
  137. // Option sets a parameter for the node.
  138. type Option func(*Node)
  139. // Temporary interface for switching to fast sync, we should get rid of v0.
  140. // See: https://github.com/tendermint/tendermint/issues/4595
  141. type fastSyncReactor interface {
  142. SwitchToFastSync(sm.State) error
  143. }
  144. // CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to
  145. // the node's Switch.
  146. //
  147. // WARNING: using any name from the below list of the existing reactors will
  148. // result in replacing it with the custom one.
  149. //
  150. // - MEMPOOL
  151. // - BLOCKCHAIN
  152. // - CONSENSUS
  153. // - EVIDENCE
  154. // - PEX
  155. // - STATESYNC
  156. func CustomReactors(reactors map[string]p2p.Reactor) Option {
  157. return func(n *Node) {
  158. for name, reactor := range reactors {
  159. if existingReactor := n.sw.Reactor(name); existingReactor != nil {
  160. n.sw.Logger.Info("Replacing existing reactor with a custom one",
  161. "name", name, "existing", existingReactor, "custom", reactor)
  162. n.sw.RemoveReactor(name, existingReactor)
  163. }
  164. n.sw.AddReactor(name, reactor)
  165. }
  166. }
  167. }
  168. func CustomReactorsAsConstructors(reactors map[string]func(n *Node) p2p.Reactor) Option {
  169. return func(n *Node) {
  170. for name, customReactor := range reactors {
  171. if existingReactor := n.sw.Reactor(name); existingReactor != nil {
  172. n.sw.Logger.Info("Replacing existing reactor with a custom one",
  173. "name", name)
  174. n.sw.RemoveReactor(name, existingReactor)
  175. }
  176. n.sw.AddReactor(name, customReactor(n))
  177. }
  178. }
  179. }
  180. // StateProvider overrides the state provider used by state sync to retrieve trusted app hashes and
  181. // build a State object for bootstrapping the node.
  182. // WARNING: this interface is considered unstable and subject to change.
  183. func StateProvider(stateProvider statesync.StateProvider) Option {
  184. return func(n *Node) {
  185. n.stateSyncProvider = stateProvider
  186. }
  187. }
  188. //------------------------------------------------------------------------------
  189. // Node is the highest level interface to a full Tendermint node.
  190. // It includes all configuration information and running services.
  191. type Node struct {
  192. service.BaseService
  193. // config
  194. config *cfg.Config
  195. genesisDoc *types.GenesisDoc // initial validator set
  196. privValidator types.PrivValidator // local node's validator key
  197. // network
  198. transport *p2p.MConnTransport
  199. sw *p2p.Switch // p2p connections
  200. addrBook pex.AddrBook // known peers
  201. nodeInfo p2p.NodeInfo
  202. nodeKey p2p.NodeKey // our node privkey
  203. isListening bool
  204. // services
  205. eventBus *types.EventBus // pub/sub for services
  206. stateStore sm.Store
  207. blockStore *store.BlockStore // store the blockchain to disk
  208. bcReactor service.Service // for fast-syncing
  209. mempoolReactor *mempl.Reactor // for gossipping transactions
  210. mempool mempl.Mempool
  211. stateSync bool // whether the node should state sync on startup
  212. stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
  213. stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
  214. stateSyncGenesis sm.State // provides the genesis state for state sync
  215. consensusState *cs.State // latest consensus state
  216. consensusReactor *cs.Reactor // for participating in the consensus
  217. pexReactor *pex.Reactor // for exchanging peer addresses
  218. evidenceReactor *evidence.Reactor
  219. evidencePool *evidence.Pool // tracking evidence
  220. proxyApp proxy.AppConns // connection to the application
  221. rpcListeners []net.Listener // rpc servers
  222. txIndexer txindex.TxIndexer
  223. indexerService *txindex.IndexerService
  224. prometheusSrv *http.Server
  225. }
  226. func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
  227. var blockStoreDB dbm.DB
  228. blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
  229. if err != nil {
  230. return
  231. }
  232. blockStore = store.NewBlockStore(blockStoreDB)
  233. stateDB, err = dbProvider(&DBContext{"state", config})
  234. if err != nil {
  235. return
  236. }
  237. return
  238. }
  239. func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
  240. proxyApp := proxy.NewAppConns(clientCreator)
  241. proxyApp.SetLogger(logger.With("module", "proxy"))
  242. if err := proxyApp.Start(); err != nil {
  243. return nil, fmt.Errorf("error starting proxy app connections: %v", err)
  244. }
  245. return proxyApp, nil
  246. }
  247. func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
  248. eventBus := types.NewEventBus()
  249. eventBus.SetLogger(logger.With("module", "events"))
  250. if err := eventBus.Start(); err != nil {
  251. return nil, err
  252. }
  253. return eventBus, nil
  254. }
  255. func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider,
  256. eventBus *types.EventBus, logger log.Logger) (*txindex.IndexerService, txindex.TxIndexer, error) {
  257. var txIndexer txindex.TxIndexer
  258. switch config.TxIndex.Indexer {
  259. case "kv":
  260. store, err := dbProvider(&DBContext{"tx_index", config})
  261. if err != nil {
  262. return nil, nil, err
  263. }
  264. txIndexer = kv.NewTxIndex(store)
  265. default:
  266. txIndexer = &null.TxIndex{}
  267. }
  268. indexerService := txindex.NewIndexerService(txIndexer, eventBus)
  269. indexerService.SetLogger(logger.With("module", "txindex"))
  270. if err := indexerService.Start(); err != nil {
  271. return nil, nil, err
  272. }
  273. return indexerService, txIndexer, nil
  274. }
  275. func doHandshake(
  276. stateStore sm.Store,
  277. state sm.State,
  278. blockStore sm.BlockStore,
  279. genDoc *types.GenesisDoc,
  280. eventBus types.BlockEventPublisher,
  281. proxyApp proxy.AppConns,
  282. consensusLogger log.Logger) error {
  283. handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
  284. handshaker.SetLogger(consensusLogger)
  285. handshaker.SetEventBus(eventBus)
  286. if err := handshaker.Handshake(proxyApp); err != nil {
  287. return fmt.Errorf("error during handshake: %v", err)
  288. }
  289. return nil
  290. }
  291. func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
  292. // Log the version info.
  293. logger.Info("Version info",
  294. "software", version.TMCoreSemVer,
  295. "block", version.BlockProtocol,
  296. "p2p", version.P2PProtocol,
  297. )
  298. // If the state and software differ in block version, at least log it.
  299. if state.Version.Consensus.Block != version.BlockProtocol {
  300. logger.Info("Software and state have different block protocols",
  301. "software", version.BlockProtocol,
  302. "state", state.Version.Consensus.Block,
  303. )
  304. }
  305. addr := pubKey.Address()
  306. // Log whether this node is a validator or an observer
  307. if state.Validators.HasAddress(addr) {
  308. consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
  309. } else {
  310. consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
  311. }
  312. }
  313. func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
  314. if state.Validators.Size() > 1 {
  315. return false
  316. }
  317. addr, _ := state.Validators.GetByIndex(0)
  318. return bytes.Equal(pubKey.Address(), addr)
  319. }
  320. func createMempoolReactor(
  321. config *cfg.Config,
  322. proxyApp proxy.AppConns,
  323. state sm.State,
  324. memplMetrics *mempl.Metrics,
  325. peerMgr *p2p.PeerManager,
  326. logger log.Logger,
  327. ) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) {
  328. logger = logger.With("module", "mempool")
  329. mempool := mempl.NewCListMempool(
  330. config.Mempool,
  331. proxyApp.Mempool(),
  332. state.LastBlockHeight,
  333. mempl.WithMetrics(memplMetrics),
  334. mempl.WithPreCheck(sm.TxPreCheck(state)),
  335. mempl.WithPostCheck(sm.TxPostCheck(state)),
  336. )
  337. mempool.SetLogger(logger)
  338. reactorShim := p2p.NewReactorShim(logger, "MempoolShim", mempl.GetChannelShims(config.Mempool))
  339. reactor := mempl.NewReactor(
  340. logger,
  341. config.Mempool,
  342. peerMgr,
  343. mempool,
  344. reactorShim.GetChannel(mempl.MempoolChannel),
  345. reactorShim.PeerUpdates,
  346. )
  347. if config.Consensus.WaitForTxs() {
  348. mempool.EnableTxsAvailable()
  349. }
  350. return reactorShim, reactor, mempool
  351. }
  352. func createEvidenceReactor(
  353. config *cfg.Config,
  354. dbProvider DBProvider,
  355. stateDB dbm.DB,
  356. blockStore *store.BlockStore,
  357. logger log.Logger,
  358. ) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
  359. evidenceDB, err := dbProvider(&DBContext{"evidence", config})
  360. if err != nil {
  361. return nil, nil, nil, err
  362. }
  363. logger = logger.With("module", "evidence")
  364. evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
  365. if err != nil {
  366. return nil, nil, nil, err
  367. }
  368. evidenceReactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
  369. evidenceReactor := evidence.NewReactor(
  370. logger,
  371. evidenceReactorShim.GetChannel(evidence.EvidenceChannel),
  372. evidenceReactorShim.PeerUpdates,
  373. evidencePool,
  374. )
  375. return evidenceReactorShim, evidenceReactor, evidencePool, nil
  376. }
  377. func createBlockchainReactor(
  378. logger log.Logger,
  379. config *cfg.Config,
  380. state sm.State,
  381. blockExec *sm.BlockExecutor,
  382. blockStore *store.BlockStore,
  383. csReactor *cs.Reactor,
  384. fastSync bool,
  385. ) (*p2p.ReactorShim, service.Service, error) {
  386. logger = logger.With("module", "blockchain")
  387. switch config.FastSync.Version {
  388. case "v0":
  389. reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
  390. reactor, err := bcv0.NewReactor(
  391. logger, state.Copy(), blockExec, blockStore, csReactor,
  392. reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
  393. )
  394. if err != nil {
  395. return nil, nil, err
  396. }
  397. return reactorShim, reactor, nil
  398. case "v2":
  399. reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
  400. reactor.SetLogger(logger)
  401. return nil, reactor, nil
  402. default:
  403. return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
  404. }
  405. }
  406. func createConsensusReactor(config *cfg.Config,
  407. state sm.State,
  408. blockExec *sm.BlockExecutor,
  409. blockStore sm.BlockStore,
  410. mempool *mempl.CListMempool,
  411. evidencePool *evidence.Pool,
  412. privValidator types.PrivValidator,
  413. csMetrics *consensus.Metrics,
  414. waitSync bool,
  415. eventBus *types.EventBus,
  416. consensusLogger log.Logger,
  417. misbehaviors map[int64]cs.Misbehavior) (*cs.Reactor, *cs.State) {
  418. consensusState := cs.NewState(
  419. config.Consensus,
  420. state.Copy(),
  421. blockExec,
  422. blockStore,
  423. mempool,
  424. evidencePool,
  425. misbehaviors,
  426. cs.StateMetrics(csMetrics),
  427. )
  428. consensusState.SetLogger(consensusLogger)
  429. if privValidator != nil {
  430. consensusState.SetPrivValidator(privValidator)
  431. }
  432. consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
  433. consensusReactor.SetLogger(consensusLogger)
  434. // services which will be publishing and/or subscribing for messages (events)
  435. // consensusReactor will set it on consensusState and blockExecutor
  436. consensusReactor.SetEventBus(eventBus)
  437. return consensusReactor, consensusState
  438. }
  439. func createTransport(
  440. logger log.Logger,
  441. config *cfg.Config,
  442. ) *p2p.MConnTransport {
  443. return p2p.NewMConnTransport(
  444. logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
  445. p2p.MConnTransportOptions{
  446. MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
  447. len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
  448. ),
  449. },
  450. )
  451. }
  452. func createSwitch(config *cfg.Config,
  453. transport p2p.Transport,
  454. p2pMetrics *p2p.Metrics,
  455. mempoolReactor *p2p.ReactorShim,
  456. bcReactor p2p.Reactor,
  457. stateSyncReactor *p2p.ReactorShim,
  458. consensusReactor *cs.Reactor,
  459. evidenceReactor *p2p.ReactorShim,
  460. proxyApp proxy.AppConns,
  461. nodeInfo p2p.NodeInfo,
  462. nodeKey p2p.NodeKey,
  463. p2pLogger log.Logger) *p2p.Switch {
  464. var (
  465. connFilters = []p2p.ConnFilterFunc{}
  466. peerFilters = []p2p.PeerFilterFunc{}
  467. )
  468. if !config.P2P.AllowDuplicateIP {
  469. connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
  470. }
  471. // Filter peers by addr or pubkey with an ABCI query.
  472. // If the query return code is OK, add peer.
  473. if config.FilterPeers {
  474. connFilters = append(
  475. connFilters,
  476. // ABCI query for address filtering.
  477. func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
  478. res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
  479. Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
  480. })
  481. if err != nil {
  482. return err
  483. }
  484. if res.IsErr() {
  485. return fmt.Errorf("error querying abci app: %v", res)
  486. }
  487. return nil
  488. },
  489. )
  490. peerFilters = append(
  491. peerFilters,
  492. // ABCI query for ID filtering.
  493. func(_ p2p.IPeerSet, p p2p.Peer) error {
  494. res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
  495. Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
  496. })
  497. if err != nil {
  498. return err
  499. }
  500. if res.IsErr() {
  501. return fmt.Errorf("error querying abci app: %v", res)
  502. }
  503. return nil
  504. },
  505. )
  506. }
  507. sw := p2p.NewSwitch(
  508. config.P2P,
  509. transport,
  510. p2p.WithMetrics(p2pMetrics),
  511. p2p.SwitchPeerFilters(peerFilters...),
  512. p2p.SwitchConnFilters(connFilters...),
  513. )
  514. sw.SetLogger(p2pLogger)
  515. sw.AddReactor("MEMPOOL", mempoolReactor)
  516. sw.AddReactor("BLOCKCHAIN", bcReactor)
  517. sw.AddReactor("CONSENSUS", consensusReactor)
  518. sw.AddReactor("EVIDENCE", evidenceReactor)
  519. sw.AddReactor("STATESYNC", stateSyncReactor)
  520. sw.SetNodeInfo(nodeInfo)
  521. sw.SetNodeKey(nodeKey)
  522. p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
  523. return sw
  524. }
  525. func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
  526. p2pLogger log.Logger, nodeKey p2p.NodeKey) (pex.AddrBook, error) {
  527. addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  528. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  529. // Add ourselves to addrbook to prevent dialing ourselves
  530. if config.P2P.ExternalAddress != "" {
  531. addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ExternalAddress))
  532. if err != nil {
  533. return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
  534. }
  535. addrBook.AddOurAddress(addr)
  536. }
  537. if config.P2P.ListenAddress != "" {
  538. addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID, config.P2P.ListenAddress))
  539. if err != nil {
  540. return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
  541. }
  542. addrBook.AddOurAddress(addr)
  543. }
  544. sw.SetAddrBook(addrBook)
  545. return addrBook, nil
  546. }
  547. func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
  548. sw *p2p.Switch, logger log.Logger) *pex.Reactor {
  549. // TODO persistent peers ? so we can have their DNS addrs saved
  550. pexReactor := pex.NewReactor(addrBook,
  551. &pex.ReactorConfig{
  552. Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
  553. SeedMode: config.P2P.SeedMode,
  554. // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
  555. // blocks assuming 10s blocks ~ 28 hours.
  556. // TODO (melekes): make it dynamic based on the actual block latencies
  557. // from the live network.
  558. // https://github.com/tendermint/tendermint/issues/3523
  559. SeedDisconnectWaitPeriod: 28 * time.Hour,
  560. PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
  561. })
  562. pexReactor.SetLogger(logger.With("module", "pex"))
  563. sw.AddReactor("PEX", pexReactor)
  564. return pexReactor
  565. }
  566. // startStateSync starts an asynchronous state sync process, then switches to fast sync mode.
  567. func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reactor,
  568. stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool,
  569. stateStore sm.Store, blockStore *store.BlockStore, state sm.State) error {
  570. ssR.Logger.Info("Starting state sync")
  571. if stateProvider == nil {
  572. var err error
  573. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  574. defer cancel()
  575. stateProvider, err = statesync.NewLightClientStateProvider(
  576. ctx,
  577. state.ChainID, state.Version, state.InitialHeight,
  578. config.RPCServers, light.TrustOptions{
  579. Period: config.TrustPeriod,
  580. Height: config.TrustHeight,
  581. Hash: config.TrustHashBytes(),
  582. }, ssR.Logger.With("module", "light"))
  583. if err != nil {
  584. return fmt.Errorf("failed to set up light client state provider: %w", err)
  585. }
  586. }
  587. go func() {
  588. state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
  589. if err != nil {
  590. ssR.Logger.Error("State sync failed", "err", err)
  591. return
  592. }
  593. err = stateStore.Bootstrap(state)
  594. if err != nil {
  595. ssR.Logger.Error("Failed to bootstrap node with new state", "err", err)
  596. return
  597. }
  598. err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
  599. if err != nil {
  600. ssR.Logger.Error("Failed to store last seen commit", "err", err)
  601. return
  602. }
  603. if fastSync {
  604. // FIXME Very ugly to have these metrics bleed through here.
  605. conR.Metrics.StateSyncing.Set(0)
  606. conR.Metrics.FastSyncing.Set(1)
  607. err = bcR.SwitchToFastSync(state)
  608. if err != nil {
  609. ssR.Logger.Error("Failed to switch to fast sync", "err", err)
  610. return
  611. }
  612. } else {
  613. conR.SwitchToConsensus(state, true)
  614. }
  615. }()
  616. return nil
  617. }
  618. // NewNode returns a new, ready to go, Tendermint Node.
  619. func NewNode(config *cfg.Config,
  620. privValidator types.PrivValidator,
  621. nodeKey p2p.NodeKey,
  622. clientCreator proxy.ClientCreator,
  623. genesisDocProvider GenesisDocProvider,
  624. dbProvider DBProvider,
  625. metricsProvider MetricsProvider,
  626. logger log.Logger,
  627. misbehaviors map[int64]cs.Misbehavior,
  628. options ...Option) (*Node, error) {
  629. blockStore, stateDB, err := initDBs(config, dbProvider)
  630. if err != nil {
  631. return nil, err
  632. }
  633. stateStore := sm.NewStore(stateDB)
  634. state, genDoc, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider)
  635. if err != nil {
  636. return nil, err
  637. }
  638. // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
  639. proxyApp, err := createAndStartProxyAppConns(clientCreator, logger)
  640. if err != nil {
  641. return nil, err
  642. }
  643. // EventBus and IndexerService must be started before the handshake because
  644. // we might need to index the txs of the replayed block as this might not have happened
  645. // when the node stopped last time (i.e. the node stopped after it saved the block
  646. // but before it indexed the txs, or, endblocker panicked)
  647. eventBus, err := createAndStartEventBus(logger)
  648. if err != nil {
  649. return nil, err
  650. }
  651. // Transaction indexing
  652. indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger)
  653. if err != nil {
  654. return nil, err
  655. }
  656. // If an address is provided, listen on the socket for a connection from an
  657. // external signing process.
  658. if config.PrivValidatorListenAddr != "" {
  659. // FIXME: we should start services inside OnStart
  660. privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger)
  661. if err != nil {
  662. return nil, fmt.Errorf("error with private validator socket client: %w", err)
  663. }
  664. }
  665. pubKey, err := privValidator.GetPubKey()
  666. if err != nil {
  667. return nil, fmt.Errorf("can't get pubkey: %w", err)
  668. }
  669. // Determine whether we should do state and/or fast sync.
  670. // We don't fast-sync when the only validator is us.
  671. fastSync := config.FastSyncMode && !onlyValidatorIsUs(state, pubKey)
  672. stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
  673. if stateSync && state.LastBlockHeight > 0 {
  674. logger.Info("Found local state with non-zero height, skipping state sync")
  675. stateSync = false
  676. }
  677. // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
  678. // and replays any blocks as necessary to sync tendermint with the app.
  679. consensusLogger := logger.With("module", "consensus")
  680. if !stateSync {
  681. if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
  682. return nil, err
  683. }
  684. // Reload the state. It will have the Version.Consensus.App set by the
  685. // Handshake, and may have other modifications as well (ie. depending on
  686. // what happened during block replay).
  687. state, err = stateStore.Load()
  688. if err != nil {
  689. return nil, fmt.Errorf("cannot load state: %w", err)
  690. }
  691. }
  692. logNodeStartupInfo(state, pubKey, logger, consensusLogger)
  693. // TODO: Fetch and provide real options and do proper p2p bootstrapping.
  694. // TODO: Use a persistent peer database.
  695. peerMgr, err := p2p.NewPeerManager(nodeKey.ID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  696. if err != nil {
  697. return nil, err
  698. }
  699. csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
  700. mpReactorShim, mpReactor, mempool := createMempoolReactor(config, proxyApp, state, memplMetrics, peerMgr, logger)
  701. evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
  702. if err != nil {
  703. return nil, err
  704. }
  705. // make block executor for consensus and blockchain reactors to execute blocks
  706. blockExec := sm.NewBlockExecutor(
  707. stateStore,
  708. logger.With("module", "state"),
  709. proxyApp.Consensus(),
  710. mempool,
  711. evPool,
  712. sm.BlockExecutorWithMetrics(smMetrics),
  713. )
  714. logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
  715. csReactor, csState := createConsensusReactor(
  716. config, state, blockExec, blockStore, mempool, evPool,
  717. privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors,
  718. )
  719. // Create the blockchain reactor. Note, we do not start fast sync if we're
  720. // doing a state sync first.
  721. bcReactorShim, bcReactor, err := createBlockchainReactor(
  722. logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
  723. )
  724. if err != nil {
  725. return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
  726. }
  727. // TODO: Remove this once the switch is removed.
  728. var bcReactorForSwitch p2p.Reactor
  729. if bcReactorShim != nil {
  730. bcReactorForSwitch = bcReactorShim
  731. } else {
  732. bcReactorForSwitch = bcReactor.(p2p.Reactor)
  733. }
  734. // Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
  735. // FIXME We need to update metrics here, since other reactors don't have access to them.
  736. if stateSync {
  737. csMetrics.StateSyncing.Set(1)
  738. } else if fastSync {
  739. csMetrics.FastSyncing.Set(1)
  740. }
  741. // Set up state sync reactor, and schedule a sync if requested.
  742. // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
  743. // we should clean this whole thing up. See:
  744. // https://github.com/tendermint/tendermint/issues/4644
  745. stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
  746. stateSyncReactor := statesync.NewReactor(
  747. stateSyncReactorShim.Logger,
  748. proxyApp.Snapshot(),
  749. proxyApp.Query(),
  750. stateSyncReactorShim.GetChannel(statesync.SnapshotChannel),
  751. stateSyncReactorShim.GetChannel(statesync.ChunkChannel),
  752. stateSyncReactorShim.PeerUpdates,
  753. config.StateSync.TempDir,
  754. )
  755. nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
  756. if err != nil {
  757. return nil, err
  758. }
  759. // Setup Transport and Switch.
  760. p2pLogger := logger.With("module", "p2p")
  761. transport := createTransport(p2pLogger, config)
  762. sw := createSwitch(
  763. config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
  764. stateSyncReactorShim, csReactor, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
  765. )
  766. err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
  767. if err != nil {
  768. return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
  769. }
  770. err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
  771. if err != nil {
  772. return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
  773. }
  774. addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
  775. if err != nil {
  776. return nil, fmt.Errorf("could not create addrbook: %w", err)
  777. }
  778. // Optionally, start the pex reactor
  779. //
  780. // TODO:
  781. //
  782. // We need to set Seeds and PersistentPeers on the switch,
  783. // since it needs to be able to use these (and their DNS names)
  784. // even if the PEX is off. We can include the DNS name in the NetAddress,
  785. // but it would still be nice to have a clear list of the current "PersistentPeers"
  786. // somewhere that we can return with net_info.
  787. //
  788. // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
  789. // Note we currently use the addrBook regardless at least for AddOurAddress
  790. var pexReactor *pex.Reactor
  791. if config.P2P.PexReactor {
  792. pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
  793. }
  794. if config.RPC.PprofListenAddress != "" {
  795. go func() {
  796. logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress)
  797. logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil))
  798. }()
  799. }
  800. node := &Node{
  801. config: config,
  802. genesisDoc: genDoc,
  803. privValidator: privValidator,
  804. transport: transport,
  805. sw: sw,
  806. addrBook: addrBook,
  807. nodeInfo: nodeInfo,
  808. nodeKey: nodeKey,
  809. stateStore: stateStore,
  810. blockStore: blockStore,
  811. bcReactor: bcReactor,
  812. mempoolReactor: mpReactor,
  813. mempool: mempool,
  814. consensusState: csState,
  815. consensusReactor: csReactor,
  816. stateSyncReactor: stateSyncReactor,
  817. stateSync: stateSync,
  818. stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
  819. pexReactor: pexReactor,
  820. evidenceReactor: evReactor,
  821. evidencePool: evPool,
  822. proxyApp: proxyApp,
  823. txIndexer: txIndexer,
  824. indexerService: indexerService,
  825. eventBus: eventBus,
  826. }
  827. node.BaseService = *service.NewBaseService(logger, "Node", node)
  828. for _, option := range options {
  829. option(node)
  830. }
  831. return node, nil
  832. }
  833. // OnStart starts the Node. It implements service.Service.
  834. func (n *Node) OnStart() error {
  835. now := tmtime.Now()
  836. genTime := n.genesisDoc.GenesisTime
  837. if genTime.After(now) {
  838. n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
  839. time.Sleep(genTime.Sub(now))
  840. }
  841. // Add private IDs to addrbook to block those peers being added
  842. n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
  843. // Start the RPC server before the P2P server
  844. // so we can eg. receive txs for the first block
  845. if n.config.RPC.ListenAddress != "" {
  846. listeners, err := n.startRPC()
  847. if err != nil {
  848. return err
  849. }
  850. n.rpcListeners = listeners
  851. }
  852. if n.config.Instrumentation.Prometheus &&
  853. n.config.Instrumentation.PrometheusListenAddr != "" {
  854. n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
  855. }
  856. // Start the mempool.
  857. if n.config.Mempool.WalEnabled() {
  858. err := n.mempool.InitWAL()
  859. if err != nil {
  860. return fmt.Errorf("init mempool WAL: %w", err)
  861. }
  862. }
  863. // Start the transport.
  864. addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress))
  865. if err != nil {
  866. return err
  867. }
  868. if err := n.transport.Listen(addr.Endpoint()); err != nil {
  869. return err
  870. }
  871. n.isListening = true
  872. // Start the switch (the P2P server).
  873. err = n.sw.Start()
  874. if err != nil {
  875. return err
  876. }
  877. if n.config.FastSync.Version == "v0" {
  878. // Start the real blockchain reactor separately since the switch uses the shim.
  879. if err := n.bcReactor.Start(); err != nil {
  880. return err
  881. }
  882. }
  883. // Start the real state sync reactor separately since the switch uses the shim.
  884. if err := n.stateSyncReactor.Start(); err != nil {
  885. return err
  886. }
  887. // Start the real mempool reactor separately since the switch uses the shim.
  888. if err := n.mempoolReactor.Start(); err != nil {
  889. return err
  890. }
  891. // Start the real evidence reactor separately since the switch uses the shim.
  892. if err := n.evidenceReactor.Start(); err != nil {
  893. return err
  894. }
  895. // Always connect to persistent peers
  896. err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
  897. if err != nil {
  898. return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
  899. }
  900. // Run state sync
  901. if n.stateSync {
  902. bcR, ok := n.bcReactor.(fastSyncReactor)
  903. if !ok {
  904. return fmt.Errorf("this blockchain reactor does not support switching from state sync")
  905. }
  906. err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
  907. n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
  908. if err != nil {
  909. return fmt.Errorf("failed to start state sync: %w", err)
  910. }
  911. }
  912. return nil
  913. }
  914. // OnStop stops the Node. It implements service.Service.
  915. func (n *Node) OnStop() {
  916. n.BaseService.OnStop()
  917. n.Logger.Info("Stopping Node")
  918. // first stop the non-reactor services
  919. if err := n.eventBus.Stop(); err != nil {
  920. n.Logger.Error("Error closing eventBus", "err", err)
  921. }
  922. if err := n.indexerService.Stop(); err != nil {
  923. n.Logger.Error("Error closing indexerService", "err", err)
  924. }
  925. // now stop the reactors
  926. if err := n.sw.Stop(); err != nil {
  927. n.Logger.Error("Error closing switch", "err", err)
  928. }
  929. if n.config.FastSync.Version == "v0" {
  930. // Stop the real blockchain reactor separately since the switch uses the shim.
  931. if err := n.bcReactor.Stop(); err != nil {
  932. n.Logger.Error("failed to stop the blockchain reactor", "err", err)
  933. }
  934. }
  935. // Stop the real state sync reactor separately since the switch uses the shim.
  936. if err := n.stateSyncReactor.Stop(); err != nil {
  937. n.Logger.Error("failed to stop the state sync reactor", "err", err)
  938. }
  939. // Stop the real mempool reactor separately since the switch uses the shim.
  940. if err := n.mempoolReactor.Stop(); err != nil {
  941. n.Logger.Error("failed to stop the mempool reactor", "err", err)
  942. }
  943. // Stop the real evidence reactor separately since the switch uses the shim.
  944. if err := n.evidenceReactor.Stop(); err != nil {
  945. n.Logger.Error("failed to stop the evidence reactor", "err", err)
  946. }
  947. // stop mempool WAL
  948. if n.config.Mempool.WalEnabled() {
  949. n.mempool.CloseWAL()
  950. }
  951. if err := n.transport.Close(); err != nil {
  952. n.Logger.Error("Error closing transport", "err", err)
  953. }
  954. n.isListening = false
  955. // finally stop the listeners / external services
  956. for _, l := range n.rpcListeners {
  957. n.Logger.Info("Closing rpc listener", "listener", l)
  958. if err := l.Close(); err != nil {
  959. n.Logger.Error("Error closing listener", "listener", l, "err", err)
  960. }
  961. }
  962. if pvsc, ok := n.privValidator.(service.Service); ok {
  963. if err := pvsc.Stop(); err != nil {
  964. n.Logger.Error("Error closing private validator", "err", err)
  965. }
  966. }
  967. if n.prometheusSrv != nil {
  968. if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
  969. // Error from closing listeners, or context timeout:
  970. n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
  971. }
  972. }
  973. }
  974. // ConfigureRPC makes sure RPC has all the objects it needs to operate.
  975. func (n *Node) ConfigureRPC() error {
  976. pubKey, err := n.privValidator.GetPubKey()
  977. if err != nil {
  978. return fmt.Errorf("can't get pubkey: %w", err)
  979. }
  980. rpccore.SetEnvironment(&rpccore.Environment{
  981. ProxyAppQuery: n.proxyApp.Query(),
  982. ProxyAppMempool: n.proxyApp.Mempool(),
  983. StateStore: n.stateStore,
  984. BlockStore: n.blockStore,
  985. EvidencePool: n.evidencePool,
  986. ConsensusState: n.consensusState,
  987. P2PPeers: n.sw,
  988. P2PTransport: n,
  989. PubKey: pubKey,
  990. GenDoc: n.genesisDoc,
  991. TxIndexer: n.txIndexer,
  992. ConsensusReactor: &consensus.Reactor{},
  993. EventBus: n.eventBus,
  994. Mempool: n.mempool,
  995. Logger: n.Logger.With("module", "rpc"),
  996. Config: *n.config.RPC,
  997. })
  998. return nil
  999. }
  1000. func (n *Node) startRPC() ([]net.Listener, error) {
  1001. err := n.ConfigureRPC()
  1002. if err != nil {
  1003. return nil, err
  1004. }
  1005. listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
  1006. if n.config.RPC.Unsafe {
  1007. rpccore.AddUnsafeRoutes()
  1008. }
  1009. config := rpcserver.DefaultConfig()
  1010. config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
  1011. config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
  1012. config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
  1013. // If necessary adjust global WriteTimeout to ensure it's greater than
  1014. // TimeoutBroadcastTxCommit.
  1015. // See https://github.com/tendermint/tendermint/issues/3435
  1016. if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
  1017. config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
  1018. }
  1019. // we may expose the rpc over both a unix and tcp socket
  1020. listeners := make([]net.Listener, len(listenAddrs))
  1021. for i, listenAddr := range listenAddrs {
  1022. mux := http.NewServeMux()
  1023. rpcLogger := n.Logger.With("module", "rpc-server")
  1024. wmLogger := rpcLogger.With("protocol", "websocket")
  1025. wm := rpcserver.NewWebsocketManager(rpccore.Routes,
  1026. rpcserver.OnDisconnect(func(remoteAddr string) {
  1027. err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
  1028. if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
  1029. wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
  1030. }
  1031. }),
  1032. rpcserver.ReadLimit(config.MaxBodyBytes),
  1033. )
  1034. wm.SetLogger(wmLogger)
  1035. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  1036. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
  1037. listener, err := rpcserver.Listen(
  1038. listenAddr,
  1039. config,
  1040. )
  1041. if err != nil {
  1042. return nil, err
  1043. }
  1044. var rootHandler http.Handler = mux
  1045. if n.config.RPC.IsCorsEnabled() {
  1046. corsMiddleware := cors.New(cors.Options{
  1047. AllowedOrigins: n.config.RPC.CORSAllowedOrigins,
  1048. AllowedMethods: n.config.RPC.CORSAllowedMethods,
  1049. AllowedHeaders: n.config.RPC.CORSAllowedHeaders,
  1050. })
  1051. rootHandler = corsMiddleware.Handler(mux)
  1052. }
  1053. if n.config.RPC.IsTLSEnabled() {
  1054. go func() {
  1055. if err := rpcserver.ServeTLS(
  1056. listener,
  1057. rootHandler,
  1058. n.config.RPC.CertFile(),
  1059. n.config.RPC.KeyFile(),
  1060. rpcLogger,
  1061. config,
  1062. ); err != nil {
  1063. n.Logger.Error("Error serving server with TLS", "err", err)
  1064. }
  1065. }()
  1066. } else {
  1067. go func() {
  1068. if err := rpcserver.Serve(
  1069. listener,
  1070. rootHandler,
  1071. rpcLogger,
  1072. config,
  1073. ); err != nil {
  1074. n.Logger.Error("Error serving server", "err", err)
  1075. }
  1076. }()
  1077. }
  1078. listeners[i] = listener
  1079. }
  1080. // we expose a simplified api over grpc for convenience to app devs
  1081. grpcListenAddr := n.config.RPC.GRPCListenAddress
  1082. if grpcListenAddr != "" {
  1083. config := rpcserver.DefaultConfig()
  1084. config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
  1085. config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
  1086. // NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections
  1087. config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
  1088. // If necessary adjust global WriteTimeout to ensure it's greater than
  1089. // TimeoutBroadcastTxCommit.
  1090. // See https://github.com/tendermint/tendermint/issues/3435
  1091. if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
  1092. config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
  1093. }
  1094. listener, err := rpcserver.Listen(grpcListenAddr, config)
  1095. if err != nil {
  1096. return nil, err
  1097. }
  1098. go func() {
  1099. if err := grpccore.StartGRPCServer(listener); err != nil {
  1100. n.Logger.Error("Error starting gRPC server", "err", err)
  1101. }
  1102. }()
  1103. listeners = append(listeners, listener)
  1104. }
  1105. return listeners, nil
  1106. }
  1107. // startPrometheusServer starts a Prometheus HTTP server, listening for metrics
  1108. // collectors on addr.
  1109. func (n *Node) startPrometheusServer(addr string) *http.Server {
  1110. srv := &http.Server{
  1111. Addr: addr,
  1112. Handler: promhttp.InstrumentMetricHandler(
  1113. prometheus.DefaultRegisterer, promhttp.HandlerFor(
  1114. prometheus.DefaultGatherer,
  1115. promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
  1116. ),
  1117. ),
  1118. }
  1119. go func() {
  1120. if err := srv.ListenAndServe(); err != http.ErrServerClosed {
  1121. // Error starting or closing listener:
  1122. n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
  1123. }
  1124. }()
  1125. return srv
  1126. }
  1127. // Switch returns the Node's Switch.
  1128. func (n *Node) Switch() *p2p.Switch {
  1129. return n.sw
  1130. }
  1131. // BlockStore returns the Node's BlockStore.
  1132. func (n *Node) BlockStore() *store.BlockStore {
  1133. return n.blockStore
  1134. }
  1135. // ConsensusState returns the Node's ConsensusState.
  1136. func (n *Node) ConsensusState() *cs.State {
  1137. return n.consensusState
  1138. }
  1139. // ConsensusReactor returns the Node's ConsensusReactor.
  1140. func (n *Node) ConsensusReactor() *cs.Reactor {
  1141. return n.consensusReactor
  1142. }
  1143. // MempoolReactor returns the Node's mempool reactor.
  1144. func (n *Node) MempoolReactor() *mempl.Reactor {
  1145. return n.mempoolReactor
  1146. }
  1147. // Mempool returns the Node's mempool.
  1148. func (n *Node) Mempool() mempl.Mempool {
  1149. return n.mempool
  1150. }
  1151. // PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled.
  1152. func (n *Node) PEXReactor() *pex.Reactor {
  1153. return n.pexReactor
  1154. }
  1155. // EvidencePool returns the Node's EvidencePool.
  1156. func (n *Node) EvidencePool() *evidence.Pool {
  1157. return n.evidencePool
  1158. }
  1159. // EventBus returns the Node's EventBus.
  1160. func (n *Node) EventBus() *types.EventBus {
  1161. return n.eventBus
  1162. }
  1163. // PrivValidator returns the Node's PrivValidator.
  1164. // XXX: for convenience only!
  1165. func (n *Node) PrivValidator() types.PrivValidator {
  1166. return n.privValidator
  1167. }
  1168. // GenesisDoc returns the Node's GenesisDoc.
  1169. func (n *Node) GenesisDoc() *types.GenesisDoc {
  1170. return n.genesisDoc
  1171. }
  1172. // ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
  1173. func (n *Node) ProxyApp() proxy.AppConns {
  1174. return n.proxyApp
  1175. }
  1176. // Config returns the Node's config.
  1177. func (n *Node) Config() *cfg.Config {
  1178. return n.config
  1179. }
  1180. //------------------------------------------------------------------------------
  1181. func (n *Node) Listeners() []string {
  1182. return []string{
  1183. fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
  1184. }
  1185. }
  1186. func (n *Node) IsListening() bool {
  1187. return n.isListening
  1188. }
  1189. // NodeInfo returns the Node's Info from the Switch.
  1190. func (n *Node) NodeInfo() p2p.NodeInfo {
  1191. return n.nodeInfo
  1192. }
  1193. func makeNodeInfo(
  1194. config *cfg.Config,
  1195. nodeKey p2p.NodeKey,
  1196. txIndexer txindex.TxIndexer,
  1197. genDoc *types.GenesisDoc,
  1198. state sm.State,
  1199. ) (p2p.NodeInfo, error) {
  1200. txIndexerStatus := "on"
  1201. if _, ok := txIndexer.(*null.TxIndex); ok {
  1202. txIndexerStatus = "off"
  1203. }
  1204. var bcChannel byte
  1205. switch config.FastSync.Version {
  1206. case "v0":
  1207. bcChannel = byte(bcv0.BlockchainChannel)
  1208. case "v2":
  1209. bcChannel = bcv2.BlockchainChannel
  1210. default:
  1211. return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
  1212. }
  1213. nodeInfo := p2p.NodeInfo{
  1214. ProtocolVersion: p2p.NewProtocolVersion(
  1215. version.P2PProtocol, // global
  1216. state.Version.Consensus.Block,
  1217. state.Version.Consensus.App,
  1218. ),
  1219. NodeID: nodeKey.ID,
  1220. Network: genDoc.ChainID,
  1221. Version: version.TMCoreSemVer,
  1222. Channels: []byte{
  1223. bcChannel,
  1224. cs.StateChannel,
  1225. cs.DataChannel,
  1226. cs.VoteChannel,
  1227. cs.VoteSetBitsChannel,
  1228. byte(mempl.MempoolChannel),
  1229. byte(evidence.EvidenceChannel),
  1230. byte(statesync.SnapshotChannel),
  1231. byte(statesync.ChunkChannel),
  1232. },
  1233. Moniker: config.Moniker,
  1234. Other: p2p.NodeInfoOther{
  1235. TxIndex: txIndexerStatus,
  1236. RPCAddress: config.RPC.ListenAddress,
  1237. },
  1238. }
  1239. if config.P2P.PexReactor {
  1240. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  1241. }
  1242. lAddr := config.P2P.ExternalAddress
  1243. if lAddr == "" {
  1244. lAddr = config.P2P.ListenAddress
  1245. }
  1246. nodeInfo.ListenAddr = lAddr
  1247. err := nodeInfo.Validate()
  1248. return nodeInfo, err
  1249. }
  1250. //------------------------------------------------------------------------------
  1251. var (
  1252. genesisDocKey = []byte("genesisDoc")
  1253. )
  1254. // LoadStateFromDBOrGenesisDocProvider attempts to load the state from the
  1255. // database, or creates one using the given genesisDocProvider and persists the
  1256. // result to the database. On success this also returns the genesis doc loaded
  1257. // through the given provider.
  1258. func LoadStateFromDBOrGenesisDocProvider(
  1259. stateDB dbm.DB,
  1260. genesisDocProvider GenesisDocProvider,
  1261. ) (sm.State, *types.GenesisDoc, error) {
  1262. // Get genesis doc
  1263. genDoc, err := loadGenesisDoc(stateDB)
  1264. if err != nil {
  1265. genDoc, err = genesisDocProvider()
  1266. if err != nil {
  1267. return sm.State{}, nil, err
  1268. }
  1269. // save genesis doc to prevent a certain class of user errors (e.g. when it
  1270. // was changed, accidentally or not). Also good for audit trail.
  1271. saveGenesisDoc(stateDB, genDoc)
  1272. }
  1273. stateStore := sm.NewStore(stateDB)
  1274. state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
  1275. if err != nil {
  1276. return sm.State{}, nil, err
  1277. }
  1278. return state, genDoc, nil
  1279. }
  1280. // panics if failed to unmarshal bytes
  1281. func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
  1282. b, err := db.Get(genesisDocKey)
  1283. if err != nil {
  1284. panic(err)
  1285. }
  1286. if len(b) == 0 {
  1287. return nil, errors.New("genesis doc not found")
  1288. }
  1289. var genDoc *types.GenesisDoc
  1290. err = tmjson.Unmarshal(b, &genDoc)
  1291. if err != nil {
  1292. panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b))
  1293. }
  1294. return genDoc, nil
  1295. }
  1296. // panics if failed to marshal the given genesis document
  1297. func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
  1298. b, err := tmjson.Marshal(genDoc)
  1299. if err != nil {
  1300. panic(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
  1301. }
  1302. if err := db.SetSync(genesisDocKey, b); err != nil {
  1303. panic(fmt.Sprintf("Failed to save genesis doc: %v", err))
  1304. }
  1305. }
  1306. func createAndStartPrivValidatorSocketClient(
  1307. listenAddr,
  1308. chainID string,
  1309. logger log.Logger,
  1310. ) (types.PrivValidator, error) {
  1311. pve, err := privval.NewSignerListener(listenAddr, logger)
  1312. if err != nil {
  1313. return nil, fmt.Errorf("failed to start private validator: %w", err)
  1314. }
  1315. pvsc, err := privval.NewSignerClient(pve, chainID)
  1316. if err != nil {
  1317. return nil, fmt.Errorf("failed to start private validator: %w", err)
  1318. }
  1319. // try to get a pubkey from private validate first time
  1320. _, err = pvsc.GetPubKey()
  1321. if err != nil {
  1322. return nil, fmt.Errorf("can't get pubkey: %w", err)
  1323. }
  1324. const (
  1325. retries = 50 // 50 * 100ms = 5s total
  1326. timeout = 100 * time.Millisecond
  1327. )
  1328. pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)
  1329. return pvscWithRetries, nil
  1330. }
  1331. // splitAndTrimEmpty slices s into all subslices separated by sep and returns a
  1332. // slice of the string s with all leading and trailing Unicode code points
  1333. // contained in cutset removed. If sep is empty, SplitAndTrim splits after each
  1334. // UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
  1335. // -1. also filter out empty strings, only return non-empty strings.
  1336. func splitAndTrimEmpty(s, sep, cutset string) []string {
  1337. if s == "" {
  1338. return []string{}
  1339. }
  1340. spl := strings.Split(s, sep)
  1341. nonEmptyStrings := make([]string, 0, len(spl))
  1342. for i := 0; i < len(spl); i++ {
  1343. element := strings.Trim(spl[i], cutset)
  1344. if element != "" {
  1345. nonEmptyStrings = append(nonEmptyStrings, element)
  1346. }
  1347. }
  1348. return nonEmptyStrings
  1349. }