From fa1520dd90e355e2445d87f2ea595593e5db4b13 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 4 Mar 2022 14:59:43 -0500 Subject: [PATCH] more cleanup --- abci/client/creators.go | 32 ------------------ internal/consensus/replay.go | 3 ++ internal/consensus/replay_file.go | 8 +++-- internal/consensus/replay_stubs.go | 15 ++------- internal/consensus/replay_test.go | 23 ++++++------- internal/consensus/wal_generator.go | 4 +-- internal/proxy/app_conn.go | 45 +++++++++----------------- internal/proxy/client.go | 21 +++++++----- internal/proxy/multi_app_conn.go | 50 +++++++++++------------------ node/node.go | 2 +- 10 files changed, 73 insertions(+), 130 deletions(-) diff --git a/abci/client/creators.go b/abci/client/creators.go index 1eaa95d64..33c5d1a5f 100644 --- a/abci/client/creators.go +++ b/abci/client/creators.go @@ -1,33 +1 @@ package abciclient - -import ( - "fmt" - - "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" -) - -// Creator creates new ABCI clients. -type Creator func(log.Logger) (Client, error) - -// NewLocalCreator returns a Creator for the given app, -// which will be running locally. -func NewLocalCreator(app types.Application) Creator { - return func(logger log.Logger) (Client, error) { - return NewLocalClient(logger, app), nil - } -} - -// NewRemoteCreator returns a Creator for the given address (e.g. -// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you -// want the client to connect before reporting success. -func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator { - return func(log.Logger) (Client, error) { - remoteApp, err := NewClient(logger, addr, transport, mustConnect) - if err != nil { - return nil, fmt.Errorf("failed to connect to proxy: %w", err) - } - - return remoteApp, nil - } -} diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 9381cdd29..34c18c1fe 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -427,6 +427,9 @@ func (h *Handshaker) ReplayBlocks( if err != nil { return nil, err } + if err := mockApp.Start(ctx); err != nil { + return nil, err + } h.logger.Info("Replay last block using mock app") state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp) diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index c6919e3e8..94e3b96df 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -327,8 +327,12 @@ func newConsensusStateForReplay( } // Create proxyAppConn connection (consensus, mempool, query) - clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) - proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) + client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + if err != nil { + return nil, err + } + + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) if err != nil { return nil, fmt.Errorf("starting proxy app conns: %w", err) diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index c6cbd470d..a2f669eac 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -62,21 +62,10 @@ func newMockProxyApp( appHash []byte, abciResponses *tmstate.ABCIResponses, ) (abciclient.Client, error) { - - clientCreator := abciclient.NewLocalCreator(&mockProxyApp{ + return proxy.New(abciclient.NewLocalClient(logger, &mockProxyApp{ appHash: appHash, abciResponses: abciResponses, - }) - cli, err := clientCreator(logger) - if err != nil { - return nil, err - } - - if err = cli.Start(ctx); err != nil { - return nil, err - } - - return proxy.New(clientCreator, logger, proxy.NopMetrics()), nil + }), logger, proxy.NopMetrics()), nil } type mockProxyApp struct { diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index fd84e71d7..4c2830071 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -804,7 +804,7 @@ func testHandshakeReplay( filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int()))) t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) }) - clientCreator2 := abciclient.NewLocalCreator(kvstoreApp) + clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp) if nBlocks > 0 { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state @@ -831,7 +831,8 @@ func testHandshakeReplay( handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") - + require.True(t, proxyApp.IsRunning()) + require.NotNil(t, proxyApp) t.Cleanup(func() { cancel(); proxyApp.Wait() }) err = handshaker.Handshake(ctx, proxyApp) @@ -958,9 +959,9 @@ func buildTMStateFromChain( kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) defer kvstoreApp.Close() - clientCreator := abciclient.NewLocalCreator(kvstoreApp) + client := abciclient.NewLocalClient(logger, kvstoreApp) - proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version @@ -1031,8 +1032,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { // - 0x03 { app := &badApp{numBlocks: 3, allHashesAreWrong: true} - clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) + client := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); proxyApp.Wait() }) @@ -1051,8 +1052,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { // - RANDOM HASH { app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true} - clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) + client := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); proxyApp.Wait() }) @@ -1282,12 +1283,13 @@ func TestHandshakeUpdatesValidators(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() votePower := 10 + int64(rand.Uint32()) val, _, err := factory.Validator(ctx, votePower) require.NoError(t, err) vals := types.NewValidatorSet([]*types.Validator{val}) app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)} - clientCreator := abciclient.NewLocalCreator(app) + client := abciclient.NewLocalClient(logger, app) cfg, err := ResetConfig(t.TempDir(), "handshake_test_") require.NoError(t, err) @@ -1306,9 +1308,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) { genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) - logger := log.TestingLogger() handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) - proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake") diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index 4927458d1..9608365c1 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -67,8 +67,8 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr } blockStore := store.NewBlockStore(blockStoreDB) - - proxyApp := proxy.New(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics()) + proxyLogger := logger.With("module", "proxy") + proxyApp := proxy.New(abciclient.NewLocalClient(logger, app), proxyLogger, proxy.NopMetrics()) if err := proxyApp.Start(ctx); err != nil { t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err)) } diff --git a/internal/proxy/app_conn.go b/internal/proxy/app_conn.go index 99226a743..3d023564f 100644 --- a/internal/proxy/app_conn.go +++ b/internal/proxy/app_conn.go @@ -6,100 +6,87 @@ import ( "github.com/go-kit/kit/metrics" - abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/types" ) -type proxyClient struct { - metrics *Metrics - abciclient.Client -} - -func newProxyClient(appConn abciclient.Client, metrics *Metrics) abciclient.Client { - return &proxyClient{ - metrics: metrics, - Client: appConn, - } -} - func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() - return app.Client.InitChain(ctx, req) + return app.client.InitChain(ctx, req) } func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() - return app.Client.PrepareProposal(ctx, req) + return app.client.PrepareProposal(ctx, req) } func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() - return app.Client.ProcessProposal(ctx, req) + return app.client.ProcessProposal(ctx, req) } func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() - return app.Client.ExtendVote(ctx, req) + return app.client.ExtendVote(ctx, req) } func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() - return app.Client.VerifyVoteExtension(ctx, req) + return app.client.VerifyVoteExtension(ctx, req) } func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() - return app.Client.FinalizeBlock(ctx, req) + return app.client.FinalizeBlock(ctx, req) } func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() - return app.Client.Commit(ctx) + return app.client.Commit(ctx) } func (app *proxyClient) Flush(ctx context.Context) error { defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() - return app.Client.Flush(ctx) + return app.client.Flush(ctx) } func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() - return app.Client.CheckTx(ctx, req) + return app.client.CheckTx(ctx, req) } func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() - return app.Client.Echo(ctx, msg) + return app.client.Echo(ctx, msg) } func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() - return app.Client.Info(ctx, req) + return app.client.Info(ctx, req) } func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() - return app.Client.Query(ctx, reqQuery) + return app.client.Query(ctx, reqQuery) } func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() - return app.Client.ListSnapshots(ctx, req) + return app.client.ListSnapshots(ctx, req) } func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() - return app.Client.OfferSnapshot(ctx, req) + return app.client.OfferSnapshot(ctx, req) } func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() - return app.Client.LoadSnapshotChunk(ctx, req) + return app.client.LoadSnapshotChunk(ctx, req) } func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() - return app.Client.ApplySnapshotChunk(ctx, req) + return app.client.ApplySnapshotChunk(ctx, req) } // addTimeSample returns a function that, when called, adds an observation to m. diff --git a/internal/proxy/client.go b/internal/proxy/client.go index d01634bdf..9983c725e 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -10,30 +10,35 @@ import ( e2e "github.com/tendermint/tendermint/test/e2e/app" ) -// DefaultClientCreator returns a default ClientCreator, which will create a +// ClientFactory returns a default ClientCreator, which will create a // local client if addr is one of: 'kvstore', // 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client. // // The Closer is a noop except for persistent_kvstore applications, // which will clean up the store. -func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) { +func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient.Client, io.Closer, error) { switch addr { case "kvstore": - return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{} + return abciclient.NewLocalClient(logger, kvstore.NewApplication()), noopCloser{}, nil case "persistent_kvstore": app := kvstore.NewPersistentKVStoreApplication(logger, dbDir) - return abciclient.NewLocalCreator(app), app + return abciclient.NewLocalClient(logger, app), app, nil case "e2e": app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir)) if err != nil { - panic(err) + return nil, noopCloser{}, err } - return abciclient.NewLocalCreator(app), noopCloser{} + return abciclient.NewLocalClient(logger, app), noopCloser{}, nil case "noop": - return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{} + return abciclient.NewLocalClient(logger, types.NewBaseApplication()), noopCloser{}, nil default: mustConnect := false // loop retrying - return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{} + client, err := abciclient.NewClient(logger, addr, transport, mustConnect) + if err != nil { + return nil, noopCloser{}, err + } + + return client, noopCloser{}, nil } } diff --git a/internal/proxy/multi_app_conn.go b/internal/proxy/multi_app_conn.go index 0dbbd3e2e..8ab27b99a 100644 --- a/internal/proxy/multi_app_conn.go +++ b/internal/proxy/multi_app_conn.go @@ -11,32 +11,27 @@ import ( ) // New creates a proxy application interface. -func New(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) abciclient.Client { - multiAppConn := &proxyConn{ - logger: logger, - metrics: metrics, - clientCreator: clientCreator, +func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client { + conn := &proxyClient{ + logger: logger, + metrics: metrics, + client: client, } - multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn) - return multiAppConn + conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn) + return conn } -// proxyConn implements provides the application connection. -type proxyConn struct { +// proxyClient implements provides the application connection. +type proxyClient struct { service.BaseService - abciclient.Client - logger log.Logger + client abciclient.Client metrics *Metrics - - clientCreator abciclient.Creator } -func (app *proxyConn) OnStop() { tryCallStop(app.Client) } -func (app *proxyConn) IsRunning() bool { return app.Client.IsRunning() } -func (app *proxyConn) Start(ctx context.Context) error { return app.BaseService.Start(ctx) } -func (app *proxyConn) Wait() { app.BaseService.Wait() } +func (app *proxyClient) OnStop() { tryCallStop(app.client) } +func (app *proxyClient) Error() error { return app.client.Error() } func tryCallStop(client abciclient.Client) { switch c := client.(type) { @@ -44,37 +39,28 @@ func tryCallStop(client abciclient.Client) { return case interface{ Stop() }: c.Stop() - case *proxyClient: - tryCallStop(c.Client) } } -func (app *proxyConn) OnStart(ctx context.Context) error { +func (app *proxyClient) OnStart(ctx context.Context) error { var err error defer func() { if err != nil { - tryCallStop(app.Client) + tryCallStop(app.client) } }() - var client abciclient.Client - client, err = app.clientCreator(app.logger) - if err != nil { - return err - } - - app.Client = newProxyClient(client, app.metrics) // Kill Tendermint if the ABCI application crashes. go func() { - if !app.Client.IsRunning() { + if !app.client.IsRunning() { return } - app.Client.Wait() + app.client.Wait() if ctx.Err() != nil { return } - if err := app.Client.Error(); err != nil { + if err := app.client.Error(); err != nil { app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", "err", err) if killErr := kill(); killErr != nil { @@ -85,7 +71,7 @@ func (app *proxyConn) OnStart(ctx context.Context) error { }() - return client.Start(ctx) + return app.client.Start(ctx) } func kill() error { diff --git a/node/node.go b/node/node.go index aa697df46..13c37d3b5 100644 --- a/node/node.go +++ b/node/node.go @@ -100,7 +100,7 @@ func newDefaultNode( return nil, err } - appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + appClient, _ := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) return makeNode( ctx,