You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

975 lines
29 KiB

limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
max-bytes PR follow-up (#2318) * ReapMaxTxs: return all txs if max is negative this mirrors ReapMaxBytes behavior See https://github.com/tendermint/tendermint/pull/2184#discussion_r214439950 * increase MaxAminoOverheadForBlock tested with: ``` func TestMaxAminoOverheadForBlock(t *testing.T) { maxChainID := "" for i := 0; i < MaxChainIDLen; i++ { maxChainID += "𠜎" } h := Header{ ChainID: maxChainID, Height: 10, Time: time.Now().UTC(), NumTxs: 100, TotalTxs: 200, LastBlockID: makeBlockID(make([]byte, 20), 300, make([]byte, 20)), LastCommitHash: tmhash.Sum([]byte("last_commit_hash")), DataHash: tmhash.Sum([]byte("data_hash")), ValidatorsHash: tmhash.Sum([]byte("validators_hash")), NextValidatorsHash: tmhash.Sum([]byte("next_validators_hash")), ConsensusHash: tmhash.Sum([]byte("consensus_hash")), AppHash: tmhash.Sum([]byte("app_hash")), LastResultsHash: tmhash.Sum([]byte("last_results_hash")), EvidenceHash: tmhash.Sum([]byte("evidence_hash")), ProposerAddress: tmhash.Sum([]byte("proposer_address")), } b := Block{ Header: h, Data: Data{Txs: makeTxs(10000, 100)}, Evidence: EvidenceData{}, LastCommit: &Commit{}, } bz, err := cdc.MarshalBinary(b) require.NoError(t, err) assert.Equal(t, MaxHeaderBytes+MaxAminoOverheadForBlock-2, len(bz)-1000000-20000-1) } ``` * fix MaxYYY constants calculation by using math.MaxInt64 See https://github.com/tendermint/tendermint/pull/2184#discussion_r214444244 * pass mempool filter as an option See https://github.com/tendermint/tendermint/pull/2184#discussion_r214445869 * fixes after Dev's comments
6 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
limit number of /subscribe clients and queries per client (#3269) * limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 
testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
6 years ago
Close and retry a RemoteSigner on err (#2923) * Close and recreate a RemoteSigner on err * Update changelog * Address Anton's comments / suggestions: - update changelog - restart TCPVal - shut down on `ErrUnexpectedResponse` * re-init remote signer client with fresh connection if Ping fails - add/update TODOs in secret connection - rename tcp.go -> tcp_client.go, same with ipc to clarify their purpose * account for `conn returned by waitConnection can be `nil` - also add TODO about RemoteSigner conn field * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn - add rwmutex for conn field in IPC * comments and doc.go * fix ipc tests. fixes #2677 * use constants for tests * cleanup some error statements * fixes #2784, race in tests * remove print statement * minor fixes from review * update comment on sts spec * cosmetics * p2p/conn: add failing tests * p2p/conn: make SecretConnection thread safe * changelog * IPCVal signer refactor - use a .reset() method - don't use embedded RemoteSignerClient - guard RemoteSignerClient with mutex - drop the .conn - expose Close() on RemoteSignerClient * apply IPCVal refactor to TCPVal * remove mtx from RemoteSignerClient * consolidate IPCVal and TCPVal, fixes #3104 - done in tcp_client.go - now called SocketVal - takes a listener in the constructor - make tcpListener and unixListener contain all the differences * delete ipc files * introduce unix and tcp dialer for RemoteSigner * rename files - drop tcp_ prefix - rename priv_validator.go to file.go * bring back listener options * fix node * fix priv_val_server * fix node test * minor cleanup and comments
6 years ago
Close and retry a RemoteSigner on err (#2923) * Close and recreate a RemoteSigner on err * Update changelog * Address Anton's comments / suggestions: - update changelog - restart TCPVal - shut down on `ErrUnexpectedResponse` * re-init remote signer client with fresh connection if Ping fails - add/update TODOs in secret connection - rename tcp.go -> tcp_client.go, same with ipc to clarify their purpose * account for `conn returned by waitConnection can be `nil` - also add TODO about RemoteSigner conn field * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn - add rwmutex for conn field in IPC * comments and doc.go * fix ipc tests. fixes #2677 * use constants for tests * cleanup some error statements * fixes #2784, race in tests * remove print statement * minor fixes from review * update comment on sts spec * cosmetics * p2p/conn: add failing tests * p2p/conn: make SecretConnection thread safe * changelog * IPCVal signer refactor - use a .reset() method - don't use embedded RemoteSignerClient - guard RemoteSignerClient with mutex - drop the .conn - expose Close() on RemoteSignerClient * apply IPCVal refactor to TCPVal * remove mtx from RemoteSignerClient * consolidate IPCVal and TCPVal, fixes #3104 - done in tcp_client.go - now called SocketVal - takes a listener in the constructor - make tcpListener and unixListener contain all the differences * delete ipc files * introduce unix and tcp dialer for RemoteSigner * rename files - drop tcp_ prefix - rename priv_validator.go to file.go * bring back listener options * fix node * fix priv_val_server * fix node test * minor cleanup and comments
6 years ago
Close and retry a RemoteSigner on err (#2923) * Close and recreate a RemoteSigner on err * Update changelog * Address Anton's comments / suggestions: - update changelog - restart TCPVal - shut down on `ErrUnexpectedResponse` * re-init remote signer client with fresh connection if Ping fails - add/update TODOs in secret connection - rename tcp.go -> tcp_client.go, same with ipc to clarify their purpose * account for `conn returned by waitConnection can be `nil` - also add TODO about RemoteSigner conn field * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn - add rwmutex for conn field in IPC * comments and doc.go * fix ipc tests. fixes #2677 * use constants for tests * cleanup some error statements * fixes #2784, race in tests * remove print statement * minor fixes from review * update comment on sts spec * cosmetics * p2p/conn: add failing tests * p2p/conn: make SecretConnection thread safe * changelog * IPCVal signer refactor - use a .reset() method - don't use embedded RemoteSignerClient - guard RemoteSignerClient with mutex - drop the .conn - expose Close() on RemoteSignerClient * apply IPCVal refactor to TCPVal * remove mtx from RemoteSignerClient * consolidate IPCVal and TCPVal, fixes #3104 - done in tcp_client.go - now called SocketVal - takes a listener in the constructor - make tcpListener and unixListener contain all the differences * delete ipc files * introduce unix and tcp dialer for RemoteSigner * rename files - drop tcp_ prefix - rename priv_validator.go to file.go * bring back listener options * fix node * fix priv_val_server * fix node test * minor cleanup and comments
6 years ago
Close and retry a RemoteSigner on err (#2923) * Close and recreate a RemoteSigner on err * Update changelog * Address Anton's comments / suggestions: - update changelog - restart TCPVal - shut down on `ErrUnexpectedResponse` * re-init remote signer client with fresh connection if Ping fails - add/update TODOs in secret connection - rename tcp.go -> tcp_client.go, same with ipc to clarify their purpose * account for `conn returned by waitConnection can be `nil` - also add TODO about RemoteSigner conn field * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn - add rwmutex for conn field in IPC * comments and doc.go * fix ipc tests. fixes #2677 * use constants for tests * cleanup some error statements * fixes #2784, race in tests * remove print statement * minor fixes from review * update comment on sts spec * cosmetics * p2p/conn: add failing tests * p2p/conn: make SecretConnection thread safe * changelog * IPCVal signer refactor - use a .reset() method - don't use embedded RemoteSignerClient - guard RemoteSignerClient with mutex - drop the .conn - expose Close() on RemoteSignerClient * apply IPCVal refactor to TCPVal * remove mtx from RemoteSignerClient * consolidate IPCVal and TCPVal, fixes #3104 - done in tcp_client.go - now called SocketVal - takes a listener in the constructor - make tcpListener and unixListener contain all the differences * delete ipc files * introduce unix and tcp dialer for RemoteSigner * rename files - drop tcp_ prefix - rename priv_validator.go to file.go * bring back listener options * fix node * fix priv_val_server * fix node test * minor cleanup and comments
6 years ago
privval: improve Remote Signer implementation (#3351) This issue is related to #3107 This is a first renaming/refactoring step before reworking and removing heartbeats. As discussed with @Liamsi , we preferred to go for a couple of independent and separate PRs to simplify review work. The changes: Help to clarify the relation between the validator and remote signer endpoints Differentiate between timeouts and deadlines Prepare to encapsulate networking related code behind RemoteSigner in the next PR My intention is to separate and encapsulate the "network related" code from the actual signer. SignerRemote ---(uses/contains)--> SignerValidatorEndpoint <--(connects to)--> SignerServiceEndpoint ---> SignerService (future.. not here yet but would like to decouple too) All reconnection/heartbeat/whatever code goes in the endpoints. Signer[Remote/Service] do not need to know about that. I agree Endpoint may not be the perfect name. I tried to find something "Go-ish" enough. It is a common name in go-kit, kubernetes, etc. Right now: SignerValidatorEndpoint: handles the listener contains SignerRemote Implements the PrivValidator interface connects and sets a connection object in a contained SignerRemote delegates PrivValidator some calls to SignerRemote which in turn uses the conn object that was set externally SignerRemote: Implements the PrivValidator interface read/writes from a connection object directly handles heartbeats SignerServiceEndpoint: Does most things in a single place delegates to a PrivValidator IIRC. * cleanup * Refactoring step 1 * Refactoring step 2 * move messages to another file * mark for future work / next steps * mark deprecated classes in docs * Fix linter problems * additional linter fixes
6 years ago
Close and retry a RemoteSigner on err (#2923) * Close and recreate a RemoteSigner on err * Update changelog * Address Anton's comments / suggestions: - update changelog - restart TCPVal - shut down on `ErrUnexpectedResponse` * re-init remote signer client with fresh connection if Ping fails - add/update TODOs in secret connection - rename tcp.go -> tcp_client.go, same with ipc to clarify their purpose * account for `conn returned by waitConnection can be `nil` - also add TODO about RemoteSigner conn field * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn * Tests for retrying: IPC / TCP - shorter info log on success - set conn and use it in tests to close conn - add rwmutex for conn field in IPC * comments and doc.go * fix ipc tests. fixes #2677 * use constants for tests * cleanup some error statements * fixes #2784, race in tests * remove print statement * minor fixes from review * update comment on sts spec * cosmetics * p2p/conn: add failing tests * p2p/conn: make SecretConnection thread safe * changelog * IPCVal signer refactor - use a .reset() method - don't use embedded RemoteSignerClient - guard RemoteSignerClient with mutex - drop the .conn - expose Close() on RemoteSignerClient * apply IPCVal refactor to TCPVal * remove mtx from RemoteSignerClient * consolidate IPCVal and TCPVal, fixes #3104 - done in tcp_client.go - now called SocketVal - takes a listener in the constructor - make tcpListener and unixListener contain all the differences * delete ipc files * introduce unix and tcp dialer for RemoteSigner * rename files - drop tcp_ prefix - rename priv_validator.go to file.go * bring back listener options * fix node * fix priv_val_server * fix node test * minor cleanup and comments
6 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. _ "net/http/pprof"
  9. "os"
  10. "strings"
  11. "time"
  12. "github.com/pkg/errors"
  13. "github.com/prometheus/client_golang/prometheus"
  14. "github.com/prometheus/client_golang/prometheus/promhttp"
  15. "github.com/rs/cors"
  16. amino "github.com/tendermint/go-amino"
  17. abci "github.com/tendermint/tendermint/abci/types"
  18. bc "github.com/tendermint/tendermint/blockchain"
  19. cfg "github.com/tendermint/tendermint/config"
  20. cs "github.com/tendermint/tendermint/consensus"
  21. "github.com/tendermint/tendermint/crypto/ed25519"
  22. "github.com/tendermint/tendermint/evidence"
  23. cmn "github.com/tendermint/tendermint/libs/common"
  24. dbm "github.com/tendermint/tendermint/libs/db"
  25. "github.com/tendermint/tendermint/libs/log"
  26. tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
  27. mempl "github.com/tendermint/tendermint/mempool"
  28. "github.com/tendermint/tendermint/p2p"
  29. "github.com/tendermint/tendermint/p2p/pex"
  30. "github.com/tendermint/tendermint/privval"
  31. "github.com/tendermint/tendermint/proxy"
  32. rpccore "github.com/tendermint/tendermint/rpc/core"
  33. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  34. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  35. rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
  36. sm "github.com/tendermint/tendermint/state"
  37. "github.com/tendermint/tendermint/state/txindex"
  38. "github.com/tendermint/tendermint/state/txindex/kv"
  39. "github.com/tendermint/tendermint/state/txindex/null"
  40. "github.com/tendermint/tendermint/types"
  41. tmtime "github.com/tendermint/tendermint/types/time"
  42. "github.com/tendermint/tendermint/version"
  43. )
  44. //------------------------------------------------------------------------------
// DBContext specifies config information for loading a new DB.
// It is the single argument handed to a DBProvider.
type DBContext struct {
	ID     string      // database name; DefaultDBProvider passes it to dbm.NewDB (NewNode uses "blockstore", "state" and "tx_index")
	Config *cfg.Config // node configuration; DefaultDBProvider reads DBBackend and DBDir() from it
}
// DBProvider takes a DBContext and returns an instantiated DB,
// or a non-nil error when the database cannot be provided.
type DBProvider func(*DBContext) (dbm.DB, error)
  52. // DefaultDBProvider returns a database using the DBBackend and DBDir
  53. // specified in the ctx.Config.
  54. func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
  55. dbType := dbm.DBBackendType(ctx.Config.DBBackend)
  56. return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()), nil
  57. }
// GenesisDocProvider returns a GenesisDoc, or an error if it cannot be obtained.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
type GenesisDocProvider func() (*types.GenesisDoc, error)
  62. // DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
  63. // the GenesisDoc from the config.GenesisFile() on the filesystem.
  64. func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
  65. return func() (*types.GenesisDoc, error) {
  66. return types.GenesisDocFromFile(config.GenesisFile())
  67. }
  68. }
// NodeProvider takes a config and a logger and returns a ready to go Node,
// or an error if the Node could not be constructed.
type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
  71. // DefaultNewNode returns a Tendermint node with default settings for the
  72. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
  73. // It implements NodeProvider.
  74. func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
  75. // Generate node PrivKey
  76. nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
  77. if err != nil {
  78. return nil, err
  79. }
  80. // Convert old PrivValidator if it exists.
  81. oldPrivVal := config.OldPrivValidatorFile()
  82. newPrivValKey := config.PrivValidatorKeyFile()
  83. newPrivValState := config.PrivValidatorStateFile()
  84. if _, err := os.Stat(oldPrivVal); !os.IsNotExist(err) {
  85. oldPV, err := privval.LoadOldFilePV(oldPrivVal)
  86. if err != nil {
  87. return nil, fmt.Errorf("Error reading OldPrivValidator from %v: %v\n", oldPrivVal, err)
  88. }
  89. logger.Info("Upgrading PrivValidator file",
  90. "old", oldPrivVal,
  91. "newKey", newPrivValKey,
  92. "newState", newPrivValState,
  93. )
  94. oldPV.Upgrade(newPrivValKey, newPrivValState)
  95. }
  96. return NewNode(config,
  97. privval.LoadOrGenFilePV(newPrivValKey, newPrivValState),
  98. nodeKey,
  99. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
  100. DefaultGenesisDocProviderFunc(config),
  101. DefaultDBProvider,
  102. DefaultMetricsProvider(config.Instrumentation),
  103. logger,
  104. )
  105. }
// MetricsProvider returns consensus, p2p, mempool and state Metrics
// (in that order) for the chain with the given chainID.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
  108. // DefaultMetricsProvider returns Metrics build using Prometheus client library
  109. // if Prometheus is enabled. Otherwise, it returns no-op Metrics.
  110. func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
  111. return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
  112. if config.Prometheus {
  113. return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
  114. p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
  115. mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
  116. sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
  117. }
  118. return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
  119. }
  120. }
  121. //------------------------------------------------------------------------------
// Node is the highest level interface to a full Tendermint node.
// It includes all configuration information and running services.
// It embeds cmn.BaseService; instances are built by NewNode.
type Node struct {
	cmn.BaseService

	// config
	config        *cfg.Config
	genesisDoc    *types.GenesisDoc   // initial validator set
	privValidator types.PrivValidator // local node's validator key

	// network
	transport   *p2p.MultiplexTransport
	sw          *p2p.Switch  // p2p connections
	addrBook    pex.AddrBook // known peers
	nodeInfo    p2p.NodeInfo
	nodeKey     *p2p.NodeKey // our node privkey
	isListening bool

	// services
	eventBus         *types.EventBus // pub/sub for services
	stateDB          dbm.DB
	blockStore       *bc.BlockStore         // store the blockchain to disk
	bcReactor        *bc.BlockchainReactor  // for fast-syncing
	mempoolReactor   *mempl.MempoolReactor  // for gossipping transactions
	consensusState   *cs.ConsensusState     // latest consensus state
	consensusReactor *cs.ConsensusReactor   // for participating in the consensus
	evidencePool     *evidence.EvidencePool // tracking evidence
	proxyApp         proxy.AppConns         // connection to the application
	rpcListeners     []net.Listener         // rpc servers
	txIndexer        txindex.TxIndexer      // NOTE(review): presumably the indexer NewNode picks from config.TxIndex.Indexer — assignment not visible here, confirm
	indexerService   *txindex.IndexerService
	prometheusSrv    *http.Server // NOTE(review): presumably the HTTP server exposing Prometheus metrics — confirm where it is set
}
  152. // NewNode returns a new, ready to go, Tendermint Node.
  153. func NewNode(config *cfg.Config,
  154. privValidator types.PrivValidator,
  155. nodeKey *p2p.NodeKey,
  156. clientCreator proxy.ClientCreator,
  157. genesisDocProvider GenesisDocProvider,
  158. dbProvider DBProvider,
  159. metricsProvider MetricsProvider,
  160. logger log.Logger) (*Node, error) {
  161. // Get BlockStore
  162. blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
  163. if err != nil {
  164. return nil, err
  165. }
  166. blockStore := bc.NewBlockStore(blockStoreDB)
  167. // Get State
  168. stateDB, err := dbProvider(&DBContext{"state", config})
  169. if err != nil {
  170. return nil, err
  171. }
  172. // Get genesis doc
  173. // TODO: move to state package?
  174. genDoc, err := loadGenesisDoc(stateDB)
  175. if err != nil {
  176. genDoc, err = genesisDocProvider()
  177. if err != nil {
  178. return nil, err
  179. }
  180. // save genesis doc to prevent a certain class of user errors (e.g. when it
  181. // was changed, accidentally or not). Also good for audit trail.
  182. saveGenesisDoc(stateDB, genDoc)
  183. }
  184. state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
  185. if err != nil {
  186. return nil, err
  187. }
  188. // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
  189. proxyApp := proxy.NewAppConns(clientCreator)
  190. proxyApp.SetLogger(logger.With("module", "proxy"))
  191. if err := proxyApp.Start(); err != nil {
  192. return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
  193. }
  194. // EventBus and IndexerService must be started before the handshake because
  195. // we might need to index the txs of the replayed block as this might not have happened
  196. // when the node stopped last time (i.e. the node stopped after it saved the block
  197. // but before it indexed the txs, or, endblocker panicked)
  198. eventBus := types.NewEventBus()
  199. eventBus.SetLogger(logger.With("module", "events"))
  200. err = eventBus.Start()
  201. if err != nil {
  202. return nil, err
  203. }
  204. // Transaction indexing
  205. var txIndexer txindex.TxIndexer
  206. switch config.TxIndex.Indexer {
  207. case "kv":
  208. store, err := dbProvider(&DBContext{"tx_index", config})
  209. if err != nil {
  210. return nil, err
  211. }
  212. if config.TxIndex.IndexTags != "" {
  213. txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
  214. } else if config.TxIndex.IndexAllTags {
  215. txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
  216. } else {
  217. txIndexer = kv.NewTxIndex(store)
  218. }
  219. default:
  220. txIndexer = &null.TxIndex{}
  221. }
  222. indexerService := txindex.NewIndexerService(txIndexer, eventBus)
  223. indexerService.SetLogger(logger.With("module", "txindex"))
  224. err = indexerService.Start()
  225. if err != nil {
  226. return nil, err
  227. }
  228. // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
  229. // and replays any blocks as necessary to sync tendermint with the app.
  230. consensusLogger := logger.With("module", "consensus")
  231. handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
  232. handshaker.SetLogger(consensusLogger)
  233. handshaker.SetEventBus(eventBus)
  234. if err := handshaker.Handshake(proxyApp); err != nil {
  235. return nil, fmt.Errorf("Error during handshake: %v", err)
  236. }
  237. // Reload the state. It will have the Version.Consensus.App set by the
  238. // Handshake, and may have other modifications as well (ie. depending on
  239. // what happened during block replay).
  240. state = sm.LoadState(stateDB)
  241. // Log the version info.
  242. logger.Info("Version info",
  243. "software", version.TMCoreSemVer,
  244. "block", version.BlockProtocol,
  245. "p2p", version.P2PProtocol,
  246. )
  247. // If the state and software differ in block version, at least log it.
  248. if state.Version.Consensus.Block != version.BlockProtocol {
  249. logger.Info("Software and state have different block protocols",
  250. "software", version.BlockProtocol,
  251. "state", state.Version.Consensus.Block,
  252. )
  253. }
  254. if config.PrivValidatorListenAddr != "" {
  255. // If an address is provided, listen on the socket for a connection from an
  256. // external signing process.
  257. // FIXME: we should start services inside OnStart
  258. privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
  259. if err != nil {
  260. return nil, errors.Wrap(err, "Error with private validator socket client")
  261. }
  262. }
  263. // Decide whether to fast-sync or not
  264. // We don't fast-sync when the only validator is us.
  265. fastSync := config.FastSync
  266. if state.Validators.Size() == 1 {
  267. addr, _ := state.Validators.GetByIndex(0)
  268. privValAddr := privValidator.GetPubKey().Address()
  269. if bytes.Equal(privValAddr, addr) {
  270. fastSync = false
  271. }
  272. }
  273. pubKey := privValidator.GetPubKey()
  274. addr := pubKey.Address()
  275. // Log whether this node is a validator or an observer
  276. if state.Validators.HasAddress(addr) {
  277. consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
  278. } else {
  279. consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
  280. }
  281. csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
  282. // Make MempoolReactor
  283. mempool := mempl.NewMempool(
  284. config.Mempool,
  285. proxyApp.Mempool(),
  286. state.LastBlockHeight,
  287. mempl.WithMetrics(memplMetrics),
  288. mempl.WithPreCheck(sm.TxPreCheck(state)),
  289. mempl.WithPostCheck(sm.TxPostCheck(state)),
  290. )
  291. mempoolLogger := logger.With("module", "mempool")
  292. mempool.SetLogger(mempoolLogger)
  293. if config.Mempool.WalEnabled() {
  294. mempool.InitWAL() // no need to have the mempool wal during tests
  295. }
  296. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  297. mempoolReactor.SetLogger(mempoolLogger)
  298. if config.Consensus.WaitForTxs() {
  299. mempool.EnableTxsAvailable()
  300. }
  301. // Make Evidence Reactor
  302. evidenceDB, err := dbProvider(&DBContext{"evidence", config})
  303. if err != nil {
  304. return nil, err
  305. }
  306. evidenceLogger := logger.With("module", "evidence")
  307. evidencePool := evidence.NewEvidencePool(stateDB, evidenceDB)
  308. evidencePool.SetLogger(evidenceLogger)
  309. evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
  310. evidenceReactor.SetLogger(evidenceLogger)
  311. blockExecLogger := logger.With("module", "state")
  312. // make block executor for consensus and blockchain reactors to execute blocks
  313. blockExec := sm.NewBlockExecutor(
  314. stateDB,
  315. blockExecLogger,
  316. proxyApp.Consensus(),
  317. mempool,
  318. evidencePool,
  319. sm.BlockExecutorWithMetrics(smMetrics),
  320. )
  321. // Make BlockchainReactor
  322. bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
  323. bcReactor.SetLogger(logger.With("module", "blockchain"))
  324. // Make ConsensusReactor
  325. consensusState := cs.NewConsensusState(
  326. config.Consensus,
  327. state.Copy(),
  328. blockExec,
  329. blockStore,
  330. mempool,
  331. evidencePool,
  332. cs.StateMetrics(csMetrics),
  333. )
  334. consensusState.SetLogger(consensusLogger)
  335. if privValidator != nil {
  336. consensusState.SetPrivValidator(privValidator)
  337. }
  338. consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
  339. consensusReactor.SetLogger(consensusLogger)
  340. // services which will be publishing and/or subscribing for messages (events)
  341. // consensusReactor will set it on consensusState and blockExecutor
  342. consensusReactor.SetEventBus(eventBus)
  343. p2pLogger := logger.With("module", "p2p")
  344. nodeInfo, err := makeNodeInfo(
  345. config,
  346. nodeKey.ID(),
  347. txIndexer,
  348. genDoc.ChainID,
  349. p2p.NewProtocolVersion(
  350. version.P2PProtocol, // global
  351. state.Version.Consensus.Block,
  352. state.Version.Consensus.App,
  353. ),
  354. )
  355. if err != nil {
  356. return nil, err
  357. }
  358. // Setup Transport.
  359. var (
  360. mConnConfig = p2p.MConnConfig(config.P2P)
  361. transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
  362. connFilters = []p2p.ConnFilterFunc{}
  363. peerFilters = []p2p.PeerFilterFunc{}
  364. )
  365. if !config.P2P.AllowDuplicateIP {
  366. connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
  367. }
  368. // Filter peers by addr or pubkey with an ABCI query.
  369. // If the query return code is OK, add peer.
  370. if config.FilterPeers {
  371. connFilters = append(
  372. connFilters,
  373. // ABCI query for address filtering.
  374. func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
  375. res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
  376. Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
  377. })
  378. if err != nil {
  379. return err
  380. }
  381. if res.IsErr() {
  382. return fmt.Errorf("Error querying abci app: %v", res)
  383. }
  384. return nil
  385. },
  386. )
  387. peerFilters = append(
  388. peerFilters,
  389. // ABCI query for ID filtering.
  390. func(_ p2p.IPeerSet, p p2p.Peer) error {
  391. res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
  392. Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
  393. })
  394. if err != nil {
  395. return err
  396. }
  397. if res.IsErr() {
  398. return fmt.Errorf("Error querying abci app: %v", res)
  399. }
  400. return nil
  401. },
  402. )
  403. }
  404. p2p.MultiplexTransportConnFilters(connFilters...)(transport)
  405. // Setup Switch.
  406. sw := p2p.NewSwitch(
  407. config.P2P,
  408. transport,
  409. p2p.WithMetrics(p2pMetrics),
  410. p2p.SwitchPeerFilters(peerFilters...),
  411. )
  412. sw.SetLogger(p2pLogger)
  413. sw.AddReactor("MEMPOOL", mempoolReactor)
  414. sw.AddReactor("BLOCKCHAIN", bcReactor)
  415. sw.AddReactor("CONSENSUS", consensusReactor)
  416. sw.AddReactor("EVIDENCE", evidenceReactor)
  417. sw.SetNodeInfo(nodeInfo)
  418. sw.SetNodeKey(nodeKey)
  419. p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
  420. // Optionally, start the pex reactor
  421. //
  422. // TODO:
  423. //
  424. // We need to set Seeds and PersistentPeers on the switch,
  425. // since it needs to be able to use these (and their DNS names)
  426. // even if the PEX is off. We can include the DNS name in the NetAddress,
  427. // but it would still be nice to have a clear list of the current "PersistentPeers"
  428. // somewhere that we can return with net_info.
  429. //
  430. // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
  431. // Note we currently use the addrBook regardless at least for AddOurAddress
  432. addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  433. // Add ourselves to addrbook to prevent dialing ourselves
  434. addrBook.AddOurAddress(nodeInfo.NetAddress())
  435. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  436. if config.P2P.PexReactor {
  437. // TODO persistent peers ? so we can have their DNS addrs saved
  438. pexReactor := pex.NewPEXReactor(addrBook,
  439. &pex.PEXReactorConfig{
  440. Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
  441. SeedMode: config.P2P.SeedMode,
  442. })
  443. pexReactor.SetLogger(logger.With("module", "pex"))
  444. sw.AddReactor("PEX", pexReactor)
  445. }
  446. sw.SetAddrBook(addrBook)
  447. // run the profile server
  448. profileHost := config.ProfListenAddress
  449. if profileHost != "" {
  450. go func() {
  451. logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
  452. }()
  453. }
  454. node := &Node{
  455. config: config,
  456. genesisDoc: genDoc,
  457. privValidator: privValidator,
  458. transport: transport,
  459. sw: sw,
  460. addrBook: addrBook,
  461. nodeInfo: nodeInfo,
  462. nodeKey: nodeKey,
  463. stateDB: stateDB,
  464. blockStore: blockStore,
  465. bcReactor: bcReactor,
  466. mempoolReactor: mempoolReactor,
  467. consensusState: consensusState,
  468. consensusReactor: consensusReactor,
  469. evidencePool: evidencePool,
  470. proxyApp: proxyApp,
  471. txIndexer: txIndexer,
  472. indexerService: indexerService,
  473. eventBus: eventBus,
  474. }
  475. node.BaseService = *cmn.NewBaseService(logger, "Node", node)
  476. return node, nil
  477. }
  478. // OnStart starts the Node. It implements cmn.Service.
  479. func (n *Node) OnStart() error {
  480. now := tmtime.Now()
  481. genTime := n.genesisDoc.GenesisTime
  482. if genTime.After(now) {
  483. n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
  484. time.Sleep(genTime.Sub(now))
  485. }
  486. // Add private IDs to addrbook to block those peers being added
  487. n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
  488. // Start the RPC server before the P2P server
  489. // so we can eg. receive txs for the first block
  490. if n.config.RPC.ListenAddress != "" {
  491. listeners, err := n.startRPC()
  492. if err != nil {
  493. return err
  494. }
  495. n.rpcListeners = listeners
  496. }
  497. if n.config.Instrumentation.Prometheus &&
  498. n.config.Instrumentation.PrometheusListenAddr != "" {
  499. n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
  500. }
  501. // Start the transport.
  502. addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress)
  503. if err != nil {
  504. return err
  505. }
  506. if err := n.transport.Listen(*addr); err != nil {
  507. return err
  508. }
  509. n.isListening = true
  510. // Start the switch (the P2P server).
  511. err = n.sw.Start()
  512. if err != nil {
  513. return err
  514. }
  515. // Always connect to persistent peers
  516. if n.config.P2P.PersistentPeers != "" {
  517. err = n.sw.DialPeersAsync(n.addrBook, splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "), true)
  518. if err != nil {
  519. return err
  520. }
  521. }
  522. return nil
  523. }
  524. // OnStop stops the Node. It implements cmn.Service.
  525. func (n *Node) OnStop() {
  526. n.BaseService.OnStop()
  527. n.Logger.Info("Stopping Node")
  528. // first stop the non-reactor services
  529. n.eventBus.Stop()
  530. n.indexerService.Stop()
  531. // now stop the reactors
  532. // TODO: gracefully disconnect from peers.
  533. n.sw.Stop()
  534. // stop mempool WAL
  535. if n.config.Mempool.WalEnabled() {
  536. n.mempoolReactor.Mempool.CloseWAL()
  537. }
  538. if err := n.transport.Close(); err != nil {
  539. n.Logger.Error("Error closing transport", "err", err)
  540. }
  541. n.isListening = false
  542. // finally stop the listeners / external services
  543. for _, l := range n.rpcListeners {
  544. n.Logger.Info("Closing rpc listener", "listener", l)
  545. if err := l.Close(); err != nil {
  546. n.Logger.Error("Error closing listener", "listener", l, "err", err)
  547. }
  548. }
  549. if pvsc, ok := n.privValidator.(cmn.Service); ok {
  550. pvsc.Stop()
  551. }
  552. if n.prometheusSrv != nil {
  553. if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
  554. // Error from closing listeners, or context timeout:
  555. n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
  556. }
  557. }
  558. }
  559. // ConfigureRPC sets all variables in rpccore so they will serve
  560. // rpc calls from this node
  561. func (n *Node) ConfigureRPC() {
  562. rpccore.SetStateDB(n.stateDB)
  563. rpccore.SetBlockStore(n.blockStore)
  564. rpccore.SetConsensusState(n.consensusState)
  565. rpccore.SetMempool(n.mempoolReactor.Mempool)
  566. rpccore.SetEvidencePool(n.evidencePool)
  567. rpccore.SetP2PPeers(n.sw)
  568. rpccore.SetP2PTransport(n)
  569. pubKey := n.privValidator.GetPubKey()
  570. rpccore.SetPubKey(pubKey)
  571. rpccore.SetGenesisDoc(n.genesisDoc)
  572. rpccore.SetAddrBook(n.addrBook)
  573. rpccore.SetProxyAppQuery(n.proxyApp.Query())
  574. rpccore.SetTxIndexer(n.txIndexer)
  575. rpccore.SetConsensusReactor(n.consensusReactor)
  576. rpccore.SetEventBus(n.eventBus)
  577. rpccore.SetLogger(n.Logger.With("module", "rpc"))
  578. rpccore.SetConfig(*n.config.RPC)
  579. }
  580. func (n *Node) startRPC() ([]net.Listener, error) {
  581. n.ConfigureRPC()
  582. listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
  583. coreCodec := amino.NewCodec()
  584. ctypes.RegisterAmino(coreCodec)
  585. if n.config.RPC.Unsafe {
  586. rpccore.AddUnsafeRoutes()
  587. }
  588. // we may expose the rpc over both a unix and tcp socket
  589. listeners := make([]net.Listener, len(listenAddrs))
  590. for i, listenAddr := range listenAddrs {
  591. mux := http.NewServeMux()
  592. rpcLogger := n.Logger.With("module", "rpc-server")
  593. wmLogger := rpcLogger.With("protocol", "websocket")
  594. wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec,
  595. rpcserver.OnDisconnect(func(remoteAddr string) {
  596. err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
  597. if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
  598. wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
  599. }
  600. }))
  601. wm.SetLogger(wmLogger)
  602. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  603. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)
  604. config := rpcserver.DefaultConfig()
  605. config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
  606. // If necessary adjust global WriteTimeout to ensure it's greater than
  607. // TimeoutBroadcastTxCommit.
  608. // See https://github.com/tendermint/tendermint/issues/3435
  609. if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
  610. config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
  611. }
  612. listener, err := rpcserver.Listen(
  613. listenAddr,
  614. config,
  615. )
  616. if err != nil {
  617. return nil, err
  618. }
  619. var rootHandler http.Handler = mux
  620. if n.config.RPC.IsCorsEnabled() {
  621. corsMiddleware := cors.New(cors.Options{
  622. AllowedOrigins: n.config.RPC.CORSAllowedOrigins,
  623. AllowedMethods: n.config.RPC.CORSAllowedMethods,
  624. AllowedHeaders: n.config.RPC.CORSAllowedHeaders,
  625. })
  626. rootHandler = corsMiddleware.Handler(mux)
  627. }
  628. if n.config.RPC.IsTLSEnabled() {
  629. go rpcserver.StartHTTPAndTLSServer(
  630. listener,
  631. rootHandler,
  632. n.config.RPC.CertFile(),
  633. n.config.RPC.KeyFile(),
  634. rpcLogger,
  635. config,
  636. )
  637. } else {
  638. go rpcserver.StartHTTPServer(
  639. listener,
  640. rootHandler,
  641. rpcLogger,
  642. config,
  643. )
  644. }
  645. listeners[i] = listener
  646. }
  647. // we expose a simplified api over grpc for convenience to app devs
  648. grpcListenAddr := n.config.RPC.GRPCListenAddress
  649. if grpcListenAddr != "" {
  650. config := rpcserver.DefaultConfig()
  651. config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
  652. listener, err := rpcserver.Listen(grpcListenAddr, config)
  653. if err != nil {
  654. return nil, err
  655. }
  656. go grpccore.StartGRPCServer(listener)
  657. listeners = append(listeners, listener)
  658. }
  659. return listeners, nil
  660. }
  661. // startPrometheusServer starts a Prometheus HTTP server, listening for metrics
  662. // collectors on addr.
  663. func (n *Node) startPrometheusServer(addr string) *http.Server {
  664. srv := &http.Server{
  665. Addr: addr,
  666. Handler: promhttp.InstrumentMetricHandler(
  667. prometheus.DefaultRegisterer, promhttp.HandlerFor(
  668. prometheus.DefaultGatherer,
  669. promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
  670. ),
  671. ),
  672. }
  673. go func() {
  674. if err := srv.ListenAndServe(); err != http.ErrServerClosed {
  675. // Error starting or closing listener:
  676. n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
  677. }
  678. }()
  679. return srv
  680. }
  681. // Switch returns the Node's Switch.
  682. func (n *Node) Switch() *p2p.Switch {
  683. return n.sw
  684. }
  685. // BlockStore returns the Node's BlockStore.
  686. func (n *Node) BlockStore() *bc.BlockStore {
  687. return n.blockStore
  688. }
  689. // ConsensusState returns the Node's ConsensusState.
  690. func (n *Node) ConsensusState() *cs.ConsensusState {
  691. return n.consensusState
  692. }
  693. // ConsensusReactor returns the Node's ConsensusReactor.
  694. func (n *Node) ConsensusReactor() *cs.ConsensusReactor {
  695. return n.consensusReactor
  696. }
  697. // MempoolReactor returns the Node's MempoolReactor.
  698. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  699. return n.mempoolReactor
  700. }
  701. // EvidencePool returns the Node's EvidencePool.
  702. func (n *Node) EvidencePool() *evidence.EvidencePool {
  703. return n.evidencePool
  704. }
  705. // EventBus returns the Node's EventBus.
  706. func (n *Node) EventBus() *types.EventBus {
  707. return n.eventBus
  708. }
  709. // PrivValidator returns the Node's PrivValidator.
  710. // XXX: for convenience only!
  711. func (n *Node) PrivValidator() types.PrivValidator {
  712. return n.privValidator
  713. }
  714. // GenesisDoc returns the Node's GenesisDoc.
  715. func (n *Node) GenesisDoc() *types.GenesisDoc {
  716. return n.genesisDoc
  717. }
  718. // ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
  719. func (n *Node) ProxyApp() proxy.AppConns {
  720. return n.proxyApp
  721. }
  722. // Config returns the Node's config.
  723. func (n *Node) Config() *cfg.Config {
  724. return n.config
  725. }
  726. //------------------------------------------------------------------------------
  727. func (n *Node) Listeners() []string {
  728. return []string{
  729. fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
  730. }
  731. }
  732. func (n *Node) IsListening() bool {
  733. return n.isListening
  734. }
  735. // NodeInfo returns the Node's Info from the Switch.
  736. func (n *Node) NodeInfo() p2p.NodeInfo {
  737. return n.nodeInfo
  738. }
  739. func makeNodeInfo(
  740. config *cfg.Config,
  741. nodeID p2p.ID,
  742. txIndexer txindex.TxIndexer,
  743. chainID string,
  744. protocolVersion p2p.ProtocolVersion,
  745. ) (p2p.NodeInfo, error) {
  746. txIndexerStatus := "on"
  747. if _, ok := txIndexer.(*null.TxIndex); ok {
  748. txIndexerStatus = "off"
  749. }
  750. nodeInfo := p2p.DefaultNodeInfo{
  751. ProtocolVersion: protocolVersion,
  752. ID_: nodeID,
  753. Network: chainID,
  754. Version: version.TMCoreSemVer,
  755. Channels: []byte{
  756. bc.BlockchainChannel,
  757. cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
  758. mempl.MempoolChannel,
  759. evidence.EvidenceChannel,
  760. },
  761. Moniker: config.Moniker,
  762. Other: p2p.DefaultNodeInfoOther{
  763. TxIndex: txIndexerStatus,
  764. RPCAddress: config.RPC.ListenAddress,
  765. },
  766. }
  767. if config.P2P.PexReactor {
  768. nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
  769. }
  770. lAddr := config.P2P.ExternalAddress
  771. if lAddr == "" {
  772. lAddr = config.P2P.ListenAddress
  773. }
  774. nodeInfo.ListenAddr = lAddr
  775. err := nodeInfo.Validate()
  776. return nodeInfo, err
  777. }
  778. //------------------------------------------------------------------------------
  779. var (
  780. genesisDocKey = []byte("genesisDoc")
  781. )
  782. // panics if failed to unmarshal bytes
  783. func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
  784. bytes := db.Get(genesisDocKey)
  785. if len(bytes) == 0 {
  786. return nil, errors.New("Genesis doc not found")
  787. }
  788. var genDoc *types.GenesisDoc
  789. err := cdc.UnmarshalJSON(bytes, &genDoc)
  790. if err != nil {
  791. cmn.PanicCrisis(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes))
  792. }
  793. return genDoc, nil
  794. }
  795. // panics if failed to marshal the given genesis document
  796. func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
  797. bytes, err := cdc.MarshalJSON(genDoc)
  798. if err != nil {
  799. cmn.PanicCrisis(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
  800. }
  801. db.SetSync(genesisDocKey, bytes)
  802. }
  803. func createAndStartPrivValidatorSocketClient(
  804. listenAddr string,
  805. logger log.Logger,
  806. ) (types.PrivValidator, error) {
  807. var listener net.Listener
  808. protocol, address := cmn.ProtocolAndAddress(listenAddr)
  809. ln, err := net.Listen(protocol, address)
  810. if err != nil {
  811. return nil, err
  812. }
  813. switch protocol {
  814. case "unix":
  815. listener = privval.NewUnixListener(ln)
  816. case "tcp":
  817. // TODO: persist this key so external signer
  818. // can actually authenticate us
  819. listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
  820. default:
  821. return nil, fmt.Errorf(
  822. "Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
  823. protocol,
  824. )
  825. }
  826. pvsc := privval.NewSignerValidatorEndpoint(logger.With("module", "privval"), listener)
  827. if err := pvsc.Start(); err != nil {
  828. return nil, errors.Wrap(err, "failed to start private validator")
  829. }
  830. return pvsc, nil
  831. }
  832. // splitAndTrimEmpty slices s into all subslices separated by sep and returns a
  833. // slice of the string s with all leading and trailing Unicode code points
  834. // contained in cutset removed. If sep is empty, SplitAndTrim splits after each
  835. // UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
  836. // -1. also filter out empty strings, only return non-empty strings.
  837. func splitAndTrimEmpty(s, sep, cutset string) []string {
  838. if s == "" {
  839. return []string{}
  840. }
  841. spl := strings.Split(s, sep)
  842. nonEmptyStrings := make([]string, 0, len(spl))
  843. for i := 0; i < len(spl); i++ {
  844. element := strings.Trim(spl[i], cutset)
  845. if element != "" {
  846. nonEmptyStrings = append(nonEmptyStrings, element)
  847. }
  848. }
  849. return nonEmptyStrings
  850. }