Browse Source

more cleanup

pull/8067/head
tycho garen 3 years ago
parent
commit
fa1520dd90
10 changed files with 73 additions and 130 deletions
  1. +0
    -32
      abci/client/creators.go
  2. +3
    -0
      internal/consensus/replay.go
  3. +6
    -2
      internal/consensus/replay_file.go
  4. +2
    -13
      internal/consensus/replay_stubs.go
  5. +12
    -11
      internal/consensus/replay_test.go
  6. +2
    -2
      internal/consensus/wal_generator.go
  7. +16
    -29
      internal/proxy/app_conn.go
  8. +13
    -8
      internal/proxy/client.go
  9. +18
    -32
      internal/proxy/multi_app_conn.go
  10. +1
    -1
      node/node.go

+ 0
- 32
abci/client/creators.go View File

@ -1,33 +1 @@
package abciclient
import (
"fmt"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
)
// Creator creates new ABCI clients.
type Creator func(log.Logger) (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
return func(logger log.Logger) (Client, error) {
return NewLocalClient(logger, app), nil
}
}
// NewRemoteCreator returns a Creator for the given address (e.g.
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
// want the client to connect before reporting success.
func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator {
return func(log.Logger) (Client, error) {
remoteApp, err := NewClient(logger, addr, transport, mustConnect)
if err != nil {
return nil, fmt.Errorf("failed to connect to proxy: %w", err)
}
return remoteApp, nil
}
}

+ 3
- 0
internal/consensus/replay.go View File

@ -427,6 +427,9 @@ func (h *Handshaker) ReplayBlocks(
if err != nil {
return nil, err
}
if err := mockApp.Start(ctx); err != nil {
return nil, err
}
h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp)


+ 6
- 2
internal/consensus/replay_file.go View File

@ -327,8 +327,12 @@ func newConsensusStateForReplay(
}
// Create proxyAppConn connection (consensus, mempool, query)
clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
if err != nil {
return nil, err
}
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx)
if err != nil {
return nil, fmt.Errorf("starting proxy app conns: %w", err)


+ 2
- 13
internal/consensus/replay_stubs.go View File

@ -62,21 +62,10 @@ func newMockProxyApp(
appHash []byte,
abciResponses *tmstate.ABCIResponses,
) (abciclient.Client, error) {
clientCreator := abciclient.NewLocalCreator(&mockProxyApp{
return proxy.New(abciclient.NewLocalClient(logger, &mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, err := clientCreator(logger)
if err != nil {
return nil, err
}
if err = cli.Start(ctx); err != nil {
return nil, err
}
return proxy.New(clientCreator, logger, proxy.NopMetrics()), nil
}), logger, proxy.NopMetrics()), nil
}
type mockProxyApp struct {


+ 12
- 11
internal/consensus/replay_test.go View File

@ -804,7 +804,7 @@ func testHandshakeReplay(
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
clientCreator2 := abciclient.NewLocalCreator(kvstoreApp)
clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
if nBlocks > 0 {
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
@ -831,7 +831,8 @@ func testHandshakeReplay(
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.True(t, proxyApp.IsRunning())
require.NotNil(t, proxyApp)
t.Cleanup(func() { cancel(); proxyApp.Wait() })
err = handshaker.Handshake(ctx, proxyApp)
@ -958,9 +959,9 @@ func buildTMStateFromChain(
kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
defer kvstoreApp.Close()
clientCreator := abciclient.NewLocalCreator(kvstoreApp)
client := abciclient.NewLocalClient(logger, kvstoreApp)
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx))
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
@ -1031,8 +1032,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - 0x03
{
app := &badApp{numBlocks: 3, allHashesAreWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
client := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); proxyApp.Wait() })
@ -1051,8 +1052,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - RANDOM HASH
{
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
client := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); proxyApp.Wait() })
@ -1282,12 +1283,13 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
votePower := 10 + int64(rand.Uint32())
val, _, err := factory.Validator(ctx, votePower)
require.NoError(t, err)
vals := types.NewValidatorSet([]*types.Validator{val})
app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
clientCreator := abciclient.NewLocalCreator(app)
client := abciclient.NewLocalClient(logger, app)
cfg, err := ResetConfig(t.TempDir(), "handshake_test_")
require.NoError(t, err)
@ -1306,9 +1308,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
logger := log.TestingLogger()
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake")


+ 2
- 2
internal/consensus/wal_generator.go View File

@ -67,8 +67,8 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
}
blockStore := store.NewBlockStore(blockStoreDB)
proxyApp := proxy.New(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
proxyLogger := logger.With("module", "proxy")
proxyApp := proxy.New(abciclient.NewLocalClient(logger, app), proxyLogger, proxy.NopMetrics())
if err := proxyApp.Start(ctx); err != nil {
t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err))
}


+ 16
- 29
internal/proxy/app_conn.go View File

