diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 233d11887..6c25ef89e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -24,6 +24,8 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [fastsync/rpc] \#6620 Add TotalSyncedTime & RemainingTime to SyncInfo in /status RPC (@JayT106) - [rpc/grpc] \#6725 Mark gRPC in the RPC layer as deprecated. - [blockchain/v2] \#6730 Fast Sync v2 is deprecated, please use v0 + - [rpc] Add genesis_chunked method to support paginated and parallel fetching of large genesis documents. + - [rpc/jsonrpc/server] \#6785 `Listen` function updated to take an `int` argument, `maxOpenConnections`, instead of an entire config object. (@williambanfield) - [rpc] \#6820 Update RPC methods to reflect changes in the p2p layer, disabling support for `UnsafeDialPeers` and `UnsafeDialPeers` when used with the new p2p layer, and changing the response format of the peer list in `NetInfo` for all users. - [cli] \#6854 Remove deprecated snake case commands. (@tychoish) - Apps @@ -105,6 +107,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106) - [fastsync/event] \#6619 Emit fastsync status event when switching consensus/fastsync (@JayT106) - [statesync/event] \#6700 Emit statesync status start/end event (@JayT106) +- [inspect] \#6785 Add a new `inspect` command for introspecting the state and block store of a crashed tendermint node. (@williambanfield) ### IMPROVEMENTS diff --git a/cmd/tendermint/commands/inspect.go b/cmd/tendermint/commands/inspect.go new file mode 100644 index 000000000..de31d33d4 --- /dev/null +++ b/cmd/tendermint/commands/inspect.go @@ -0,0 +1,87 @@ +package commands + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer/sink" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" +) + +// InspectCmd is the command for starting an inspect server. +var InspectCmd = &cobra.Command{ + Use: "inspect", + Short: "Run an inspect server for investigating Tendermint state", + Long: ` + inspect runs a subset of Tendermint's RPC endpoints that are useful for debugging + issues with Tendermint. + + When the Tendermint consensus engine detects inconsistent state, it will crash the + tendermint process. Tendermint will not start up while in this inconsistent state. + The inspect command can be used to query the block and state store using Tendermint + RPC calls to debug issues of inconsistent state. + `, + + RunE: runInspect, +} + +func init() { + InspectCmd.Flags(). + String("rpc.laddr", + config.RPC.ListenAddress, "RPC listenener address. Port required") + InspectCmd.Flags(). + String("db-backend", + config.DBBackend, "database backend: goleveldb | cleveldb | boltdb | rocksdb | badgerdb") + InspectCmd.Flags(). + String("db-dir", config.DBPath, "database directory") +} + +func runInspect(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) + go func() { + <-c + cancel() + }() + + blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config}) + if err != nil { + return err + } + blockStore := store.NewBlockStore(blockStoreDB) + stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "state", Config: config}) + if err != nil { + if err := blockStoreDB.Close(); err != nil { + logger.Error("error closing block store db", "error", err) + } + return err + } + genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) + if err != nil { + return err + } + sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID) + if err != nil { + return err + } + stateStore := state.NewStore(stateDB) + + ins := inspect.New(config.RPC, blockStore, stateStore, sinks, logger) + + logger.Info("starting inspect server") + if err := ins.Run(ctx); err != nil { + return err + } + return nil +} diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index b092f5645..c006c297d 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -28,6 +28,7 @@ func main() { cmd.ShowNodeIDCmd, cmd.GenNodeKeyCmd, cmd.VersionCmd, + cmd.InspectCmd, cmd.MakeKeyMigrateCommand(), debug.DebugCmd, cli.NewCompletionCmd(rootCmd, true), diff --git a/docs/tools/debugging/README.md b/docs/tools/debugging/README.md index 2932f6e86..053b43624 100644 --- a/docs/tools/debugging/README.md +++ b/docs/tools/debugging/README.md @@ -62,3 +62,30 @@ given destination directory. Each archive will contain: Note: goroutine.out and heap.out will only be written if a profile address is provided and is operational. This command is blocking and will log any error. + +## Tendermint Inspect + +Tendermint includes an `inspect` command for querying Tendermint's state store and block +store over Tendermint RPC. + +When the Tendermint consensus engine detects inconsistent state, it will crash the +entire Tendermint process. +While in this inconsistent state, a node running Tendermint's consensus engine will not start up. +The `inspect` command runs only a subset of Tendermint's RPC endpoints for querying the block store +and state store. +`inspect` allows operators to query a read-only view of the stage. +`inspect` does not run the consensus engine at all and can therefore be used to debug +processes that have crashed due to inconsistent state. + + +To start the `inspect` process, run +```bash +tendermint inspect +``` + +### RPC endpoints +The list of available RPC endpoints can be found by making a request to the RPC port. +For an `inspect` process running on `127.0.0.1:26657`, navigate your browser to +`http://127.0.0.1:26657/` to retrieve the list of enabled RPC endpoints. + +Additional information on the Tendermint RPC endpoints can be found in the [rpc documentation](https://docs.tendermint.com/master/rpc). diff --git a/docs/tools/debugging/pro.md b/docs/tools/debugging/pro.md index 3342deb49..b43ed5cba 100644 --- a/docs/tools/debugging/pro.md +++ b/docs/tools/debugging/pro.md @@ -64,13 +64,42 @@ It won’t kill the node, but it will gather all of the above data and package i At this point, depending on how severe the degradation is, you may want to restart the process. +## Tendermint Inspect + +What if the Tendermint node will not start up due to inconsistent consensus state? + +When a node running the Tendermint consensus engine detects an inconsistent state +it will crash the entire Tendermint process. +The Tendermint consensus engine cannot be run in this inconsistent state and the so node +will fail to start up as a result. +The Tendermint RPC server can provide valuable information for debugging in this situation. +The Tendermint `inspect` command will run a subset of the Tendermint RPC server +that is useful for debugging inconsistent state. + +### Running inspect + +Start up the `inspect` tool on the machine where Tendermint crashed using: +```bash +tendermint inspect --home= +``` + +`inspect` will use the data directory specified in your Tendermint configuration file. +`inspect` will also run the RPC server at the address specified in your Tendermint configuration file. + +### Using inspect + +With the `inspect` server running, you can access RPC endpoints that are critically important +for debugging. +Calling the `/status`, `/consensus_state` and `/dump_consensus_state` RPC endpoint +will return useful information about the Tendermint consensus state. + ## Outro -We’re hoping that the `tendermint debug` subcommand will become de facto the first response to any accidents. +We’re hoping that these Tendermint tools will become de facto the first response for any accidents. -Let us know what your experience has been so far! Have you had a chance to try `tendermint debug` yet? +Let us know what your experience has been so far! Have you had a chance to try `tendermint debug` or `tendermint inspect` yet? -Join our chat, where we discuss the current issues and future improvements. +Join our [discord chat](https://discord.gg/vcExX9T), where we discuss the current issues and future improvements. — diff --git a/go.mod b/go.mod index 35272bea1..c7723b096 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/vektra/mockery/v2 v2.9.0 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect google.golang.org/grpc v1.40.0 gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect pgregory.net/rapid v0.4.7 diff --git a/go.sum b/go.sum index 74f189d5c..fff11dad3 100644 --- a/go.sum +++ b/go.sum @@ -1075,6 +1075,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/inspect/doc.go b/inspect/doc.go new file mode 100644 index 000000000..c53049e1a --- /dev/null +++ b/inspect/doc.go @@ -0,0 +1,36 @@ +/* +Package inspect provides a tool for investigating the state of a +failed Tendermint node. + +This package provides the Inspector type. The Inspector type runs a subset of the Tendermint +RPC endpoints that are useful for debugging issues with Tendermint consensus. + +When a node running the Tendermint consensus engine detects an inconsistent consensus state, +the entire node will crash. The Tendermint consensus engine cannot run in this +inconsistent state so the node will not be able to start up again. + +The RPC endpoints provided by the Inspector type allow for a node operator to inspect +the block store and state store to better understand what may have caused the inconsistent state. + + +The Inspector type's lifecycle is controlled by a context.Context + ins := inspect.NewFromConfig(rpcConfig) + ctx, cancelFunc:= context.WithCancel(context.Background()) + + // Run blocks until the Inspector server is shut down. + go ins.Run(ctx) + ... + + // calling the cancel function will stop the running inspect server + cancelFunc() + +Inspector serves its RPC endpoints on the address configured in the RPC configuration + + rpcConfig.ListenAddress = "tcp://127.0.0.1:26657" + ins := inspect.NewFromConfig(rpcConfig) + go ins.Run(ctx) + +The list of available RPC endpoints can then be viewed by navigating to +http://127.0.0.1:26657/ in the web browser. +*/ +package inspect diff --git a/inspect/inspect.go b/inspect/inspect.go new file mode 100644 index 000000000..38bc9ed5d --- /dev/null +++ b/inspect/inspect.go @@ -0,0 +1,149 @@ +package inspect + +import ( + "context" + "errors" + "fmt" + "net" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect/rpc" + "github.com/tendermint/tendermint/libs/log" + tmstrings "github.com/tendermint/tendermint/libs/strings" + rpccore "github.com/tendermint/tendermint/rpc/core" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/indexer/sink" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" + + "golang.org/x/sync/errgroup" +) + +// Inspector manages an RPC service that exports methods to debug a failed node. +// After a node shuts down due to a consensus failure, it will no longer start +// up its state cannot easily be inspected. An Inspector value provides a similar interface +// to the node, using the underlying Tendermint data stores, without bringing up +// any other components. A caller can query the Inspector service to inspect the +// persisted state and debug the failure. +type Inspector struct { + routes rpccore.RoutesMap + + config *config.RPCConfig + + indexerService *indexer.Service + eventBus *types.EventBus + logger log.Logger +} + +// New returns an Inspector that serves RPC on the specified BlockStore and StateStore. +// The Inspector type does not modify the state or block stores. +// The sinks are used to enable block and transaction querying via the RPC server. +// The caller is responsible for starting and stopping the Inspector service. +/// +//nolint:lll +func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexer.EventSink, logger log.Logger) *Inspector { + routes := rpc.Routes(*cfg, ss, bs, es, logger) + eb := types.NewEventBus() + eb.SetLogger(logger.With("module", "events")) + is := indexer.NewIndexerService(es, eb) + is.SetLogger(logger.With("module", "txindex")) + return &Inspector{ + routes: routes, + config: cfg, + logger: logger, + eventBus: eb, + indexerService: is, + } +} + +// NewFromConfig constructs an Inspector using the values defined in the passed in config. +func NewFromConfig(cfg *config.Config) (*Inspector, error) { + bsDB, err := config.DefaultDBProvider(&config.DBContext{ID: "blockstore", Config: cfg}) + if err != nil { + return nil, err + } + bs := store.NewBlockStore(bsDB) + sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "state", Config: cfg}) + if err != nil { + return nil, err + } + genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile()) + if err != nil { + return nil, err + } + sinks, err := sink.EventSinksFromConfig(cfg, config.DefaultDBProvider, genDoc.ChainID) + if err != nil { + return nil, err + } + logger := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false) + ss := state.NewStore(sDB) + return New(cfg.RPC, bs, ss, sinks, logger), nil +} + +// Run starts the Inspector servers and blocks until the servers shut down. The passed +// in context is used to control the lifecycle of the servers. +func (ins *Inspector) Run(ctx context.Context) error { + err := ins.eventBus.Start() + if err != nil { + return fmt.Errorf("error starting event bus: %s", err) + } + defer func() { + err := ins.eventBus.Stop() + if err != nil { + ins.logger.Error("event bus stopped with error", "err", err) + } + }() + err = ins.indexerService.Start() + if err != nil { + return fmt.Errorf("error starting indexer service: %s", err) + } + defer func() { + err := ins.indexerService.Stop() + if err != nil { + ins.logger.Error("indexer service stopped with error", "err", err) + } + }() + return startRPCServers(ctx, ins.config, ins.logger, ins.routes) +} + +func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error { + g, tctx := errgroup.WithContext(ctx) + listenAddrs := tmstrings.SplitAndTrimEmpty(cfg.ListenAddress, ",", " ") + rh := rpc.Handler(cfg, routes, logger) + for _, listenerAddr := range listenAddrs { + server := rpc.Server{ + Logger: logger, + Config: cfg, + Handler: rh, + Addr: listenerAddr, + } + if cfg.IsTLSEnabled() { + keyFile := cfg.KeyFile() + certFile := cfg.CertFile() + listenerAddr := listenerAddr + g.Go(func() error { + logger.Info("RPC HTTPS server starting", "address", listenerAddr, + "certfile", certFile, "keyfile", keyFile) + err := server.ListenAndServeTLS(tctx, certFile, keyFile) + if !errors.Is(err, net.ErrClosed) { + return err + } + logger.Info("RPC HTTPS server stopped", "address", listenerAddr) + return nil + }) + } else { + listenerAddr := listenerAddr + g.Go(func() error { + logger.Info("RPC HTTP server starting", "address", listenerAddr) + err := server.ListenAndServe(tctx) + if !errors.Is(err, net.ErrClosed) { + return err + } + logger.Info("RPC HTTP server stopped", "address", listenerAddr) + return nil + }) + } + } + return g.Wait() +} diff --git a/inspect/inspect_test.go b/inspect/inspect_test.go new file mode 100644 index 000000000..c2a1df571 --- /dev/null +++ b/inspect/inspect_test.go @@ -0,0 +1,583 @@ +package inspect_test + +import ( + "context" + "fmt" + "net" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + abcitypes "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/proto/tendermint/state" + httpclient "github.com/tendermint/tendermint/rpc/client/http" + "github.com/tendermint/tendermint/state/indexer" + indexermocks "github.com/tendermint/tendermint/state/indexer/mocks" + statemocks "github.com/tendermint/tendermint/state/mocks" + "github.com/tendermint/tendermint/types" +) + +func TestInspectConstructor(t *testing.T) { + cfg := config.ResetTestRoot("test") + t.Cleanup(leaktest.Check(t)) + defer func() { _ = os.RemoveAll(cfg.RootDir) }() + t.Run("from config", func(t *testing.T) { + d, err := inspect.NewFromConfig(cfg) + require.NoError(t, err) + require.NotNil(t, d) + }) + +} + +func TestInspectRun(t *testing.T) { + cfg := config.ResetTestRoot("test") + t.Cleanup(leaktest.Check(t)) + defer func() { _ = os.RemoveAll(cfg.RootDir) }() + t.Run("from config", func(t *testing.T) { + d, err := inspect.NewFromConfig(cfg) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + stoppedWG := &sync.WaitGroup{} + stoppedWG.Add(1) + go func() { + require.NoError(t, d.Run(ctx)) + stoppedWG.Done() + }() + cancel() + stoppedWG.Wait() + }) + +} + +func TestBlock(t *testing.T) { + testHeight := int64(1) + testBlock := new(types.Block) + testBlock.Header.Height = testHeight + testBlock.Header.LastCommitHash = []byte("test hash") + stateStoreMock := &statemocks.Store{} + + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}) + blockStoreMock.On("LoadBlock", testHeight).Return(testBlock) + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + resultBlock, err := cli.Block(context.Background(), &testHeight) + require.NoError(t, err) + require.Equal(t, testBlock.Height, resultBlock.Block.Height) + require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash) + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestTxSearch(t *testing.T) { + testHash := []byte("test") + testTx := []byte("tx") + testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash)) + testTxResult := &abcitypes.TxResult{ + Height: 1, + Index: 100, + Tx: testTx, + } + + stateStoreMock := &statemocks.Store{} + blockStoreMock := &statemocks.BlockStore{} + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + eventSinkMock.On("Type").Return(indexer.KV) + eventSinkMock.On("SearchTxEvents", mock.Anything, + mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). + Return([]*abcitypes.TxResult{testTxResult}, nil) + + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + + var page = 1 + resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "") + require.NoError(t, err) + require.Len(t, resultTxSearch.Txs, 1) + require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx) + + cancel() + wg.Wait() + + eventSinkMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) + blockStoreMock.AssertExpectations(t) +} +func TestTx(t *testing.T) { + testHash := []byte("test") + testTx := []byte("tx") + + stateStoreMock := &statemocks.Store{} + blockStoreMock := &statemocks.BlockStore{} + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + eventSinkMock.On("Type").Return(indexer.KV) + eventSinkMock.On("GetTxByHash", testHash).Return(&abcitypes.TxResult{ + Tx: testTx, + }, nil) + + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + + res, err := cli.Tx(context.Background(), testHash, false) + require.NoError(t, err) + require.Equal(t, types.Tx(testTx), res.Tx) + + cancel() + wg.Wait() + + eventSinkMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) + blockStoreMock.AssertExpectations(t) +} +func TestConsensusParams(t *testing.T) { + testHeight := int64(1) + testMaxGas := int64(55) + stateStoreMock := &statemocks.Store{} + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + stateStoreMock.On("LoadConsensusParams", testHeight).Return(types.ConsensusParams{ + Block: types.BlockParams{ + MaxGas: testMaxGas, + }, + }, nil) + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + params, err := cli.ConsensusParams(context.Background(), &testHeight) + require.NoError(t, err) + require.Equal(t, params.ConsensusParams.Block.MaxGas, testMaxGas) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockResults(t *testing.T) { + testHeight := int64(1) + testGasUsed := int64(100) + stateStoreMock := &statemocks.Store{} + // tmstate "github.com/tendermint/tendermint/proto/tendermint/state" + stateStoreMock.On("LoadABCIResponses", testHeight).Return(&state.ABCIResponses{ + DeliverTxs: []*abcitypes.ResponseDeliverTx{ + { + GasUsed: testGasUsed, + }, + }, + EndBlock: &abcitypes.ResponseEndBlock{}, + BeginBlock: &abcitypes.ResponseBeginBlock{}, + }, nil) + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("Height").Return(testHeight) + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + res, err := cli.BlockResults(context.Background(), &testHeight) + require.NoError(t, err) + require.Equal(t, res.TotalGasUsed, testGasUsed) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestCommit(t *testing.T) { + testHeight := int64(1) + testRound := int32(101) + stateStoreMock := &statemocks.Store{} + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}, nil) + blockStoreMock.On("LoadSeenCommit").Return(&types.Commit{ + Height: testHeight, + Round: testRound, + }, nil) + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + res, err := cli.Commit(context.Background(), &testHeight) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, res.SignedHeader.Commit.Round, testRound) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockByHash(t *testing.T) { + testHeight := int64(1) + testHash := []byte("test hash") + testBlock := new(types.Block) + testBlock.Header.Height = testHeight + testBlock.Header.LastCommitHash = testHash + stateStoreMock := &statemocks.Store{} + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ + BlockID: types.BlockID{ + Hash: testHash, + }, + Header: types.Header{ + Height: testHeight, + }, + }, nil) + blockStoreMock.On("LoadBlockByHash", testHash).Return(testBlock, nil) + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + res, err := cli.BlockByHash(context.Background(), testHash) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, []byte(res.BlockID.Hash), testHash) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockchain(t *testing.T) { + testHeight := int64(1) + testBlock := new(types.Block) + testBlockHash := []byte("test hash") + testBlock.Header.Height = testHeight + testBlock.Header.LastCommitHash = testBlockHash + stateStoreMock := &statemocks.Store{} + + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ + BlockID: types.BlockID{ + Hash: testBlockHash, + }, + }) + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + res, err := cli.BlockchainInfo(context.Background(), 0, 100) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, testBlockHash, []byte(res.BlockMetas[0].BlockID.Hash)) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestValidators(t *testing.T) { + testHeight := int64(1) + testVotingPower := int64(100) + testValidators := types.ValidatorSet{ + Validators: []*types.Validator{ + { + VotingPower: testVotingPower, + }, + }, + } + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("LoadValidators", testHeight).Return(&testValidators, nil) + + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + + testPage := 1 + testPerPage := 100 + res, err := cli.Validators(context.Background(), &testHeight, &testPage, &testPerPage) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, testVotingPower, res.Validators[0].VotingPower) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockSearch(t *testing.T) { + testHeight := int64(1) + testBlockHash := []byte("test hash") + testQuery := "block.height = 1" + stateStoreMock := &statemocks.Store{} + + blockStoreMock := &statemocks.BlockStore{} + eventSinkMock := &indexermocks.EventSink{} + eventSinkMock.On("Stop").Return(nil) + eventSinkMock.On("Type").Return(indexer.KV) + blockStoreMock.On("LoadBlock", testHeight).Return(&types.Block{ + Header: types.Header{ + Height: testHeight, + }, + }, nil) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ + BlockID: types.BlockID{ + Hash: testBlockHash, + }, + }) + eventSinkMock.On("SearchBlockEvents", mock.Anything, + mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). + Return([]int64{testHeight}, nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress) + require.NoError(t, err) + + testPage := 1 + testPerPage := 100 + testOrderBy := "desc" + res, err := cli.BlockSearch(context.Background(), testQuery, &testPage, &testPerPage, testOrderBy) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, testBlockHash, []byte(res.Blocks[0].BlockID.Hash)) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func requireConnect(t testing.TB, addr string, retries int) { + parts := strings.SplitN(addr, "://", 2) + if len(parts) != 2 { + t.Fatalf("malformed address to dial: %s", addr) + } + var err error + for i := 0; i < retries; i++ { + var conn net.Conn + conn, err = net.Dial(parts[0], parts[1]) + if err == nil { + conn.Close() + return + } + // FIXME attempt to yield and let the other goroutine continue execution. + time.Sleep(time.Microsecond * 100) + } + t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err) +} diff --git a/inspect/rpc/rpc.go b/inspect/rpc/rpc.go new file mode 100644 index 000000000..76dcda4eb --- /dev/null +++ b/inspect/rpc/rpc.go @@ -0,0 +1,143 @@ +package rpc + +import ( + "context" + "net/http" + "time" + + "github.com/rs/cors" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/internal/consensus" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/rpc/core" + "github.com/tendermint/tendermint/rpc/jsonrpc/server" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +// Server defines parameters for running an Inspector rpc server. +type Server struct { + Addr string // TCP address to listen on, ":http" if empty + Handler http.Handler + Logger log.Logger + Config *config.RPCConfig +} + +// Routes returns the set of routes used by the Inspector server. +// +//nolint: lll +func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, es []indexer.EventSink, logger log.Logger) core.RoutesMap { + env := &core.Environment{ + Config: cfg, + EventSinks: es, + StateStore: s, + BlockStore: bs, + ConsensusReactor: waitSyncCheckerImpl{}, + Logger: logger, + } + return core.RoutesMap{ + "blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true), + "consensus_params": server.NewRPCFunc(env.ConsensusParams, "height", true), + "block": server.NewRPCFunc(env.Block, "height", true), + "block_by_hash": server.NewRPCFunc(env.BlockByHash, "hash", true), + "block_results": server.NewRPCFunc(env.BlockResults, "height", true), + "commit": server.NewRPCFunc(env.Commit, "height", true), + "validators": server.NewRPCFunc(env.Validators, "height,page,per_page", true), + "tx": server.NewRPCFunc(env.Tx, "hash,prove", true), + "tx_search": server.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false), + "block_search": server.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false), + } +} + +// Handler returns the http.Handler configured for use with an Inspector server. Handler +// registers the routes on the http.Handler and also registers the websocket handler +// and the CORS handler if specified by the configuration options. +func Handler(rpcConfig *config.RPCConfig, routes core.RoutesMap, logger log.Logger) http.Handler { + mux := http.NewServeMux() + wmLogger := logger.With("protocol", "websocket") + + var eventBus types.EventBusSubscriber + + websocketDisconnectFn := func(remoteAddr string) { + err := eventBus.UnsubscribeAll(context.Background(), remoteAddr) + if err != nil && err != pubsub.ErrSubscriptionNotFound { + wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) + } + } + wm := server.NewWebsocketManager(routes, + server.OnDisconnect(websocketDisconnectFn), + server.ReadLimit(rpcConfig.MaxBodyBytes)) + wm.SetLogger(wmLogger) + mux.HandleFunc("/websocket", wm.WebsocketHandler) + + server.RegisterRPCFuncs(mux, routes, logger) + var rootHandler http.Handler = mux + if rpcConfig.IsCorsEnabled() { + rootHandler = addCORSHandler(rpcConfig, mux) + } + return rootHandler +} + +func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler { + corsMiddleware := cors.New(cors.Options{ + AllowedOrigins: rpcConfig.CORSAllowedOrigins, + AllowedMethods: rpcConfig.CORSAllowedMethods, + AllowedHeaders: rpcConfig.CORSAllowedHeaders, + }) + h = corsMiddleware.Handler(h) + return h +} + +type waitSyncCheckerImpl struct{} + +func (waitSyncCheckerImpl) WaitSync() bool { + return false +} + +func (waitSyncCheckerImpl) GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool) { + return nil, false +} + +// ListenAndServe listens on the address specified in srv.Addr and handles any +// incoming requests over HTTP using the Inspector rpc handler specified on the server. +func (srv *Server) ListenAndServe(ctx context.Context) error { + listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections) + if err != nil { + return err + } + go func() { + <-ctx.Done() + listener.Close() + }() + return server.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config)) +} + +// ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles +// incoming requests over HTTPS using the Inspector rpc handler specified on the server. +func (srv *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error { + listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections) + if err != nil { + return err + } + go func() { + <-ctx.Done() + listener.Close() + }() + return server.ServeTLS(listener, srv.Handler, certFile, keyFile, srv.Logger, serverRPCConfig(srv.Config)) +} + +func serverRPCConfig(r *config.RPCConfig) *server.Config { + cfg := server.DefaultConfig() + cfg.MaxBodyBytes = r.MaxBodyBytes + cfg.MaxHeaderBytes = r.MaxHeaderBytes + // If necessary adjust global WriteTimeout to ensure it's greater than + // TimeoutBroadcastTxCommit. + // See https://github.com/tendermint/tendermint/issues/3435 + if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit { + cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second + } + return cfg +} diff --git a/light/proxy/proxy.go b/light/proxy/proxy.go index 8f1e7bf87..6f2622588 100644 --- a/light/proxy/proxy.go +++ b/light/proxy/proxy.go @@ -113,7 +113,7 @@ func (p *Proxy) listen() (net.Listener, *http.ServeMux, error) { } // 4) Start listening for new connections. - listener, err := rpcserver.Listen(p.Addr, p.Config) + listener, err := rpcserver.Listen(p.Addr, p.Config.MaxOpenConnections) if err != nil { return nil, mux, err } diff --git a/node/node.go b/node/node.go index a31362b49..9cb5315bc 100644 --- a/node/node.go +++ b/node/node.go @@ -441,22 +441,23 @@ func makeNode(config *cfg.Config, ProxyAppQuery: proxyApp.Query(), ProxyAppMempool: proxyApp.Mempool(), - StateStore: stateStore, - BlockStore: blockStore, - EvidencePool: evPool, - ConsensusState: csState, + StateStore: stateStore, + BlockStore: blockStore, + EvidencePool: evPool, + ConsensusState: csState, + + ConsensusReactor: csReactor, BlockSyncReactor: bcReactor.(cs.BlockSyncReactor), P2PPeers: sw, PeerManager: peerManager, - GenDoc: genDoc, - EventSinks: eventSinks, - ConsensusReactor: csReactor, - EventBus: eventBus, - Mempool: mp, - Logger: logger.With("module", "rpc"), - Config: *config.RPC, + GenDoc: genDoc, + EventSinks: eventSinks, + EventBus: eventBus, + Mempool: mp, + Logger: logger.With("module", "rpc"), + Config: *config.RPC, }, } @@ -825,7 +826,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) { rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger) listener, err := rpcserver.Listen( listenAddr, - config, + config.MaxOpenConnections, ) if err != nil { return nil, err @@ -883,7 +884,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) { if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second } - listener, err := rpcserver.Listen(grpcListenAddr, config) + listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections) if err != nil { return nil, err } diff --git a/node/setup.go b/node/setup.go index e9bfb029a..6d2a7523b 100644 --- a/node/setup.go +++ b/node/setup.go @@ -8,7 +8,6 @@ import ( "math" "net" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port - "strings" "time" dbm "github.com/tendermint/tm-db" @@ -33,9 +32,7 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - kv "github.com/tendermint/tendermint/state/indexer/sink/kv" - null "github.com/tendermint/tendermint/state/indexer/sink/null" - psql "github.com/tendermint/tendermint/state/indexer/sink/psql" + "github.com/tendermint/tendermint/state/indexer/sink" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -78,56 +75,9 @@ func createAndStartIndexerService( logger log.Logger, chainID string, ) (*indexer.Service, []indexer.EventSink, error) { - - eventSinks := []indexer.EventSink{} - - // check for duplicated sinks - sinks := map[string]bool{} - for _, s := range config.TxIndex.Indexer { - sl := strings.ToLower(s) - if sinks[sl] { - return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") - } - - sinks[sl] = true - } - -loop: - for k := range sinks { - switch k { - case string(indexer.NULL): - // When we see null in the config, the eventsinks will be reset with the - // nullEventSink. - eventSinks = []indexer.EventSink{null.NewEventSink()} - break loop - - case string(indexer.KV): - store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config}) - if err != nil { - return nil, nil, err - } - - eventSinks = append(eventSinks, kv.NewEventSink(store)) - - case string(indexer.PSQL): - conn := config.TxIndex.PsqlConn - if conn == "" { - return nil, nil, errors.New("the psql connection settings cannot be empty") - } - - es, _, err := psql.NewEventSink(conn, chainID) - if err != nil { - return nil, nil, err - } - eventSinks = append(eventSinks, es) - - default: - return nil, nil, errors.New("unsupported event sink type") - } - } - - if len(eventSinks) == 0 { - eventSinks = []indexer.EventSink{null.NewEventSink()} + eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID) + if err != nil { + return nil, nil, err } indexerService := indexer.NewIndexerService(eventSinks, eventBus) diff --git a/rpc/core/env.go b/rpc/core/env.go index c35b8229c..7069bc4d4 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -58,6 +58,11 @@ type peers interface { Peers() p2p.IPeerSet } +type consensusReactor interface { + WaitSync() bool + GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool) +} + type peerManager interface { Peers() []types.NodeID Addresses(types.NodeID) []p2p.NodeAddress @@ -72,11 +77,12 @@ type Environment struct { ProxyAppMempool proxy.AppConnMempool // interfaces defined in types and above - StateStore sm.Store - BlockStore sm.BlockStore - EvidencePool sm.EvidencePool - ConsensusState consensusState - P2PPeers peers + StateStore sm.Store + BlockStore sm.BlockStore + EvidencePool sm.EvidencePool + ConsensusState consensusState + ConsensusReactor consensusReactor + P2PPeers peers // Legacy p2p stack P2PTransport transport @@ -88,7 +94,6 @@ type Environment struct { PubKey crypto.PubKey GenDoc *types.GenesisDoc // cache the genesis structure EventSinks []indexer.EventSink - ConsensusReactor *consensus.Reactor EventBus *types.EventBus // thread safe Mempool mempl.Mempool BlockSyncReactor consensus.BlockSyncReactor diff --git a/rpc/jsonrpc/jsonrpc_test.go b/rpc/jsonrpc/jsonrpc_test.go index 6e0c03f00..b5e422280 100644 --- a/rpc/jsonrpc/jsonrpc_test.go +++ b/rpc/jsonrpc/jsonrpc_test.go @@ -110,7 +110,7 @@ func setup() { wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) config := server.DefaultConfig() - listener1, err := server.Listen(tcpAddr, config) + listener1, err := server.Listen(tcpAddr, config.MaxOpenConnections) if err != nil { panic(err) } @@ -126,7 +126,7 @@ func setup() { wm = server.NewWebsocketManager(Routes) wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - listener2, err := server.Listen(unixAddr, config) + listener2, err := server.Listen(unixAddr, config.MaxOpenConnections) if err != nil { panic(err) } diff --git a/rpc/jsonrpc/server/http_server.go b/rpc/jsonrpc/server/http_server.go index c21c71c49..549671241 100644 --- a/rpc/jsonrpc/server/http_server.go +++ b/rpc/jsonrpc/server/http_server.go @@ -261,7 +261,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Listen starts a new net.Listener on the given address. // It returns an error if the address is invalid or the call to Listen() fails. -func Listen(addr string, config *Config) (listener net.Listener, err error) { +func Listen(addr string, maxOpenConnections int) (listener net.Listener, err error) { parts := strings.SplitN(addr, "://", 2) if len(parts) != 2 { return nil, fmt.Errorf( @@ -274,8 +274,8 @@ func Listen(addr string, config *Config) (listener net.Listener, err error) { if err != nil { return nil, fmt.Errorf("failed to listen on %v: %v", addr, err) } - if config.MaxOpenConnections > 0 { - listener = netutil.LimitListener(listener, config.MaxOpenConnections) + if maxOpenConnections > 0 { + listener = netutil.LimitListener(listener, maxOpenConnections) } return listener, nil diff --git a/rpc/jsonrpc/server/http_server_test.go b/rpc/jsonrpc/server/http_server_test.go index e7c517cde..823719e41 100644 --- a/rpc/jsonrpc/server/http_server_test.go +++ b/rpc/jsonrpc/server/http_server_test.go @@ -39,8 +39,7 @@ func TestMaxOpenConnections(t *testing.T) { fmt.Fprint(w, "some body") }) config := DefaultConfig() - config.MaxOpenConnections = max - l, err := Listen("tcp://127.0.0.1:0", config) + l, err := Listen("tcp://127.0.0.1:0", max) require.NoError(t, err) defer l.Close() go Serve(l, mux, log.TestingLogger(), config) //nolint:errcheck // ignore for tests diff --git a/rpc/jsonrpc/test/main.go b/rpc/jsonrpc/test/main.go index 1c949571f..d348e1639 100644 --- a/rpc/jsonrpc/test/main.go +++ b/rpc/jsonrpc/test/main.go @@ -33,7 +33,7 @@ func main() { rpcserver.RegisterRPCFuncs(mux, routes, logger) config := rpcserver.DefaultConfig() - listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config) + listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config.MaxOpenConnections) if err != nil { tmos.Exit(err.Error()) } diff --git a/state/indexer/indexer_service.go b/state/indexer/indexer_service.go index a429b66a0..39a1847f8 100644 --- a/state/indexer/indexer_service.go +++ b/state/indexer/indexer_service.go @@ -51,43 +51,47 @@ func (is *Service) OnStart() error { go func() { for { - msg := <-blockHeadersSub.Out() - - eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) - height := eventDataHeader.Header.Height - batch := NewBatch(eventDataHeader.NumTxs) - - for i := int64(0); i < eventDataHeader.NumTxs; i++ { - msg2 := <-txsSub.Out() - txResult := msg2.Data().(types.EventDataTx).TxResult - - if err = batch.Add(&txResult); err != nil { - is.Logger.Error( - "failed to add tx to batch", - "height", height, - "index", txResult.Index, - "err", err, - ) + select { + case <-blockHeadersSub.Canceled(): + return + case msg := <-blockHeadersSub.Out(): + + eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) + height := eventDataHeader.Header.Height + batch := NewBatch(eventDataHeader.NumTxs) + + for i := int64(0); i < eventDataHeader.NumTxs; i++ { + msg2 := <-txsSub.Out() + txResult := msg2.Data().(types.EventDataTx).TxResult + + if err = batch.Add(&txResult); err != nil { + is.Logger.Error( + "failed to add tx to batch", + "height", height, + "index", txResult.Index, + "err", err, + ) + } } - } - - if !IndexingEnabled(is.eventSinks) { - continue - } - for _, sink := range is.eventSinks { - if err := sink.IndexBlockEvents(eventDataHeader); err != nil { - is.Logger.Error("failed to index block", "height", height, "err", err) - } else { - is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) + if !IndexingEnabled(is.eventSinks) { + continue } - if len(batch.Ops) > 0 { - err := sink.IndexTxEvents(batch.Ops) - if err != nil { - is.Logger.Error("failed to index block txs", "height", height, "err", err) + for _, sink := range is.eventSinks { + if err := sink.IndexBlockEvents(eventDataHeader); err != nil { + is.Logger.Error("failed to index block", "height", height, "err", err) } else { - is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) + is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) + } + + if len(batch.Ops) > 0 { + err := sink.IndexTxEvents(batch.Ops) + if err != nil { + is.Logger.Error("failed to index block txs", "height", height, "err", err) + } else { + is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) + } } } } diff --git a/state/indexer/sink/sink.go b/state/indexer/sink/sink.go new file mode 100644 index 000000000..1e50721c6 --- /dev/null +++ b/state/indexer/sink/sink.go @@ -0,0 +1,65 @@ +package sink + +import ( + "errors" + "strings" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/indexer/sink/kv" + "github.com/tendermint/tendermint/state/indexer/sink/null" + "github.com/tendermint/tendermint/state/indexer/sink/psql" +) + +// EventSinksFromConfig constructs a slice of indexer.EventSink using the provided +// configuration. +// +//nolint:lll +func EventSinksFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) ([]indexer.EventSink, error) { + if len(cfg.TxIndex.Indexer) == 0 { + return []indexer.EventSink{null.NewEventSink()}, nil + } + + // check for duplicated sinks + sinks := map[string]struct{}{} + for _, s := range cfg.TxIndex.Indexer { + sl := strings.ToLower(s) + if _, ok := sinks[sl]; ok { + return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") + } + sinks[sl] = struct{}{} + } + eventSinks := []indexer.EventSink{} + for k := range sinks { + switch indexer.EventSinkType(k) { + case indexer.NULL: + // When we see null in the config, the eventsinks will be reset with the + // nullEventSink. + return []indexer.EventSink{null.NewEventSink()}, nil + + case indexer.KV: + store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg}) + if err != nil { + return nil, err + } + + eventSinks = append(eventSinks, kv.NewEventSink(store)) + + case indexer.PSQL: + conn := cfg.TxIndex.PsqlConn + if conn == "" { + return nil, errors.New("the psql connection settings cannot be empty") + } + + es, _, err := psql.NewEventSink(conn, chainID) + if err != nil { + return nil, err + } + eventSinks = append(eventSinks, es) + default: + return nil, errors.New("unsupported event sink type") + } + } + return eventSinks, nil + +}