EDIT: Updated, see [comment below]( https://github.com/tendermint/tendermint/pull/6785#issuecomment-897793175) This change adds a sketch of the `Debug` mode. This change adds a `Debug` struct to the node package. This `Debug` struct is intended to be created and started by a command in the `cmd` directory. The `Debug` struct runs the RPC server on the data directories: both the state store and the block store. This change required a good deal of refactoring. Namely, a new `rpc.go` file was added to the `node` package. This file encapsulates functions for starting RPC servers used by nodes. A potential additional change is to further factor this code into shared code _in_ the `rpc` package. Minor API tweaks were also made that seemed appropriate such as the mechanism for fetching routes from the `rpc/core` package. Additional work is required to register the `Debug` service as a command in the `cmd` directory but I am looking for feedback on if this direction seems appropriate before diving much further. closes: #5908pull/6860/head
@ -0,0 +1,87 @@ | |||
package commands | |||
import ( | |||
"context" | |||
"os" | |||
"os/signal" | |||
"syscall" | |||
"github.com/spf13/cobra" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/inspect" | |||
"github.com/tendermint/tendermint/state" | |||
"github.com/tendermint/tendermint/state/indexer/sink" | |||
"github.com/tendermint/tendermint/store" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// InspectCmd is the command for starting an inspect server. | |||
var InspectCmd = &cobra.Command{ | |||
Use: "inspect", | |||
Short: "Run an inspect server for investigating Tendermint state", | |||
Long: ` | |||
inspect runs a subset of Tendermint's RPC endpoints that are useful for debugging | |||
issues with Tendermint. | |||
When the Tendermint consensus engine detects inconsistent state, it will crash the | |||
tendermint process. Tendermint will not start up while in this inconsistent state. | |||
The inspect command can be used to query the block and state store using Tendermint | |||
RPC calls to debug issues of inconsistent state. | |||
`, | |||
RunE: runInspect, | |||
} | |||
func init() { | |||
InspectCmd.Flags(). | |||
String("rpc.laddr", | |||
config.RPC.ListenAddress, "RPC listenener address. Port required") | |||
InspectCmd.Flags(). | |||
String("db-backend", | |||
config.DBBackend, "database backend: goleveldb | cleveldb | boltdb | rocksdb | badgerdb") | |||
InspectCmd.Flags(). | |||
String("db-dir", config.DBPath, "database directory") | |||
} | |||
func runInspect(cmd *cobra.Command, args []string) error { | |||
ctx, cancel := context.WithCancel(cmd.Context()) | |||
defer cancel() | |||
c := make(chan os.Signal, 1) | |||
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) | |||
go func() { | |||
<-c | |||
cancel() | |||
}() | |||
blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config}) | |||
if err != nil { | |||
return err | |||
} | |||
blockStore := store.NewBlockStore(blockStoreDB) | |||
stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "state", Config: config}) | |||
if err != nil { | |||
if err := blockStoreDB.Close(); err != nil { | |||
logger.Error("error closing block store db", "error", err) | |||
} | |||
return err | |||
} | |||
genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) | |||
if err != nil { | |||
return err | |||
} | |||
sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID) | |||
if err != nil { | |||
return err | |||
} | |||
stateStore := state.NewStore(stateDB) | |||
ins := inspect.New(config.RPC, blockStore, stateStore, sinks, logger) | |||
logger.Info("starting inspect server") | |||
if err := ins.Run(ctx); err != nil { | |||
return err | |||
} | |||
return nil | |||
} |
@ -0,0 +1,36 @@ | |||
/* | |||
Package inspect provides a tool for investigating the state of a | |||
failed Tendermint node. | |||
This package provides the Inspector type. The Inspector type runs a subset of the Tendermint | |||
RPC endpoints that are useful for debugging issues with Tendermint consensus. | |||
When a node running the Tendermint consensus engine detects an inconsistent consensus state, | |||
the entire node will crash. The Tendermint consensus engine cannot run in this | |||
inconsistent state so the node will not be able to start up again. | |||
The RPC endpoints provided by the Inspector type allow for a node operator to inspect | |||
the block store and state store to better understand what may have caused the inconsistent state. | |||
The Inspector type's lifecycle is controlled by a context.Context | |||
ins := inspect.NewFromConfig(rpcConfig) | |||
ctx, cancelFunc:= context.WithCancel(context.Background()) | |||
// Run blocks until the Inspector server is shut down. | |||
go ins.Run(ctx) | |||
... | |||
// calling the cancel function will stop the running inspect server | |||
cancelFunc() | |||
Inspector serves its RPC endpoints on the address configured in the RPC configuration | |||
rpcConfig.ListenAddress = "tcp://127.0.0.1:26657" | |||
ins := inspect.NewFromConfig(rpcConfig) | |||
go ins.Run(ctx) | |||
The list of available RPC endpoints can then be viewed by navigating to | |||
http://127.0.0.1:26657/ in the web browser. | |||
*/ | |||
package inspect |
@ -0,0 +1,149 @@ | |||
package inspect | |||
import ( | |||
"context" | |||
"errors" | |||
"fmt" | |||
"net" | |||
"github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/inspect/rpc" | |||
"github.com/tendermint/tendermint/libs/log" | |||
tmstrings "github.com/tendermint/tendermint/libs/strings" | |||
rpccore "github.com/tendermint/tendermint/rpc/core" | |||
"github.com/tendermint/tendermint/state" | |||
"github.com/tendermint/tendermint/state/indexer" | |||
"github.com/tendermint/tendermint/state/indexer/sink" | |||
"github.com/tendermint/tendermint/store" | |||
"github.com/tendermint/tendermint/types" | |||
"golang.org/x/sync/errgroup" | |||
) | |||
// Inspector manages an RPC service that exports methods to debug a failed node. | |||
// After a node shuts down due to a consensus failure, it will no longer start | |||
// up its state cannot easily be inspected. An Inspector value provides a similar interface | |||
// to the node, using the underlying Tendermint data stores, without bringing up | |||
// any other components. A caller can query the Inspector service to inspect the | |||
// persisted state and debug the failure. | |||
type Inspector struct { | |||
routes rpccore.RoutesMap | |||
config *config.RPCConfig | |||
indexerService *indexer.Service | |||
eventBus *types.EventBus | |||
logger log.Logger | |||
} | |||
// New returns an Inspector that serves RPC on the specified BlockStore and StateStore. | |||
// The Inspector type does not modify the state or block stores. | |||
// The sinks are used to enable block and transaction querying via the RPC server. | |||
// The caller is responsible for starting and stopping the Inspector service. | |||
/// | |||
//nolint:lll | |||
func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexer.EventSink, logger log.Logger) *Inspector { | |||
routes := rpc.Routes(*cfg, ss, bs, es, logger) | |||
eb := types.NewEventBus() | |||
eb.SetLogger(logger.With("module", "events")) | |||
is := indexer.NewIndexerService(es, eb) | |||
is.SetLogger(logger.With("module", "txindex")) | |||
return &Inspector{ | |||
routes: routes, | |||
config: cfg, | |||
logger: logger, | |||
eventBus: eb, | |||
indexerService: is, | |||
} | |||
} | |||
// NewFromConfig constructs an Inspector using the values defined in the passed in config. | |||
func NewFromConfig(cfg *config.Config) (*Inspector, error) { | |||
bsDB, err := config.DefaultDBProvider(&config.DBContext{ID: "blockstore", Config: cfg}) | |||
if err != nil { | |||
return nil, err | |||
} | |||
bs := store.NewBlockStore(bsDB) | |||
sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "state", Config: cfg}) | |||
if err != nil { | |||
return nil, err | |||
} | |||
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile()) | |||
if err != nil { | |||
return nil, err | |||
} | |||
sinks, err := sink.EventSinksFromConfig(cfg, config.DefaultDBProvider, genDoc.ChainID) | |||
if err != nil { | |||
return nil, err | |||
} | |||
logger := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false) | |||
ss := state.NewStore(sDB) | |||
return New(cfg.RPC, bs, ss, sinks, logger), nil | |||
} | |||
// Run starts the Inspector servers and blocks until the servers shut down. The passed | |||
// in context is used to control the lifecycle of the servers. | |||
func (ins *Inspector) Run(ctx context.Context) error { | |||
err := ins.eventBus.Start() | |||
if err != nil { | |||
return fmt.Errorf("error starting event bus: %s", err) | |||
} | |||
defer func() { | |||
err := ins.eventBus.Stop() | |||
if err != nil { | |||
ins.logger.Error("event bus stopped with error", "err", err) | |||
} | |||
}() | |||
err = ins.indexerService.Start() | |||
if err != nil { | |||
return fmt.Errorf("error starting indexer service: %s", err) | |||
} | |||
defer func() { | |||
err := ins.indexerService.Stop() | |||
if err != nil { | |||
ins.logger.Error("indexer service stopped with error", "err", err) | |||
} | |||
}() | |||
return startRPCServers(ctx, ins.config, ins.logger, ins.routes) | |||
} | |||
func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error { | |||
g, tctx := errgroup.WithContext(ctx) | |||
listenAddrs := tmstrings.SplitAndTrimEmpty(cfg.ListenAddress, ",", " ") | |||
rh := rpc.Handler(cfg, routes, logger) | |||
for _, listenerAddr := range listenAddrs { | |||
server := rpc.Server{ | |||
Logger: logger, | |||
Config: cfg, | |||
Handler: rh, | |||
Addr: listenerAddr, | |||
} | |||
if cfg.IsTLSEnabled() { | |||
keyFile := cfg.KeyFile() | |||
certFile := cfg.CertFile() | |||
listenerAddr := listenerAddr | |||
g.Go(func() error { | |||
logger.Info("RPC HTTPS server starting", "address", listenerAddr, | |||
"certfile", certFile, "keyfile", keyFile) | |||
err := server.ListenAndServeTLS(tctx, certFile, keyFile) | |||
if !errors.Is(err, net.ErrClosed) { | |||
return err | |||
} | |||
logger.Info("RPC HTTPS server stopped", "address", listenerAddr) | |||
return nil | |||
}) | |||
} else { | |||
listenerAddr := listenerAddr | |||
g.Go(func() error { | |||
logger.Info("RPC HTTP server starting", "address", listenerAddr) | |||
err := server.ListenAndServe(tctx) | |||
if !errors.Is(err, net.ErrClosed) { | |||
return err | |||
} | |||
logger.Info("RPC HTTP server stopped", "address", listenerAddr) | |||
return nil | |||
}) | |||
} | |||
} | |||
return g.Wait() | |||
} |
@ -0,0 +1,583 @@ | |||
package inspect_test | |||
import ( | |||
"context" | |||
"fmt" | |||
"net" | |||
"os" | |||
"strings" | |||
"sync" | |||
"testing" | |||
"time" | |||
"github.com/fortytw2/leaktest" | |||
"github.com/stretchr/testify/mock" | |||
"github.com/stretchr/testify/require" | |||
abcitypes "github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/inspect" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/pubsub/query" | |||
"github.com/tendermint/tendermint/proto/tendermint/state" | |||
httpclient "github.com/tendermint/tendermint/rpc/client/http" | |||
"github.com/tendermint/tendermint/state/indexer" | |||
indexermocks "github.com/tendermint/tendermint/state/indexer/mocks" | |||
statemocks "github.com/tendermint/tendermint/state/mocks" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
func TestInspectConstructor(t *testing.T) { | |||
cfg := config.ResetTestRoot("test") | |||
t.Cleanup(leaktest.Check(t)) | |||
defer func() { _ = os.RemoveAll(cfg.RootDir) }() | |||
t.Run("from config", func(t *testing.T) { | |||
d, err := inspect.NewFromConfig(cfg) | |||
require.NoError(t, err) | |||
require.NotNil(t, d) | |||
}) | |||
} | |||
func TestInspectRun(t *testing.T) { | |||
cfg := config.ResetTestRoot("test") | |||
t.Cleanup(leaktest.Check(t)) | |||
defer func() { _ = os.RemoveAll(cfg.RootDir) }() | |||
t.Run("from config", func(t *testing.T) { | |||
d, err := inspect.NewFromConfig(cfg) | |||
require.NoError(t, err) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
stoppedWG := &sync.WaitGroup{} | |||
stoppedWG.Add(1) | |||
go func() { | |||
require.NoError(t, d.Run(ctx)) | |||
stoppedWG.Done() | |||
}() | |||
cancel() | |||
stoppedWG.Wait() | |||
}) | |||
} | |||
func TestBlock(t *testing.T) { | |||
testHeight := int64(1) | |||
testBlock := new(types.Block) | |||
testBlock.Header.Height = testHeight | |||
testBlock.Header.LastCommitHash = []byte("test hash") | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
blockStoreMock.On("Height").Return(testHeight) | |||
blockStoreMock.On("Base").Return(int64(0)) | |||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}) | |||
blockStoreMock.On("LoadBlock", testHeight).Return(testBlock) | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
resultBlock, err := cli.Block(context.Background(), &testHeight) | |||
require.NoError(t, err) | |||
require.Equal(t, testBlock.Height, resultBlock.Block.Height) | |||
require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func TestTxSearch(t *testing.T) { | |||
testHash := []byte("test") | |||
testTx := []byte("tx") | |||
testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash)) | |||
testTxResult := &abcitypes.TxResult{ | |||
Height: 1, | |||
Index: 100, | |||
Tx: testTx, | |||
} | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
eventSinkMock.On("Type").Return(indexer.KV) | |||
eventSinkMock.On("SearchTxEvents", mock.Anything, | |||
mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). | |||
Return([]*abcitypes.TxResult{testTxResult}, nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
var page = 1 | |||
resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "") | |||
require.NoError(t, err) | |||
require.Len(t, resultTxSearch.Txs, 1) | |||
require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx) | |||
cancel() | |||
wg.Wait() | |||
eventSinkMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
blockStoreMock.AssertExpectations(t) | |||
} | |||
func TestTx(t *testing.T) { | |||
testHash := []byte("test") | |||
testTx := []byte("tx") | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
eventSinkMock.On("Type").Return(indexer.KV) | |||
eventSinkMock.On("GetTxByHash", testHash).Return(&abcitypes.TxResult{ | |||
Tx: testTx, | |||
}, nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
res, err := cli.Tx(context.Background(), testHash, false) | |||
require.NoError(t, err) | |||
require.Equal(t, types.Tx(testTx), res.Tx) | |||
cancel() | |||
wg.Wait() | |||
eventSinkMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
blockStoreMock.AssertExpectations(t) | |||
} | |||
func TestConsensusParams(t *testing.T) { | |||
testHeight := int64(1) | |||
testMaxGas := int64(55) | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
blockStoreMock.On("Height").Return(testHeight) | |||
blockStoreMock.On("Base").Return(int64(0)) | |||
stateStoreMock.On("LoadConsensusParams", testHeight).Return(types.ConsensusParams{ | |||
Block: types.BlockParams{ | |||
MaxGas: testMaxGas, | |||
}, | |||
}, nil) | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
params, err := cli.ConsensusParams(context.Background(), &testHeight) | |||
require.NoError(t, err) | |||
require.Equal(t, params.ConsensusParams.Block.MaxGas, testMaxGas) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func TestBlockResults(t *testing.T) { | |||
testHeight := int64(1) | |||
testGasUsed := int64(100) | |||
stateStoreMock := &statemocks.Store{} | |||
// tmstate "github.com/tendermint/tendermint/proto/tendermint/state" | |||
stateStoreMock.On("LoadABCIResponses", testHeight).Return(&state.ABCIResponses{ | |||
DeliverTxs: []*abcitypes.ResponseDeliverTx{ | |||
{ | |||
GasUsed: testGasUsed, | |||
}, | |||
}, | |||
EndBlock: &abcitypes.ResponseEndBlock{}, | |||
BeginBlock: &abcitypes.ResponseBeginBlock{}, | |||
}, nil) | |||
blockStoreMock := &statemocks.BlockStore{} | |||
blockStoreMock.On("Base").Return(int64(0)) | |||
blockStoreMock.On("Height").Return(testHeight) | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
res, err := cli.BlockResults(context.Background(), &testHeight) | |||
require.NoError(t, err) | |||
require.Equal(t, res.TotalGasUsed, testGasUsed) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func TestCommit(t *testing.T) { | |||
testHeight := int64(1) | |||
testRound := int32(101) | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
blockStoreMock.On("Base").Return(int64(0)) | |||
blockStoreMock.On("Height").Return(testHeight) | |||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}, nil) | |||
blockStoreMock.On("LoadSeenCommit").Return(&types.Commit{ | |||
Height: testHeight, | |||
Round: testRound, | |||
}, nil) | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
res, err := cli.Commit(context.Background(), &testHeight) | |||
require.NoError(t, err) | |||
require.NotNil(t, res) | |||
require.Equal(t, res.SignedHeader.Commit.Round, testRound) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func TestBlockByHash(t *testing.T) { | |||
testHeight := int64(1) | |||
testHash := []byte("test hash") | |||
testBlock := new(types.Block) | |||
testBlock.Header.Height = testHeight | |||
testBlock.Header.LastCommitHash = testHash | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ | |||
BlockID: types.BlockID{ | |||
Hash: testHash, | |||
}, | |||
Header: types.Header{ | |||
Height: testHeight, | |||
}, | |||
}, nil) | |||
blockStoreMock.On("LoadBlockByHash", testHash).Return(testBlock, nil) | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
res, err := cli.BlockByHash(context.Background(), testHash) | |||
require.NoError(t, err) | |||
require.NotNil(t, res) | |||
require.Equal(t, []byte(res.BlockID.Hash), testHash) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func TestBlockchain(t *testing.T) { | |||
testHeight := int64(1) | |||
testBlock := new(types.Block) | |||
testBlockHash := []byte("test hash") | |||
testBlock.Header.Height = testHeight | |||
testBlock.Header.LastCommitHash = testBlockHash | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
blockStoreMock.On("Height").Return(testHeight) | |||
blockStoreMock.On("Base").Return(int64(0)) | |||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ | |||
BlockID: types.BlockID{ | |||
Hash: testBlockHash, | |||
}, | |||
}) | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
res, err := cli.BlockchainInfo(context.Background(), 0, 100) | |||
require.NoError(t, err) | |||
require.NotNil(t, res) | |||
require.Equal(t, testBlockHash, []byte(res.BlockMetas[0].BlockID.Hash)) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func TestValidators(t *testing.T) { | |||
testHeight := int64(1) | |||
testVotingPower := int64(100) | |||
testValidators := types.ValidatorSet{ | |||
Validators: []*types.Validator{ | |||
{ | |||
VotingPower: testVotingPower, | |||
}, | |||
}, | |||
} | |||
stateStoreMock := &statemocks.Store{} | |||
stateStoreMock.On("LoadValidators", testHeight).Return(&testValidators, nil) | |||
blockStoreMock := &statemocks.BlockStore{} | |||
blockStoreMock.On("Height").Return(testHeight) | |||
blockStoreMock.On("Base").Return(int64(0)) | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
testPage := 1 | |||
testPerPage := 100 | |||
res, err := cli.Validators(context.Background(), &testHeight, &testPage, &testPerPage) | |||
require.NoError(t, err) | |||
require.NotNil(t, res) | |||
require.Equal(t, testVotingPower, res.Validators[0].VotingPower) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func TestBlockSearch(t *testing.T) { | |||
testHeight := int64(1) | |||
testBlockHash := []byte("test hash") | |||
testQuery := "block.height = 1" | |||
stateStoreMock := &statemocks.Store{} | |||
blockStoreMock := &statemocks.BlockStore{} | |||
eventSinkMock := &indexermocks.EventSink{} | |||
eventSinkMock.On("Stop").Return(nil) | |||
eventSinkMock.On("Type").Return(indexer.KV) | |||
blockStoreMock.On("LoadBlock", testHeight).Return(&types.Block{ | |||
Header: types.Header{ | |||
Height: testHeight, | |||
}, | |||
}, nil) | |||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ | |||
BlockID: types.BlockID{ | |||
Hash: testBlockHash, | |||
}, | |||
}) | |||
eventSinkMock.On("SearchBlockEvents", mock.Anything, | |||
mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). | |||
Return([]int64{testHeight}, nil) | |||
rpcConfig := config.TestRPCConfig() | |||
l := log.TestingLogger() | |||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
wg := &sync.WaitGroup{} | |||
wg.Add(1) | |||
startedWG := &sync.WaitGroup{} | |||
startedWG.Add(1) | |||
go func() { | |||
startedWG.Done() | |||
defer wg.Done() | |||
require.NoError(t, d.Run(ctx)) | |||
}() | |||
// FIXME: used to induce context switch. | |||
// Determine more deterministic method for prompting a context switch | |||
startedWG.Wait() | |||
requireConnect(t, rpcConfig.ListenAddress, 20) | |||
cli, err := httpclient.New(rpcConfig.ListenAddress) | |||
require.NoError(t, err) | |||
testPage := 1 | |||
testPerPage := 100 | |||
testOrderBy := "desc" | |||
res, err := cli.BlockSearch(context.Background(), testQuery, &testPage, &testPerPage, testOrderBy) | |||
require.NoError(t, err) | |||
require.NotNil(t, res) | |||
require.Equal(t, testBlockHash, []byte(res.Blocks[0].BlockID.Hash)) | |||
cancel() | |||
wg.Wait() | |||
blockStoreMock.AssertExpectations(t) | |||
stateStoreMock.AssertExpectations(t) | |||
} | |||
func requireConnect(t testing.TB, addr string, retries int) { | |||
parts := strings.SplitN(addr, "://", 2) | |||
if len(parts) != 2 { | |||
t.Fatalf("malformed address to dial: %s", addr) | |||
} | |||
var err error | |||
for i := 0; i < retries; i++ { | |||
var conn net.Conn | |||
conn, err = net.Dial(parts[0], parts[1]) | |||
if err == nil { | |||
conn.Close() | |||
return | |||
} | |||
// FIXME attempt to yield and let the other goroutine continue execution. | |||
time.Sleep(time.Microsecond * 100) | |||
} | |||
t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err) | |||
} |
@ -0,0 +1,143 @@ | |||
package rpc | |||
import ( | |||
"context" | |||
"net/http" | |||
"time" | |||
"github.com/rs/cors" | |||
"github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/internal/consensus" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/pubsub" | |||
"github.com/tendermint/tendermint/rpc/core" | |||
"github.com/tendermint/tendermint/rpc/jsonrpc/server" | |||
"github.com/tendermint/tendermint/state" | |||
"github.com/tendermint/tendermint/state/indexer" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// Server defines parameters for running an Inspector rpc server. | |||
type Server struct { | |||
Addr string // TCP address to listen on, ":http" if empty | |||
Handler http.Handler | |||
Logger log.Logger | |||
Config *config.RPCConfig | |||
} | |||
// Routes returns the set of routes used by the Inspector server. | |||
// | |||
//nolint: lll | |||
func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, es []indexer.EventSink, logger log.Logger) core.RoutesMap { | |||
env := &core.Environment{ | |||
Config: cfg, | |||
EventSinks: es, | |||
StateStore: s, | |||
BlockStore: bs, | |||
ConsensusReactor: waitSyncCheckerImpl{}, | |||
Logger: logger, | |||
} | |||
return core.RoutesMap{ | |||
"blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true), | |||
"consensus_params": server.NewRPCFunc(env.ConsensusParams, "height", true), | |||
"block": server.NewRPCFunc(env.Block, "height", true), | |||
"block_by_hash": server.NewRPCFunc(env.BlockByHash, "hash", true), | |||
"block_results": server.NewRPCFunc(env.BlockResults, "height", true), | |||
"commit": server.NewRPCFunc(env.Commit, "height", true), | |||
"validators": server.NewRPCFunc(env.Validators, "height,page,per_page", true), | |||
"tx": server.NewRPCFunc(env.Tx, "hash,prove", true), | |||
"tx_search": server.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false), | |||
"block_search": server.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false), | |||
} | |||
} | |||
// Handler returns the http.Handler configured for use with an Inspector server. Handler | |||
// registers the routes on the http.Handler and also registers the websocket handler | |||
// and the CORS handler if specified by the configuration options. | |||
func Handler(rpcConfig *config.RPCConfig, routes core.RoutesMap, logger log.Logger) http.Handler { | |||
mux := http.NewServeMux() | |||
wmLogger := logger.With("protocol", "websocket") | |||
var eventBus types.EventBusSubscriber | |||
websocketDisconnectFn := func(remoteAddr string) { | |||
err := eventBus.UnsubscribeAll(context.Background(), remoteAddr) | |||
if err != nil && err != pubsub.ErrSubscriptionNotFound { | |||
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) | |||
} | |||
} | |||
wm := server.NewWebsocketManager(routes, | |||
server.OnDisconnect(websocketDisconnectFn), | |||
server.ReadLimit(rpcConfig.MaxBodyBytes)) | |||
wm.SetLogger(wmLogger) | |||
mux.HandleFunc("/websocket", wm.WebsocketHandler) | |||
server.RegisterRPCFuncs(mux, routes, logger) | |||
var rootHandler http.Handler = mux | |||
if rpcConfig.IsCorsEnabled() { | |||
rootHandler = addCORSHandler(rpcConfig, mux) | |||
} | |||
return rootHandler | |||
} | |||
func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler { | |||
corsMiddleware := cors.New(cors.Options{ | |||
AllowedOrigins: rpcConfig.CORSAllowedOrigins, | |||
AllowedMethods: rpcConfig.CORSAllowedMethods, | |||
AllowedHeaders: rpcConfig.CORSAllowedHeaders, | |||
}) | |||
h = corsMiddleware.Handler(h) | |||
return h | |||
} | |||
type waitSyncCheckerImpl struct{} | |||
func (waitSyncCheckerImpl) WaitSync() bool { | |||
return false | |||
} | |||
func (waitSyncCheckerImpl) GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool) { | |||
return nil, false | |||
} | |||
// ListenAndServe listens on the address specified in srv.Addr and handles any | |||
// incoming requests over HTTP using the Inspector rpc handler specified on the server. | |||
func (srv *Server) ListenAndServe(ctx context.Context) error { | |||
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections) | |||
if err != nil { | |||
return err | |||
} | |||
go func() { | |||
<-ctx.Done() | |||
listener.Close() | |||
}() | |||
return server.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config)) | |||
} | |||
// ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles | |||
// incoming requests over HTTPS using the Inspector rpc handler specified on the server. | |||
func (srv *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error { | |||
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections) | |||
if err != nil { | |||
return err | |||
} | |||
go func() { | |||
<-ctx.Done() | |||
listener.Close() | |||
}() | |||
return server.ServeTLS(listener, srv.Handler, certFile, keyFile, srv.Logger, serverRPCConfig(srv.Config)) | |||
} | |||
func serverRPCConfig(r *config.RPCConfig) *server.Config { | |||
cfg := server.DefaultConfig() | |||
cfg.MaxBodyBytes = r.MaxBodyBytes | |||
cfg.MaxHeaderBytes = r.MaxHeaderBytes | |||
// If necessary adjust global WriteTimeout to ensure it's greater than | |||
// TimeoutBroadcastTxCommit. | |||
// See https://github.com/tendermint/tendermint/issues/3435 | |||
if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit { | |||
cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second | |||
} | |||
return cfg | |||
} |
@ -0,0 +1,65 @@ | |||
package sink | |||
import ( | |||
"errors" | |||
"strings" | |||
"github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/state/indexer" | |||
"github.com/tendermint/tendermint/state/indexer/sink/kv" | |||
"github.com/tendermint/tendermint/state/indexer/sink/null" | |||
"github.com/tendermint/tendermint/state/indexer/sink/psql" | |||
) | |||
// EventSinksFromConfig constructs a slice of indexer.EventSink using the provided | |||
// configuration. | |||
// | |||
//nolint:lll | |||
func EventSinksFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) ([]indexer.EventSink, error) { | |||
if len(cfg.TxIndex.Indexer) == 0 { | |||
return []indexer.EventSink{null.NewEventSink()}, nil | |||
} | |||
// check for duplicated sinks | |||
sinks := map[string]struct{}{} | |||
for _, s := range cfg.TxIndex.Indexer { | |||
sl := strings.ToLower(s) | |||
if _, ok := sinks[sl]; ok { | |||
return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") | |||
} | |||
sinks[sl] = struct{}{} | |||
} | |||
eventSinks := []indexer.EventSink{} | |||
for k := range sinks { | |||
switch indexer.EventSinkType(k) { | |||
case indexer.NULL: | |||
// When we see null in the config, the eventsinks will be reset with the | |||
// nullEventSink. | |||
return []indexer.EventSink{null.NewEventSink()}, nil | |||
case indexer.KV: | |||
store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg}) | |||
if err != nil { | |||
return nil, err | |||
} | |||
eventSinks = append(eventSinks, kv.NewEventSink(store)) | |||
case indexer.PSQL: | |||
conn := cfg.TxIndex.PsqlConn | |||
if conn == "" { | |||
return nil, errors.New("the psql connection settings cannot be empty") | |||
} | |||
es, _, err := psql.NewEventSink(conn, chainID) | |||
if err != nil { | |||
return nil, err | |||
} | |||
eventSinks = append(eventSinks, es) | |||
default: | |||
return nil, errors.New("unsupported event sink type") | |||
} | |||
} | |||
return eventSinks, nil | |||
} |