package node

import (
	"bytes"
	"context"
	"fmt"
	"net"
	"net/http"
	_ "net/http/pprof"
	"os"
	"strings"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/cors"

	amino "github.com/tendermint/go-amino"
	abci "github.com/tendermint/tendermint/abci/types"
	bc "github.com/tendermint/tendermint/blockchain"
	cfg "github.com/tendermint/tendermint/config"
	cs "github.com/tendermint/tendermint/consensus"
	"github.com/tendermint/tendermint/crypto/ed25519"
	"github.com/tendermint/tendermint/evidence"
	cmn "github.com/tendermint/tendermint/libs/common"
	dbm "github.com/tendermint/tendermint/libs/db"
	"github.com/tendermint/tendermint/libs/log"
	tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
	mempl "github.com/tendermint/tendermint/mempool"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/pex"
	"github.com/tendermint/tendermint/privval"
	"github.com/tendermint/tendermint/proxy"
	rpccore "github.com/tendermint/tendermint/rpc/core"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
	grpccore "github.com/tendermint/tendermint/rpc/grpc"
	rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/state/txindex"
	"github.com/tendermint/tendermint/state/txindex/kv"
	"github.com/tendermint/tendermint/state/txindex/null"
	"github.com/tendermint/tendermint/types"
	tmtime "github.com/tendermint/tendermint/types/time"
	"github.com/tendermint/tendermint/version"
)

//------------------------------------------------------------------------------

// DBContext specifies config information for loading a new DB.
type DBContext struct {
	ID     string
	Config *cfg.Config
}

// DBProvider takes a DBContext and returns an instantiated DB.
type DBProvider func(*DBContext) (dbm.DB, error)

// DefaultDBProvider returns a database using the DBBackend and DBDir
// specified in the ctx.Config.
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
	dbType := dbm.DBBackendType(ctx.Config.DBBackend)
	return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()), nil
}

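/*
Illustrative sketch (not part of the original file): because DBProvider is a
plain function type, a caller can swap in any function with the same shape,
for example an in-memory store for tests. The example below is a standalone
program under stated assumptions: DB, memDB and MemDBProvider are hypothetical
stand-ins, and DBContext is reduced to its ID field so the sketch runs without
the tendermint packages.

```go
package main

import (
	"errors"
	"fmt"
)

// DB is a hypothetical, two-method stand-in for dbm.DB.
type DB interface {
	Get(key string) (string, bool)
	Set(key, value string)
}

// memDB is a toy in-memory DB used only for this sketch.
type memDB map[string]string

func (m memDB) Get(key string) (string, bool) { v, ok := m[key]; return v, ok }
func (m memDB) Set(key, value string)         { m[key] = value }

// DBContext mirrors the real struct, minus the Config field.
type DBContext struct {
	ID string
}

// DBProvider has the same shape as the function type in this file.
type DBProvider func(*DBContext) (DB, error)

// MemDBProvider returns a fresh in-memory DB and never touches disk.
func MemDBProvider(ctx *DBContext) (DB, error) {
	if ctx == nil || ctx.ID == "" {
		return nil, errors.New("db context needs an ID")
	}
	return memDB{}, nil
}

func main() {
	var provider DBProvider = MemDBProvider
	db, err := provider(&DBContext{ID: "blockstore"})
	if err != nil {
		panic(err)
	}
	db.Set("height", "1")
	v, _ := db.Get("height")
	fmt.Println(v)
}
```

Keeping the provider a function rather than an interface means a test can pass
a closure inline without declaring a new type.
*/
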
// GenesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
type GenesisDocProvider func() (*types.GenesisDoc, error)

// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
// the GenesisDoc from the config.GenesisFile() on the filesystem.
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
	return func() (*types.GenesisDoc, error) {
		return types.GenesisDocFromFile(config.GenesisFile())
	}
}

// NodeProvider takes a config and a logger and returns a ready-to-go Node.
type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)

// DefaultNewNode returns a Tendermint node with default settings for the
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
// It implements NodeProvider.
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
	// Load the node's p2p key, generating it if it does not exist yet.
	nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
	if err != nil {
		return nil, err
	}

	// Convert old PrivValidator if it exists.
	oldPrivVal := config.OldPrivValidatorFile()
	newPrivValKey := config.PrivValidatorKeyFile()
	newPrivValState := config.PrivValidatorStateFile()
	if _, err := os.Stat(oldPrivVal); !os.IsNotExist(err) {
		oldPV, err := privval.LoadOldFilePV(oldPrivVal)
		if err != nil {
			return nil, fmt.Errorf("Error reading OldPrivValidator from %v: %v", oldPrivVal, err)
		}
		logger.Info("Upgrading PrivValidator file",
			"old", oldPrivVal,
			"newKey", newPrivValKey,
			"newState", newPrivValState,
		)
		oldPV.Upgrade(newPrivValKey, newPrivValState)
	}

	return NewNode(config,
		privval.LoadOrGenFilePV(newPrivValKey, newPrivValState),
		nodeKey,
		proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
		DefaultGenesisDocProviderFunc(config),
		DefaultDBProvider,
		DefaultMetricsProvider(config.Instrumentation),
		logger,
	)
}

// MetricsProvider returns consensus, p2p, mempool and state Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)

// DefaultMetricsProvider returns Metrics built using the Prometheus client
// library if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
		if config.Prometheus {
			return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
		}
		return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
	}
}

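/*
Illustrative sketch (not part of the original file): DefaultMetricsProvider
picks between a real and a no-op implementation once, inside a closure, so
callers never need to check whether instrumentation is enabled. The standalone
program below shows the same pattern with hypothetical stand-ins (Counter,
memCounter, nopCounter, NewCounterProvider); none of these names exist in
tendermint or in the Prometheus client.

```go
package main

import "fmt"

// Counter is a hypothetical stand-in for a metrics handle.
type Counter interface {
	Add(delta float64)
}

// nopCounter discards every update, like the NopMetrics constructors.
type nopCounter struct{}

func (nopCounter) Add(float64) {}

// memCounter keeps a running total in memory and records its label.
type memCounter struct {
	label string
	total float64
}

func (c *memCounter) Add(delta float64) { c.total += delta }

// CounterProvider builds a Counter for a given chain ID.
type CounterProvider func(chainID string) Counter

// NewCounterProvider captures the enabled flag and namespace in a closure.
func NewCounterProvider(enabled bool, namespace string) CounterProvider {
	return func(chainID string) Counter {
		if enabled {
			return &memCounter{label: namespace + "/" + chainID}
		}
		return nopCounter{}
	}
}

func main() {
	on := NewCounterProvider(true, "tendermint")("test-chain")
	off := NewCounterProvider(false, "tendermint")("test-chain")
	on.Add(2)
	off.Add(2) // discarded by the no-op implementation
	fmt.Printf("%T %T\n", on, off)
}
```
*/
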
//------------------------------------------------------------------------------

// Node is the highest level interface to a full Tendermint node.
// It includes all configuration information and running services.
type Node struct {
	cmn.BaseService

	// config
	config        *cfg.Config
	genesisDoc    *types.GenesisDoc   // initial validator set
	privValidator types.PrivValidator // local node's validator key

	// network
	transport   *p2p.MultiplexTransport
	sw          *p2p.Switch  // p2p connections
	addrBook    pex.AddrBook // known peers
	nodeInfo    p2p.NodeInfo
	nodeKey     *p2p.NodeKey // our node privkey
	isListening bool

	// services
	eventBus         *types.EventBus // pub/sub for services
	stateDB          dbm.DB
	blockStore       *bc.BlockStore        // store the blockchain to disk
	bcReactor        *bc.BlockchainReactor // for fast-syncing
	mempoolReactor   *mempl.Reactor        // for gossipping transactions
	mempool          *mempl.CListMempool
	consensusState   *cs.ConsensusState     // latest consensus state
	consensusReactor *cs.ConsensusReactor   // for participating in the consensus
	evidencePool     *evidence.EvidencePool // tracking evidence
	proxyApp         proxy.AppConns         // connection to the application
	rpcListeners     []net.Listener         // rpc servers
	txIndexer        txindex.TxIndexer
	indexerService   *txindex.IndexerService
	prometheusSrv    *http.Server
}

// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
	privValidator types.PrivValidator,
	nodeKey *p2p.NodeKey,
	clientCreator proxy.ClientCreator,
	genesisDocProvider GenesisDocProvider,
	dbProvider DBProvider,
	metricsProvider MetricsProvider,
	logger log.Logger) (*Node, error) {

	// Get BlockStore
	blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
	if err != nil {
		return nil, err
	}
	blockStore := bc.NewBlockStore(blockStoreDB)

	// Get State
	stateDB, err := dbProvider(&DBContext{"state", config})
	if err != nil {
		return nil, err
	}

	// Get genesis doc
	// TODO: move to state package?
	genDoc, err := loadGenesisDoc(stateDB)
	if err != nil {
		genDoc, err = genesisDocProvider()
		if err != nil {
			return nil, err
		}
		// save genesis doc to prevent a certain class of user errors (e.g. when it
		// was changed, accidentally or not). Also good for audit trail.
		saveGenesisDoc(stateDB, genDoc)
	}
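
/*
Illustrative sketch (not part of the original file): the genesis handling above
is a "load from the DB, else ask the provider and persist" pattern, which is
why a genesis file edited after the first start is ignored. The standalone
program below reproduces that control flow with hypothetical stand-ins: store
replaces the state DB, plain strings replace *types.GenesisDoc, and loadDoc
and loadOrProvide are made-up names.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a hypothetical stand-in for the state DB.
type store map[string]string

var errNotFound = errors.New("genesis doc not found in store")

// loadDoc returns the persisted doc, or an error if none was saved yet.
func loadDoc(s store) (string, error) {
	doc, ok := s["genesisDoc"]
	if !ok {
		return "", errNotFound
	}
	return doc, nil
}

// loadOrProvide prefers the stored copy. Only when loading fails does it
// call provider, and it persists whatever the provider returned.
func loadOrProvide(s store, provider func() (string, error)) (string, error) {
	doc, err := loadDoc(s)
	if err == nil {
		return doc, nil
	}
	doc, err = provider()
	if err != nil {
		return "", err
	}
	s["genesisDoc"] = doc
	return doc, nil
}

func main() {
	s := store{}
	first, _ := loadOrProvide(s, func() (string, error) { return "chain-A", nil })
	// The source changed between starts, but the stored copy wins.
	second, _ := loadOrProvide(s, func() (string, error) { return "chain-B", nil })
	fmt.Println(first, second)
}
```
*/
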

	state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
	if err != nil {
		return nil, err
	}

	// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
	proxyApp := proxy.NewAppConns(clientCreator)
	proxyApp.SetLogger(logger.With("module", "proxy"))
	if err := proxyApp.Start(); err != nil {
		return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
	}

	// EventBus and IndexerService must be started before the handshake because
	// we might need to index the txs of the replayed block as this might not have happened
	// when the node stopped last time (i.e. the node stopped after it saved the block
	// but before it indexed the txs, or, endblocker panicked)
	eventBus := types.NewEventBus()
	eventBus.SetLogger(logger.With("module", "events"))

	err = eventBus.Start()
	if err != nil {
		return nil, err
	}

	// Transaction indexing
	var txIndexer txindex.TxIndexer
	switch config.TxIndex.Indexer {
	case "kv":
		store, err := dbProvider(&DBContext{"tx_index", config})
		if err != nil {
			return nil, err
		}
		if config.TxIndex.IndexTags != "" {
			txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
		} else if config.TxIndex.IndexAllTags {
			txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
		} else {
			txIndexer = kv.NewTxIndex(store)
		}
	default:
		txIndexer = &null.TxIndex{}
	}
  223. indexerService := txindex.NewIndexerService(txIndexer, eventBus)
  224. indexerService.SetLogger(logger.With("module", "txindex"))
  225. err = indexerService.Start()
  226. if err != nil {
  227. return nil, err
  228. }

	// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
	// and replays any blocks as necessary to sync tendermint with the app.
	consensusLogger := logger.With("module", "consensus")
	handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
	handshaker.SetLogger(consensusLogger)
	handshaker.SetEventBus(eventBus)
	if err := handshaker.Handshake(proxyApp); err != nil {
		return nil, fmt.Errorf("Error during handshake: %v", err)
	}

	// Reload the state. It will have the Version.Consensus.App set by the
	// Handshake, and may have other modifications as well (i.e. depending on
	// what happened during block replay).
	state = sm.LoadState(stateDB)

	// Log the version info.
	logger.Info("Version info",
		"software", version.TMCoreSemVer,
		"block", version.BlockProtocol,
		"p2p", version.P2PProtocol,
	)

	// If the state and software differ in block version, at least log it.
	if state.Version.Consensus.Block != version.BlockProtocol {
		logger.Info("Software and state have different block protocols",
			"software", version.BlockProtocol,
			"state", state.Version.Consensus.Block,
		)
	}

	if config.PrivValidatorListenAddr != "" {
		// If an address is provided, listen on the socket for a connection from an
		// external signing process.
		// FIXME: we should start services inside OnStart
		privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
		if err != nil {
			return nil, errors.Wrap(err, "Error with private validator socket client")
		}
	}

	// Decide whether to fast-sync or not
	// We don't fast-sync when the only validator is us.
	fastSync := config.FastSync
	if state.Validators.Size() == 1 {
		addr, _ := state.Validators.GetByIndex(0)
		privValAddr := privValidator.GetPubKey().Address()
		if bytes.Equal(privValAddr, addr) {
			fastSync = false
		}
	}

	pubKey := privValidator.GetPubKey()
	addr := pubKey.Address()
	// Log whether this node is a validator or an observer
	if state.Validators.HasAddress(addr) {
		consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
	} else {
		consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
	}

	csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

	// Make Mempool Reactor
	mempool := mempl.NewCListMempool(
		config.Mempool,
		proxyApp.Mempool(),
		state.LastBlockHeight,
		mempl.WithMetrics(memplMetrics),
		mempl.WithPreCheck(sm.TxPreCheck(state)),
		mempl.WithPostCheck(sm.TxPostCheck(state)),
	)
	mempoolLogger := logger.With("module", "mempool")
	if config.Mempool.WalEnabled() {
		mempool.InitWAL() // no need to have the mempool wal during tests
	}
	mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
	mempoolReactor.SetLogger(mempoolLogger)

	if config.Consensus.WaitForTxs() {
		mempool.EnableTxsAvailable()
	}

	// Make Evidence Reactor
	evidenceDB, err := dbProvider(&DBContext{"evidence", config})
	if err != nil {
		return nil, err
	}
	evidenceLogger := logger.With("module", "evidence")
	evidencePool := evidence.NewEvidencePool(stateDB, evidenceDB)
	evidencePool.SetLogger(evidenceLogger)
	evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
	evidenceReactor.SetLogger(evidenceLogger)

	blockExecLogger := logger.With("module", "state")
	// make block executor for consensus and blockchain reactors to execute blocks
	blockExec := sm.NewBlockExecutor(
		stateDB,
		blockExecLogger,
		proxyApp.Consensus(),
		mempool,
		evidencePool,
		sm.BlockExecutorWithMetrics(smMetrics),
	)

	// Make BlockchainReactor
	bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
	bcReactor.SetLogger(logger.With("module", "blockchain"))

	// Make ConsensusReactor
	consensusState := cs.NewConsensusState(
		config.Consensus,
		state.Copy(),
		blockExec,
		blockStore,
		mempool,
		evidencePool,
		cs.StateMetrics(csMetrics),
	)
	consensusState.SetLogger(consensusLogger)
	if privValidator != nil {
		consensusState.SetPrivValidator(privValidator)
	}
	consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
	consensusReactor.SetLogger(consensusLogger)

	// services which will be publishing and/or subscribing for messages (events)
	// consensusReactor will set it on consensusState and blockExecutor
	consensusReactor.SetEventBus(eventBus)

	p2pLogger := logger.With("module", "p2p")
	nodeInfo, err := makeNodeInfo(
		config,
		nodeKey.ID(),
		txIndexer,
		genDoc.ChainID,
		p2p.NewProtocolVersion(
			version.P2PProtocol, // global
			state.Version.Consensus.Block,
			state.Version.Consensus.App,
		),
	)
	if err != nil {
		return nil, err
	}

	// Setup Transport.
	var (
		mConnConfig = p2p.MConnConfig(config.P2P)
		transport   = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
		connFilters = []p2p.ConnFilterFunc{}
		peerFilters = []p2p.PeerFilterFunc{}
	)

	if !config.P2P.AllowDuplicateIP {
		connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
	}

	// Filter peers by addr or pubkey with an ABCI query.
	// If the query return code is OK, add peer.
	if config.FilterPeers {
		connFilters = append(
			connFilters,
			// ABCI query for address filtering.
			func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
				res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
				})
				if err != nil {
					return err
				}
				if res.IsErr() {
					return fmt.Errorf("Error querying abci app: %v", res)
				}

				return nil
			},
		)

		peerFilters = append(
			peerFilters,
			// ABCI query for ID filtering.
			func(_ p2p.IPeerSet, p p2p.Peer) error {
				res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
				})
				if err != nil {
					return err
				}
				if res.IsErr() {
					return fmt.Errorf("Error querying abci app: %v", res)
				}

				return nil
			},
		)
	}

	p2p.MultiplexTransportConnFilters(connFilters...)(transport)

	// Setup Switch.
	sw := p2p.NewSwitch(
		config.P2P,
		transport,
		p2p.WithMetrics(p2pMetrics),
		p2p.SwitchPeerFilters(peerFilters...),
	)
	sw.SetLogger(p2pLogger)
	sw.AddReactor("MEMPOOL", mempoolReactor)
	sw.AddReactor("BLOCKCHAIN", bcReactor)
	sw.AddReactor("CONSENSUS", consensusReactor)
	sw.AddReactor("EVIDENCE", evidenceReactor)
	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)

	p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())

	// Optionally, start the pex reactor
	//
	// TODO:
	//
	// We need to set Seeds and PersistentPeers on the switch,
	// since it needs to be able to use these (and their DNS names)
	// even if the PEX is off. We can include the DNS name in the NetAddress,
	// but it would still be nice to have a clear list of the current "PersistentPeers"
	// somewhere that we can return with net_info.
	//
	// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
	// Note we currently use the addrBook regardless at least for AddOurAddress
	addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)

	// Add ourselves to addrbook to prevent dialing ourselves
	addrBook.AddOurAddress(sw.NetAddress())

	addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
	if config.P2P.PexReactor {
		// TODO persistent peers ? so we can have their DNS addrs saved
		pexReactor := pex.NewPEXReactor(addrBook,
			&pex.PEXReactorConfig{
				Seeds:    splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
				SeedMode: config.P2P.SeedMode,
				// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
				// blocks assuming 10s blocks ~ 28 hours.
				// TODO (melekes): make it dynamic based on the actual block latencies
				// from the live network.
				// https://github.com/tendermint/tendermint/issues/3523
				SeedDisconnectWaitPeriod: 28 * time.Hour,
			})
		pexReactor.SetLogger(logger.With("module", "pex"))
		sw.AddReactor("PEX", pexReactor)
	}

	sw.SetAddrBook(addrBook)

	// run the profile server
	profileHost := config.ProfListenAddress
	if profileHost != "" {
		go func() {
			logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
		}()
	}

	node := &Node{
		config:           config,
		genesisDoc:       genDoc,
		privValidator:    privValidator,
		transport:        transport,
		sw:               sw,
		addrBook:         addrBook,
		nodeInfo:         nodeInfo,
		nodeKey:          nodeKey,
		stateDB:          stateDB,
		blockStore:       blockStore,
		bcReactor:        bcReactor,
		mempoolReactor:   mempoolReactor,
		mempool:          mempool,
		consensusState:   consensusState,
		consensusReactor: consensusReactor,
		evidencePool:     evidencePool,
		proxyApp:         proxyApp,
		txIndexer:        txIndexer,
		indexerService:   indexerService,
		eventBus:         eventBus,
	}
	node.BaseService = *cmn.NewBaseService(logger, "Node", node)
	return node, nil
}

// OnStart starts the Node. It implements cmn.Service.
func (n *Node) OnStart() error {
	now := tmtime.Now()
	genTime := n.genesisDoc.GenesisTime
	if genTime.After(now) {
		n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
		time.Sleep(genTime.Sub(now))
	}

	// Add private IDs to addrbook to block those peers being added
	n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))

	// Start the RPC server before the P2P server
	// so we can e.g. receive txs for the first block
	if n.config.RPC.ListenAddress != "" {
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	if n.config.Instrumentation.Prometheus &&
		n.config.Instrumentation.PrometheusListenAddr != "" {
		n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
	}

	// Start the transport.
	addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress)
	if err != nil {
		return err
	}
	if err := n.transport.Listen(*addr); err != nil {
		return err
	}

	n.isListening = true

	// Start the switch (the P2P server).
	err = n.sw.Start()
	if err != nil {
		return err
	}

	// Always connect to persistent peers
	if n.config.P2P.PersistentPeers != "" {
		err = n.sw.DialPeersAsync(n.addrBook, splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "), true)
		if err != nil {
			return err
		}
	}

	return nil
}

// OnStop stops the Node. It implements cmn.Service.
func (n *Node) OnStop() {
	n.BaseService.OnStop()

	n.Logger.Info("Stopping Node")

	// first stop the non-reactor services
	n.eventBus.Stop()
	n.indexerService.Stop()

	// now stop the reactors
	// TODO: gracefully disconnect from peers.
	n.sw.Stop()

	// stop mempool WAL
	if n.config.Mempool.WalEnabled() {
		n.mempool.CloseWAL()
	}

	if err := n.transport.Close(); err != nil {
		n.Logger.Error("Error closing transport", "err", err)
	}

	n.isListening = false

	// finally stop the listeners / external services
	for _, l := range n.rpcListeners {
		n.Logger.Info("Closing rpc listener", "listener", l)
		if err := l.Close(); err != nil {
			n.Logger.Error("Error closing listener", "listener", l, "err", err)
		}
	}

	if pvsc, ok := n.privValidator.(cmn.Service); ok {
		pvsc.Stop()
	}

	if n.prometheusSrv != nil {
		if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
			// Error from closing listeners, or context timeout:
			n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
		}
	}
}

// ConfigureRPC sets all variables in rpccore so that they serve
// RPC calls from this node.
func (n *Node) ConfigureRPC() {
	rpccore.SetStateDB(n.stateDB)
	rpccore.SetBlockStore(n.blockStore)
	rpccore.SetConsensusState(n.consensusState)
	rpccore.SetMempool(n.mempool)
	rpccore.SetEvidencePool(n.evidencePool)
	rpccore.SetP2PPeers(n.sw)
	rpccore.SetP2PTransport(n)
	pubKey := n.privValidator.GetPubKey()
	rpccore.SetPubKey(pubKey)
	rpccore.SetGenesisDoc(n.genesisDoc)
	rpccore.SetAddrBook(n.addrBook)
	rpccore.SetProxyAppQuery(n.proxyApp.Query())
	rpccore.SetTxIndexer(n.txIndexer)
	rpccore.SetConsensusReactor(n.consensusReactor)
	rpccore.SetEventBus(n.eventBus)
	rpccore.SetLogger(n.Logger.With("module", "rpc"))
	rpccore.SetConfig(*n.config.RPC)
}

func (n *Node) startRPC() ([]net.Listener, error) {
	n.ConfigureRPC()
	listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
	coreCodec := amino.NewCodec()
	ctypes.RegisterAmino(coreCodec)

	if n.config.RPC.Unsafe {
		rpccore.AddUnsafeRoutes()
	}

	// we may expose the rpc over both a unix and tcp socket
	listeners := make([]net.Listener, len(listenAddrs))
	for i, listenAddr := range listenAddrs {
		mux := http.NewServeMux()
		rpcLogger := n.Logger.With("module", "rpc-server")
		wmLogger := rpcLogger.With("protocol", "websocket")
		wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec,
			rpcserver.OnDisconnect(func(remoteAddr string) {
				err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
				if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
					wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
				}
			}))
		wm.SetLogger(wmLogger)
		mux.HandleFunc("/websocket", wm.WebsocketHandler)
		rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)

		config := rpcserver.DefaultConfig()
		config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
		// If necessary adjust global WriteTimeout to ensure it's greater than
		// TimeoutBroadcastTxCommit.
		// See https://github.com/tendermint/tendermint/issues/3435
		if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
			config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
		}

		listener, err := rpcserver.Listen(
			listenAddr,
			config,
		)
		if err != nil {
			return nil, err
		}

		var rootHandler http.Handler = mux
		if n.config.RPC.IsCorsEnabled() {
			corsMiddleware := cors.New(cors.Options{
				AllowedOrigins: n.config.RPC.CORSAllowedOrigins,
				AllowedMethods: n.config.RPC.CORSAllowedMethods,
				AllowedHeaders: n.config.RPC.CORSAllowedHeaders,
			})
			rootHandler = corsMiddleware.Handler(mux)
		}
		if n.config.RPC.IsTLSEnabled() {
			go rpcserver.StartHTTPAndTLSServer(
				listener,
				rootHandler,
				n.config.RPC.CertFile(),
				n.config.RPC.KeyFile(),
				rpcLogger,
				config,
			)
		} else {
			go rpcserver.StartHTTPServer(
				listener,
				rootHandler,
				rpcLogger,
				config,
			)
		}

		listeners[i] = listener
	}

	// we expose a simplified api over grpc for convenience to app devs
	grpcListenAddr := n.config.RPC.GRPCListenAddress
	if grpcListenAddr != "" {
		config := rpcserver.DefaultConfig()
		config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
		listener, err := rpcserver.Listen(grpcListenAddr, config)
		if err != nil {
			return nil, err
		}
		go grpccore.StartGRPCServer(listener)
		listeners = append(listeners, listener)
	}

	return listeners, nil
}

// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
// collectors on addr.
func (n *Node) startPrometheusServer(addr string) *http.Server {
	srv := &http.Server{
		Addr: addr,
		Handler: promhttp.InstrumentMetricHandler(
			prometheus.DefaultRegisterer, promhttp.HandlerFor(
				prometheus.DefaultGatherer,
				promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
			),
		),
	}
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			// Error starting or closing listener:
			n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
		}
	}()
	return srv
}

// Switch returns the Node's Switch.
func (n *Node) Switch() *p2p.Switch {
	return n.sw
}

// BlockStore returns the Node's BlockStore.
func (n *Node) BlockStore() *bc.BlockStore {
	return n.blockStore
}

// ConsensusState returns the Node's ConsensusState.
func (n *Node) ConsensusState() *cs.ConsensusState {
	return n.consensusState
}

// ConsensusReactor returns the Node's ConsensusReactor.
func (n *Node) ConsensusReactor() *cs.ConsensusReactor {
	return n.consensusReactor
}

// MempoolReactor returns the Node's mempool reactor.
func (n *Node) MempoolReactor() *mempl.Reactor {
	return n.mempoolReactor
}

// Mempool returns the Node's mempool.
func (n *Node) Mempool() *mempl.CListMempool {
	return n.mempool
}

// EvidencePool returns the Node's EvidencePool.
func (n *Node) EvidencePool() *evidence.EvidencePool {
	return n.evidencePool
}

// EventBus returns the Node's EventBus.
func (n *Node) EventBus() *types.EventBus {
	return n.eventBus
}

// PrivValidator returns the Node's PrivValidator.
// XXX: for convenience only!
func (n *Node) PrivValidator() types.PrivValidator {
	return n.privValidator
}

// GenesisDoc returns the Node's GenesisDoc.
func (n *Node) GenesisDoc() *types.GenesisDoc {
	return n.genesisDoc
}

// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
func (n *Node) ProxyApp() proxy.AppConns {
	return n.proxyApp
}

// Config returns the Node's config.
func (n *Node) Config() *cfg.Config {
	return n.config
}

//------------------------------------------------------------------------------

// Listeners returns a one-element description of the Node's P2P listener,
// built from the configured external address.
func (n *Node) Listeners() []string {
	return []string{
		fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
	}
}

// IsListening reports whether the Node's transport is currently listening.
func (n *Node) IsListening() bool {
	return n.isListening
}

// NodeInfo returns the Node's Info from the Switch.
func (n *Node) NodeInfo() p2p.NodeInfo {
	return n.nodeInfo
}

func makeNodeInfo(
	config *cfg.Config,
	nodeID p2p.ID,
	txIndexer txindex.TxIndexer,
	chainID string,
	protocolVersion p2p.ProtocolVersion,
) (p2p.NodeInfo, error) {
	txIndexerStatus := "on"
	if _, ok := txIndexer.(*null.TxIndex); ok {
		txIndexerStatus = "off"
	}
	nodeInfo := p2p.DefaultNodeInfo{
		ProtocolVersion: protocolVersion,
		ID_:             nodeID,
		Network:         chainID,
		Version:         version.TMCoreSemVer,
		Channels: []byte{
			bc.BlockchainChannel,
			cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
			mempl.MempoolChannel,
			evidence.EvidenceChannel,
		},
		Moniker: config.Moniker,
		Other: p2p.DefaultNodeInfoOther{
			TxIndex:    txIndexerStatus,
			RPCAddress: config.RPC.ListenAddress,
		},
	}

	if config.P2P.PexReactor {
		nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
	}

	lAddr := config.P2P.ExternalAddress

	if lAddr == "" {
		lAddr = config.P2P.ListenAddress
	}

	nodeInfo.ListenAddr = lAddr

	err := nodeInfo.Validate()
	return nodeInfo, err
}

//------------------------------------------------------------------------------

var (
	genesisDocKey = []byte("genesisDoc")
)

// loadGenesisDoc returns an error if no genesis doc is stored in db and
// panics if the stored bytes fail to unmarshal.
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
	// Named b rather than bytes to avoid shadowing the bytes package.
	b := db.Get(genesisDocKey)
	if len(b) == 0 {
		return nil, errors.New("Genesis doc not found")
	}
	var genDoc *types.GenesisDoc
	err := cdc.UnmarshalJSON(b, &genDoc)
	if err != nil {
		cmn.PanicCrisis(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b))
	}
	return genDoc, nil
}

// saveGenesisDoc panics if it fails to marshal the given genesis document.
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
	b, err := cdc.MarshalJSON(genDoc)
	if err != nil {
		cmn.PanicCrisis(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err))
	}
	db.SetSync(genesisDocKey, b)
}

func createAndStartPrivValidatorSocketClient(
	listenAddr string,
	logger log.Logger,
) (types.PrivValidator, error) {
	var listener net.Listener

	protocol, address := cmn.ProtocolAndAddress(listenAddr)
	ln, err := net.Listen(protocol, address)
	if err != nil {
		return nil, err
	}
	switch protocol {
	case "unix":
		listener = privval.NewUnixListener(ln)
	case "tcp":
		// TODO: persist this key so external signer
		// can actually authenticate us
		listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
	default:
		return nil, fmt.Errorf(
			"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
			protocol,
		)
	}

	pvsc := privval.NewSignerValidatorEndpoint(logger.With("module", "privval"), listener)
	if err := pvsc.Start(); err != nil {
		return nil, errors.Wrap(err, "failed to start private validator")
	}

	return pvsc, nil
}

// splitAndTrimEmpty splits s around each instance of sep (like strings.Split),
// trims all leading and trailing Unicode code points contained in cutset from
// each element, and returns only the non-empty results. If sep is empty, s is
// split after each UTF-8 sequence. An empty s yields an empty slice.
func splitAndTrimEmpty(s, sep, cutset string) []string {
	if s == "" {
		return []string{}
	}

	spl := strings.Split(s, sep)
	nonEmptyStrings := make([]string, 0, len(spl))
	for i := 0; i < len(spl); i++ {
		element := strings.Trim(spl[i], cutset)
		if element != "" {
			nonEmptyStrings = append(nonEmptyStrings, element)
		}
	}
	return nonEmptyStrings
}