@ -6,100 +6,87 @@ import (
"github.com/go-kit/kit/metrics"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/types"
)
type proxyClient struct {
metrics *Metrics
abciclient.Client
}
func newProxyClient(appConn abciclient.Client, metrics *Metrics) abciclient.Client {
return &proxyClient{
metrics: metrics,
Client: appConn,
}
}
func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
return app.Client.InitChain(ctx, req)
return app.client.InitChain(ctx, req)
}
func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
return app.Client.PrepareProposal(ctx, req)
return app.client.PrepareProposal(ctx, req)
}
func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
return app.Client.ProcessProposal(ctx, req)
return app.client.ProcessProposal(ctx, req)
}
func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
return app.Client.ExtendVote(ctx, req)
return app.client.ExtendVote(ctx, req)
}
func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
return app.Client.VerifyVoteExtension(ctx, req)
return app.client.VerifyVoteExtension(ctx, req)
}
func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
return app.Client.FinalizeBlock(ctx, req)
return app.client.FinalizeBlock(ctx, req)
}
func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
return app.Client.Commit(ctx)
return app.client.Commit(ctx)
}
func (app *proxyClient) Flush(ctx context.Context) error {
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
return app.Client.Flush(ctx)
return app.client.Flush(ctx)
}
func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
return app.Client.CheckTx(ctx, req)
return app.client.CheckTx(ctx, req)
}
func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
return app.Client.Echo(ctx, msg)
return app.client.Echo(ctx, msg)
}
func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
return app.Client.Info(ctx, req)
return app.client.Info(ctx, req)
}
func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
return app.Client.Query(ctx, reqQuery)
return app.client.Query(ctx, reqQuery)
}
func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
return app.Client.ListSnapshots(ctx, req)
return app.client.ListSnapshots(ctx, req)
}
func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
return app.Client.OfferSnapshot(ctx, req)
return app.client.OfferSnapshot(ctx, req)
}
func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
return app.Client.LoadSnapshotChunk(ctx, req)
return app.client.LoadSnapshotChunk(ctx, req)
}
func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
return app.Client.ApplySnapshotChunk(ctx, req)
return app.client.ApplySnapshotChunk(ctx, req)
}
// addTimeSample returns a function that, when called, adds an observation to m.


+ 13
- 8
internal/proxy/client.go View File

@ -10,30 +10,35 @@ import (
e2e "github.com/tendermint/tendermint/test/e2e/app"
)
// DefaultClientCreator returns a default ClientCreator, which will create a
// ClientFactory returns a default ClientCreator, which will create a
// local client if addr is one of: 'kvstore',
// 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client.
//
// The Closer is a noop except for persistent_kvstore applications,
// which will clean up the store.
func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) {
func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient.Client, io.Closer, error) {
switch addr {
case "kvstore":
return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{}
return abciclient.NewLocalClient(logger, kvstore.NewApplication()), noopCloser{}, nil
case "persistent_kvstore":
app := kvstore.NewPersistentKVStoreApplication(logger, dbDir)
return abciclient.NewLocalCreator(app), app
return abciclient.NewLocalClient(logger, app), app, nil
case "e2e":
app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
if err != nil {
panic(err)
return nil, noopCloser{}, err
}
return abciclient.NewLocalCreator(app), noopCloser{}
return abciclient.NewLocalClient(logger, app), noopCloser{}, nil
case "noop":
return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{}
return abciclient.NewLocalClient(logger, types.NewBaseApplication()), noopCloser{}, nil
default:
mustConnect := false // loop retrying
return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{}
client, err := abciclient.NewClient(logger, addr, transport, mustConnect)
if err != nil {
return nil, noopCloser{}, err
}
return client, noopCloser{}, nil
}
}


+ 18
- 32
internal/proxy/multi_app_conn.go View File

@ -11,32 +11,27 @@ import (
)
// New creates a proxy application interface.
func New(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) abciclient.Client {
multiAppConn := &proxyConn{
logger: logger,
metrics: metrics,
clientCreator: clientCreator,
func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client {
conn := &proxyClient{
logger: logger,
metrics: metrics,
client: client,
}
multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn)
return multiAppConn
conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn)
return conn
}
// proxyConn implements provides the application connection.
type proxyConn struct {
// proxyClient implements provides the application connection.
type proxyClient struct {
service.BaseService
abciclient.Client
logger log.Logger
client abciclient.Client
metrics *Metrics
clientCreator abciclient.Creator
}
func (app *proxyConn) OnStop() { tryCallStop(app.Client) }
func (app *proxyConn) IsRunning() bool { return app.Client.IsRunning() }
func (app *proxyConn) Start(ctx context.Context) error { return app.BaseService.Start(ctx) }
func (app *proxyConn) Wait() { app.BaseService.Wait() }
func (app *proxyClient) OnStop() { tryCallStop(app.client) }
func (app *proxyClient) Error() error { return app.client.Error() }
func tryCallStop(client abciclient.Client) {
switch c := client.(type) {
@ -44,37 +39,28 @@ func tryCallStop(client abciclient.Client) {
return
case interface{ Stop() }:
c.Stop()
case *proxyClient:
tryCallStop(c.Client)
}
}
func (app *proxyConn) OnStart(ctx context.Context) error {
func (app *proxyClient) OnStart(ctx context.Context) error {
var err error
defer func() {
if err != nil {
tryCallStop(app.Client)
tryCallStop(app.client)
}
}()
var client abciclient.Client
client, err = app.clientCreator(app.logger)
if err != nil {
return err
}
app.Client = newProxyClient(client, app.metrics)
// Kill Tendermint if the ABCI application crashes.
go func() {
if !app.Client.IsRunning() {
if !app.client.IsRunning() {
return
}
app.Client.Wait()
app.client.Wait()
if ctx.Err() != nil {
return
}
if err := app.Client.Error(); err != nil {
if err := app.client.Error(); err != nil {
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
"err", err)
if killErr := kill(); killErr != nil {
@ -85,7 +71,7 @@ func (app *proxyConn) OnStart(ctx context.Context) error {
}()
return client.Start(ctx)
return app.client.Start(ctx)
}
func kill() error {


+ 1
- 1
node/node.go View File

@ -100,7 +100,7 @@ func newDefaultNode(
return nil, err
}
appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
appClient, _ := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
return makeNode(
ctx,


Loading…
Cancel
Save