diff --git a/abci/client/creators.go b/abci/client/creators.go deleted file mode 100644 index 1eaa95d64..000000000 --- a/abci/client/creators.go +++ /dev/null @@ -1,33 +0,0 @@ -package abciclient - -import ( - "fmt" - - "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" -) - -// Creator creates new ABCI clients. -type Creator func(log.Logger) (Client, error) - -// NewLocalCreator returns a Creator for the given app, -// which will be running locally. -func NewLocalCreator(app types.Application) Creator { - return func(logger log.Logger) (Client, error) { - return NewLocalClient(logger, app), nil - } -} - -// NewRemoteCreator returns a Creator for the given address (e.g. -// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you -// want the client to connect before reporting success. -func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator { - return func(log.Logger) (Client, error) { - remoteApp, err := NewClient(logger, addr, transport, mustConnect) - if err != nil { - return nil, fmt.Errorf("failed to connect to proxy: %w", err) - } - - return remoteApp, nil - } -} diff --git a/go.mod b/go.mod index 136dd3af3..c795f1319 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,12 @@ require ( pgregory.net/rapid v0.4.7 ) +require ( + github.com/creachadair/atomicfile v0.2.4 + github.com/google/go-cmp v0.5.7 + gotest.tools v2.2.0+incompatible +) + require ( 4d63.com/gochecknoglobals v0.1.0 // indirect github.com/Antonboom/errname v0.1.5 // indirect @@ -67,7 +73,6 @@ require ( github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect github.com/containerd/continuity v0.2.1 // indirect github.com/daixiang0/gci v0.3.1-0.20220208004058-76d765e3ab48 // indirect - github.com/creachadair/atomicfile v0.2.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingajkin/go-header v0.4.2 // indirect github.com/dgraph-io/badger/v2 v2.2007.2 // indirect @@ -107,7 +112,6 @@ require ( github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect github.com/google/btree v1.0.0 // indirect - github.com/google/go-cmp v0.5.7 // indirect github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect github.com/gostaticanalysis/comment v1.4.2 // indirect diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 00ab14a86..d73a522fa 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -33,7 +33,7 @@ type reactorTestSuite struct { nodes []types.NodeID reactors map[types.NodeID]*Reactor - app map[types.NodeID]proxy.AppConns + app map[types.NodeID]abciclient.Client blockSyncChannels map[types.NodeID]*p2p.Channel peerChans map[types.NodeID]chan p2p.PeerUpdate @@ -64,7 +64,7 @@ func setup( network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}), nodes: make([]types.NodeID, 0, numNodes), reactors: make(map[types.NodeID]*Reactor, numNodes), - app: make(map[types.NodeID]proxy.AppConns, numNodes), + app: make(map[types.NodeID]abciclient.Client, numNodes), blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), @@ -109,7 +109,7 @@ func (rts *reactorTestSuite) addNode( logger := log.TestingLogger() rts.nodes = append(rts.nodes, nodeID) - rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics()) + rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics()) require.NoError(t, rts.app[nodeID].Start(ctx)) blockDB := dbm.NewMemDB() @@ -124,7 +124,7 @@ func (rts *reactorTestSuite) addNode( blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - rts.app[nodeID].Consensus(), + rts.app[nodeID], mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 6250ffc06..4034b2ddf 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -10,6 +10,7 @@ import ( "reflect" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/internal/eventbus" @@ -237,10 +238,10 @@ func (h *Handshaker) NBlocks() int { } // TODO: retry the handshake/replay if it fails ? -func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error { +func (h *Handshaker) Handshake(ctx context.Context, appClient abciclient.Client) error { // Handshake is done via ABCI Info on the query conn. - res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo) + res, err := appClient.Info(ctx, proxy.RequestInfo) if err != nil { return fmt.Errorf("error calling Info: %w", err) } @@ -264,7 +265,7 @@ func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) err } // Replay blocks up to the latest in the blockstore. - _, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, proxyApp) + _, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, appClient) if err != nil { return fmt.Errorf("error on replay: %w", err) } @@ -285,7 +286,7 @@ func (h *Handshaker) ReplayBlocks( state sm.State, appHash []byte, appBlockHeight int64, - proxyApp proxy.AppConns, + appClient abciclient.Client, ) ([]byte, error) { storeBlockBase := h.store.Base() storeBlockHeight := h.store.Height() @@ -316,7 +317,7 @@ func (h *Handshaker) ReplayBlocks( Validators: nextVals, AppStateBytes: h.genDoc.AppState, } - res, err := proxyApp.Consensus().InitChain(ctx, req) + res, err := appClient.InitChain(ctx, req) if err != nil { return nil, err } @@ -390,7 +391,7 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { // We're good! @@ -405,7 +406,7 @@ func (h *Handshaker) ReplayBlocks( case appBlockHeight < stateBlockHeight: // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, true) case appBlockHeight == stateBlockHeight: // We haven't run Commit (both the state and app are one block behind), @@ -413,7 +414,7 @@ func (h *Handshaker) ReplayBlocks( // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT h.logger.Info("Replay last block using real app") - state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(ctx, state, storeBlockHeight, appClient) return state.AppHash, err case appBlockHeight == storeBlockHeight: @@ -426,6 +427,9 @@ func (h *Handshaker) ReplayBlocks( if err != nil { return nil, err } + if err := mockApp.Start(ctx); err != nil { + return nil, err + } h.logger.Info("Replay last block using mock app") state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp) @@ -445,7 +449,7 @@ func (h *Handshaker) ReplayBlocks( func (h *Handshaker) replayBlocks( ctx context.Context, state sm.State, - proxyApp proxy.AppConns, + appClient abciclient.Client, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { @@ -481,16 +485,16 @@ func (h *Handshaker) replayBlocks( // We emit events for the index services at the final block due to the sync issue when // the node shutdown during the block committing status. blockExec := sm.NewBlockExecutor( - h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store) + h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) blockExec.SetEventBus(h.eventBus) appHash, err = sm.ExecCommitBlock(ctx, - blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) + blockExec, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) if err != nil { return nil, err } } else { appHash, err = sm.ExecCommitBlock(ctx, - nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) + nil, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) if err != nil { return nil, err } @@ -501,7 +505,7 @@ func (h *Handshaker) replayBlocks( if mutateState { // sync the final block - state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(ctx, state, storeBlockHeight, appClient) if err != nil { return nil, err } @@ -517,14 +521,14 @@ func (h *Handshaker) replayBlock( ctx context.Context, state sm.State, height int64, - proxyApp proxy.AppConnConsensus, + appClient abciclient.Client, ) (sm.State, error) { block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) // Use stubs for both mempool and evidence pool since no transactions nor // evidence are needed here - block already exists. - blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) + blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) blockExec.SetEventBus(h.eventBus) var err error diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 96de5ef28..3ab470d36 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -326,9 +326,12 @@ func newConsensusStateForReplay( return nil, err } - // Create proxyAppConn connection (consensus, mempool, query) - clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + if err != nil { + return nil, err + } + + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) if err != nil { return nil, fmt.Errorf("starting proxy app conns: %w", err) @@ -346,7 +349,7 @@ func newConsensusStateForReplay( } mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore) consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec, blockStore, mempool, evpool) diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index 721285778..e479d344e 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -61,22 +61,11 @@ func newMockProxyApp( logger log.Logger, appHash []byte, abciResponses *tmstate.ABCIResponses, -) (proxy.AppConnConsensus, error) { - - clientCreator := abciclient.NewLocalCreator(&mockProxyApp{ +) (abciclient.Client, error) { + return proxy.New(abciclient.NewLocalClient(logger, &mockProxyApp{ appHash: appHash, abciResponses: abciResponses, - }) - cli, err := clientCreator(logger) - if err != nil { - return nil, err - } - - if err = cli.Start(ctx); err != nil { - return nil, err - } - - return proxy.NewAppConnConsensus(cli, proxy.NopMetrics()), nil + }), logger, proxy.NopMetrics()), nil } type mockProxyApp struct { diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 946afde93..2bda63d74 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -748,11 +748,11 @@ func testHandshakeReplay( filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int()))) t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) }) - clientCreator2 := abciclient.NewLocalCreator(kvstoreApp) + clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp) if nBlocks > 0 { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state - proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) stateDB1 := dbm.NewMemDB() stateStore := sm.NewStore(stateDB1) err := stateStore.Save(genesisState) @@ -773,9 +773,10 @@ func testHandshakeReplay( genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) - proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") - + require.True(t, proxyApp.IsRunning()) + require.NotNil(t, proxyApp) t.Cleanup(func() { cancel(); proxyApp.Wait() }) err = handshaker.Handshake(ctx, proxyApp) @@ -786,7 +787,7 @@ func testHandshakeReplay( require.NoError(t, err, "Error on abci handshake") // get the latest app hash from the app - res, err := proxyApp.Query().Info(ctx, abci.RequestInfo{Version: ""}) + res, err := proxyApp.Info(ctx, abci.RequestInfo{Version: ""}) if err != nil { t.Fatal(err) } @@ -819,11 +820,11 @@ func applyBlock( evpool sm.EvidencePool, st sm.State, blk *types.Block, - proxyApp proxy.AppConns, + appClient abciclient.Client, blockStore *mockBlockStore, ) sm.State { testPartSize := types.BlockPartSizeBytes - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore) bps, err := blk.MakePartSet(testPartSize) require.NoError(t, err) @@ -836,7 +837,7 @@ func applyBlock( func buildAppStateFromChain( ctx context.Context, t *testing.T, - proxyApp proxy.AppConns, + appClient abciclient.Client, stateStore sm.Store, mempool mempool.Mempool, evpool sm.EvidencePool, @@ -848,11 +849,11 @@ func buildAppStateFromChain( ) { t.Helper() // start a new app without handshake, play nBlocks blocks - require.NoError(t, proxyApp.Start(ctx)) + require.NoError(t, appClient.Start(ctx)) state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version validators := types.TM2PB.ValidatorUpdates(state.Validators) - _, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{ + _, err := appClient.InitChain(ctx, abci.RequestInitChain{ Validators: validators, }) require.NoError(t, err) @@ -863,18 +864,18 @@ func buildAppStateFromChain( case 0: for i := 0; i < nBlocks; i++ { block := chain[i] - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore) } case 1, 2, 3: for i := 0; i < nBlocks-1; i++ { block := chain[i] - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore) } if mode == 2 || mode == 3 { // update the kvstore height and apphash // as if we ran commit but not - state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore) + state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore) } default: require.Fail(t, "unknown mode %v", mode) @@ -902,14 +903,14 @@ func buildTMStateFromChain( kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) defer kvstoreApp.Close() - clientCreator := abciclient.NewLocalCreator(kvstoreApp) + client := abciclient.NewLocalClient(logger, kvstoreApp) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version validators := types.TM2PB.ValidatorUpdates(state.Validators) - _, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{ + _, err := proxyApp.InitChain(ctx, abci.RequestInitChain{ Validators: validators, }) require.NoError(t, err) @@ -975,8 +976,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { // - 0x03 { app := &badApp{numBlocks: 3, allHashesAreWrong: true} - clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + client := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); proxyApp.Wait() }) @@ -995,8 +996,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { // - RANDOM HASH { app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true} - clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + client := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); proxyApp.Wait() }) @@ -1226,12 +1227,13 @@ func TestHandshakeUpdatesValidators(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.NewNopLogger() votePower := 10 + int64(rand.Uint32()) val, _, err := factory.Validator(ctx, votePower) require.NoError(t, err) vals := types.NewValidatorSet([]*types.Validator{val}) app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)} - clientCreator := abciclient.NewLocalCreator(app) + client := abciclient.NewLocalClient(logger, app) cfg, err := ResetConfig(t.TempDir(), "handshake_test_") require.NoError(t, err) @@ -1250,9 +1252,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) { genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) - logger := log.TestingLogger() handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake") diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index 493ec1840..9608365c1 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -67,8 +67,8 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr } blockStore := store.NewBlockStore(blockStoreDB) - - proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics()) + proxyLogger := logger.With("module", "proxy") + proxyApp := proxy.New(abciclient.NewLocalClient(logger, app), proxyLogger, proxy.NopMetrics()) if err := proxyApp.Start(ctx); err != nil { t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err)) } @@ -82,7 +82,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore) consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool) if err != nil { t.Fatal(err) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 544719a1b..6fcfe86c1 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -9,10 +9,10 @@ import ( "sync/atomic" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" - "github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/libs/log" tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/types" @@ -31,7 +31,7 @@ type TxMempool struct { logger log.Logger metrics *Metrics config *config.MempoolConfig - proxyAppConn proxy.AppConnMempool + proxyAppConn abciclient.Client // txsAvailable fires once for each height when the mempool is not empty txsAvailable chan struct{} @@ -93,7 +93,7 @@ type TxMempool struct { func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, - proxyAppConn proxy.AppConnMempool, + proxyAppConn abciclient.Client, options ...TxMempoolOption, ) *TxMempool { @@ -421,7 +421,6 @@ func (txmp *TxMempool) Update( newPreFn PreCheckFunc, newPostFn PostCheckFunc, ) error { - txmp.height = blockHeight txmp.notifiedTxsAvailable = false diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 358192e70..e4f604cb1 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -78,24 +78,24 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) - app := &application{kvstore.NewApplication()} - cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() + conn := abciclient.NewLocalClient(logger, &application{ + kvstore.NewApplication(), + }) + cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) require.NoError(t, err) cfg.Mempool.CacheSize = cacheSize - appConnMem, err := cc(logger) - require.NoError(t, err) - require.NoError(t, appConnMem.Start(ctx)) + require.NoError(t, conn.Start(ctx)) t.Cleanup(func() { os.RemoveAll(cfg.RootDir) cancel() - appConnMem.Wait() + conn.Wait() }) - return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...) + return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, conn, options...) } func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { diff --git a/internal/proxy/app_conn.go b/internal/proxy/app_conn.go deleted file mode 100644 index f30757f45..000000000 --- a/internal/proxy/app_conn.go +++ /dev/null @@ -1,249 +0,0 @@ -package proxy - -import ( - "context" - "time" - - "github.com/go-kit/kit/metrics" - - abciclient "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/types" -) - -//go:generate ../../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot - -//---------------------------------------------------------------------------------------- -// Enforce which abci msgs can be sent on a connection at the type level - -type AppConnConsensus interface { - Error() error - - InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error) - - PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) - ProcessProposal(context.Context, types.RequestProcessProposal) (*types.ResponseProcessProposal, error) - ExtendVote(context.Context, types.RequestExtendVote) (*types.ResponseExtendVote, error) - VerifyVoteExtension(context.Context, types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) - FinalizeBlock(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) - Commit(context.Context) (*types.ResponseCommit, error) -} - -type AppConnMempool interface { - Error() error - - CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error) - - Flush(context.Context) error -} - -type AppConnQuery interface { - Error() error - - Echo(context.Context, string) (*types.ResponseEcho, error) - Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error) - Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error) -} - -type AppConnSnapshot interface { - Error() error - - ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error) - OfferSnapshot(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) - LoadSnapshotChunk(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) - ApplySnapshotChunk(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) -} - -//----------------------------------------------------------------------------------------- -// Implements AppConnConsensus (subset of abciclient.Client) - -type appConnConsensus struct { - metrics *Metrics - appConn abciclient.Client -} - -var _ AppConnConsensus = (*appConnConsensus)(nil) - -func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus { - return &appConnConsensus{ - metrics: metrics, - appConn: appConn, - } -} - -func (app *appConnConsensus) Error() error { - return app.appConn.Error() -} - -func (app *appConnConsensus) InitChain( - ctx context.Context, - req types.RequestInitChain, -) (*types.ResponseInitChain, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() - return app.appConn.InitChain(ctx, req) -} - -func (app *appConnConsensus) PrepareProposal( - ctx context.Context, - req types.RequestPrepareProposal, -) (*types.ResponsePrepareProposal, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() - return app.appConn.PrepareProposal(ctx, req) -} - -func (app *appConnConsensus) ProcessProposal( - ctx context.Context, - req types.RequestProcessProposal, -) (*types.ResponseProcessProposal, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() - return app.appConn.ProcessProposal(ctx, req) -} - -func (app *appConnConsensus) ExtendVote( - ctx context.Context, - req types.RequestExtendVote, -) (*types.ResponseExtendVote, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() - return app.appConn.ExtendVote(ctx, req) -} - -func (app *appConnConsensus) VerifyVoteExtension( - ctx context.Context, - req types.RequestVerifyVoteExtension, -) (*types.ResponseVerifyVoteExtension, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() - return app.appConn.VerifyVoteExtension(ctx, req) -} - -func (app *appConnConsensus) FinalizeBlock( - ctx context.Context, - req types.RequestFinalizeBlock, -) (*types.ResponseFinalizeBlock, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() - return app.appConn.FinalizeBlock(ctx, req) -} - -func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() - return app.appConn.Commit(ctx) -} - -//------------------------------------------------ -// Implements AppConnMempool (subset of abciclient.Client) - -type appConnMempool struct { - metrics *Metrics - appConn abciclient.Client -} - -func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool { - return &appConnMempool{ - metrics: metrics, - appConn: appConn, - } -} - -func (app *appConnMempool) Error() error { - return app.appConn.Error() -} - -func (app *appConnMempool) Flush(ctx context.Context) error { - defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() - return app.appConn.Flush(ctx) -} - -func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() - return app.appConn.CheckTx(ctx, req) -} - -//------------------------------------------------ -// Implements AppConnQuery (subset of abciclient.Client) - -type appConnQuery struct { - metrics *Metrics - appConn abciclient.Client -} - -func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery { - return &appConnQuery{ - metrics: metrics, - appConn: appConn, - } -} - -func (app *appConnQuery) Error() error { - return app.appConn.Error() -} - -func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() - return app.appConn.Echo(ctx, msg) -} - -func (app *appConnQuery) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() - return app.appConn.Info(ctx, req) -} - -func (app *appConnQuery) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() - return app.appConn.Query(ctx, reqQuery) -} - -//------------------------------------------------ -// Implements AppConnSnapshot (subset of abciclient.Client) - -type appConnSnapshot struct { - metrics *Metrics - appConn abciclient.Client -} - -func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot { - return &appConnSnapshot{ - metrics: metrics, - appConn: appConn, - } -} - -func (app *appConnSnapshot) Error() error { - return app.appConn.Error() -} - -func (app *appConnSnapshot) ListSnapshots( - ctx context.Context, - req types.RequestListSnapshots, -) (*types.ResponseListSnapshots, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() - return app.appConn.ListSnapshots(ctx, req) -} - -func (app *appConnSnapshot) OfferSnapshot( - ctx context.Context, - req types.RequestOfferSnapshot, -) (*types.ResponseOfferSnapshot, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() - return app.appConn.OfferSnapshot(ctx, req) -} - -func (app *appConnSnapshot) LoadSnapshotChunk( - ctx context.Context, - req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() - return app.appConn.LoadSnapshotChunk(ctx, req) -} - -func (app *appConnSnapshot) ApplySnapshotChunk( - ctx context.Context, - req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() - return app.appConn.ApplySnapshotChunk(ctx, req) -} - -// addTimeSample returns a function that, when called, adds an observation to m. -// The observation added to m is the number of seconds ellapsed since addTimeSample -// was initially called. addTimeSample is meant to be called in a defer to calculate -// the amount of time a function takes to complete. -func addTimeSample(m metrics.Histogram) func() { - start := time.Now() - return func() { m.Observe(time.Since(start).Seconds()) } -} diff --git a/internal/proxy/client.go b/internal/proxy/client.go index d01634bdf..7444c841e 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,42 +1,213 @@ package proxy import ( + "context" "io" + "os" + "syscall" + "time" + "github.com/go-kit/kit/metrics" abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" e2e "github.com/tendermint/tendermint/test/e2e/app" ) -// DefaultClientCreator returns a default ClientCreator, which will create a -// local client if addr is one of: 'kvstore', -// 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client. +// ClientFactory returns a client object, which will create a local +// client if addr is one of: 'kvstore', 'persistent_kvstore', 'e2e', +// or 'noop', otherwise - a remote client. // // The Closer is a noop except for persistent_kvstore applications, // which will clean up the store. -func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) { +func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient.Client, io.Closer, error) { switch addr { case "kvstore": - return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{} + return abciclient.NewLocalClient(logger, kvstore.NewApplication()), noopCloser{}, nil case "persistent_kvstore": app := kvstore.NewPersistentKVStoreApplication(logger, dbDir) - return abciclient.NewLocalCreator(app), app + return abciclient.NewLocalClient(logger, app), app, nil case "e2e": app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir)) if err != nil { - panic(err) + return nil, noopCloser{}, err } - return abciclient.NewLocalCreator(app), noopCloser{} + return abciclient.NewLocalClient(logger, app), noopCloser{}, nil case "noop": - return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{} + return abciclient.NewLocalClient(logger, types.NewBaseApplication()), noopCloser{}, nil default: - mustConnect := false // loop retrying - return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{} + const mustConnect = false // loop retrying + client, err := abciclient.NewClient(logger, addr, transport, mustConnect) + if err != nil { + return nil, noopCloser{}, err + } + + return client, noopCloser{}, nil } } type noopCloser struct{} func (noopCloser) Close() error { return nil } + +// proxyClient provides the application connection. +type proxyClient struct { + service.BaseService + logger log.Logger + + client abciclient.Client + metrics *Metrics +} + +// New creates a proxy application interface. +func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client { + conn := &proxyClient{ + logger: logger, + metrics: metrics, + client: client, + } + conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn) + return conn +} + +func (app *proxyClient) OnStop() { tryCallStop(app.client) } +func (app *proxyClient) Error() error { return app.client.Error() } + +func tryCallStop(client abciclient.Client) { + if c, ok := client.(interface{ Stop() }); ok { + c.Stop() + } +} + +func (app *proxyClient) OnStart(ctx context.Context) error { + var err error + defer func() { + if err != nil { + tryCallStop(app.client) + } + }() + + // Kill Tendermint if the ABCI application crashes. + go func() { + if !app.client.IsRunning() { + return + } + app.client.Wait() + if ctx.Err() != nil { + return + } + + if err := app.client.Error(); err != nil { + app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", + "err", err) + + if killErr := kill(); killErr != nil { + app.logger.Error("Failed to kill this process - please do so manually", + "err", killErr) + } + } + + }() + + return app.client.Start(ctx) +} + +func kill() error { + p, err := os.FindProcess(os.Getpid()) + if err != nil { + return err + } + + return p.Signal(syscall.SIGABRT) +} + +func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() + return app.client.InitChain(ctx, req) +} + +func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() + return app.client.PrepareProposal(ctx, req) +} + +func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() + return app.client.ProcessProposal(ctx, req) +} + +func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() + return app.client.ExtendVote(ctx, req) +} + +func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() + return app.client.VerifyVoteExtension(ctx, req) +} + +func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() + return app.client.FinalizeBlock(ctx, req) +} + +func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() + return app.client.Commit(ctx) +} + +func (app *proxyClient) Flush(ctx context.Context) error { + defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() + return app.client.Flush(ctx) +} + +func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() + return app.client.CheckTx(ctx, req) +} + +func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() + return app.client.Echo(ctx, msg) +} + +func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() + return app.client.Info(ctx, req) +} + +func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() + return app.client.Query(ctx, reqQuery) +} + +func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() + return app.client.ListSnapshots(ctx, req) +} + +func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() + return app.client.OfferSnapshot(ctx, req) +} + +func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() + return app.client.LoadSnapshotChunk(ctx, req) +} + +func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() + return app.client.ApplySnapshotChunk(ctx, req) +} + +// addTimeSample returns a function that, when called, adds an observation to m. +// The observation added to m is the number of seconds ellapsed since addTimeSample +// was initially called. addTimeSample is meant to be called in a defer to calculate +// the amount of time a function takes to complete. +func addTimeSample(m metrics.Histogram) func() { + start := time.Now() + return func() { m.Observe(time.Since(start).Seconds()) } +} diff --git a/internal/proxy/app_conn_test.go b/internal/proxy/client_test.go similarity index 60% rename from internal/proxy/app_conn_test.go rename to internal/proxy/client_test.go index 22f519657..ca32b99e8 100644 --- a/internal/proxy/app_conn_test.go +++ b/internal/proxy/client_test.go @@ -2,18 +2,26 @@ package proxy import ( "context" + "errors" "fmt" + "os" + "os/signal" "strings" + "syscall" "testing" + "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" abciclient "github.com/tendermint/tendermint/abci/client" + abcimocks "github.com/tendermint/tendermint/abci/client/mocks" "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/server" "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" + "gotest.tools/assert" ) //---------------------------------------- @@ -51,7 +59,10 @@ var SOCKET = "socket" func TestEcho(t *testing.T) { sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) logger := log.TestingLogger() - clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true) + client, err := abciclient.NewClient(logger, sockPath, SOCKET, true) + if err != nil { + t.Fatal(err) + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -62,12 +73,9 @@ func TestEcho(t *testing.T) { t.Cleanup(func() { cancel(); s.Wait() }) // Start client - cli, err := clientCreator(logger.With("module", "abci-client")) - require.NoError(t, err, "Error creating ABCI client:") - - require.NoError(t, cli.Start(ctx), "Error starting ABCI client") + require.NoError(t, client.Start(ctx), "Error starting ABCI client") - proxy := newAppConnTest(cli) + proxy := newAppConnTest(client) t.Log("Connected") for i := 0; i < 1000; i++ { @@ -91,7 +99,10 @@ func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) logger := log.TestingLogger() - clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true) + client, err := abciclient.NewClient(logger, sockPath, SOCKET, true) + if err != nil { + b.Fatal(err) + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -102,12 +113,9 @@ func BenchmarkEcho(b *testing.B) { b.Cleanup(func() { cancel(); s.Wait() }) // Start client - cli, err := clientCreator(logger.With("module", "abci-client")) - require.NoError(b, err, "Error creating ABCI client") - - require.NoError(b, cli.Start(ctx), "Error starting ABCI client") + require.NoError(b, client.Start(ctx), "Error starting ABCI client") - proxy := newAppConnTest(cli) + proxy := newAppConnTest(client) b.Log("Connected") echoString := strings.Repeat(" ", 200) b.StartTimer() // Start benchmarking tests @@ -139,7 +147,10 @@ func TestInfo(t *testing.T) { sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) logger := log.TestingLogger() - clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true) + client, err := abciclient.NewClient(logger, sockPath, SOCKET, true) + if err != nil { + t.Fatal(err) + } // Start server s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication()) @@ -147,12 +158,9 @@ func TestInfo(t *testing.T) { t.Cleanup(func() { cancel(); s.Wait() }) // Start client - cli, err := clientCreator(logger.With("module", "abci-client")) - require.NoError(t, err, "Error creating ABCI client") - - require.NoError(t, cli.Start(ctx), "Error starting ABCI client") + require.NoError(t, client.Start(ctx), "Error starting ABCI client") - proxy := newAppConnTest(cli) + proxy := newAppConnTest(client) t.Log("Connected") resInfo, err := proxy.Info(ctx, RequestInfo) @@ -162,3 +170,65 @@ func TestInfo(t *testing.T) { t.Error("Expected ResponseInfo with one element '{\"size\":0}' but got something else") } } + +type noopStoppableClientImpl struct { + abciclient.Client + count int +} + +func (c *noopStoppableClientImpl) Stop() { c.count++ } + +func TestAppConns_Start_Stop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clientMock := &abcimocks.Client{} + clientMock.On("Start", mock.Anything).Return(nil) + clientMock.On("Error").Return(nil) + clientMock.On("IsRunning").Return(true) + clientMock.On("Wait").Return(nil).Times(1) + cl := &noopStoppableClientImpl{Client: clientMock} + + appConns := New(cl, log.TestingLogger(), NopMetrics()) + + err := appConns.Start(ctx) + require.NoError(t, err) + + time.Sleep(200 * time.Millisecond) + + cancel() + appConns.Wait() + + clientMock.AssertExpectations(t) + assert.Equal(t, 1, cl.count) +} + +// Upon failure, we call tmos.Kill +func TestAppConns_Failure(t *testing.T) { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM, syscall.SIGABRT) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + clientMock := &abcimocks.Client{} + clientMock.On("SetLogger", mock.Anything).Return() + clientMock.On("Start", mock.Anything).Return(nil) + clientMock.On("IsRunning").Return(true) + clientMock.On("Wait").Return(nil) + clientMock.On("Error").Return(errors.New("EOF")) + cl := &noopStoppableClientImpl{Client: clientMock} + + appConns := New(cl, log.TestingLogger(), NopMetrics()) + + err := appConns.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { cancel(); appConns.Wait() }) + + select { + case sig := <-c: + t.Logf("signal %q successfully received", sig) + case <-ctx.Done(): + t.Fatal("expected process to receive SIGTERM signal") + } +} diff --git a/internal/proxy/multi_app_conn.go b/internal/proxy/multi_app_conn.go deleted file mode 100644 index 61e9c9ff2..000000000 --- a/internal/proxy/multi_app_conn.go +++ /dev/null @@ -1,131 +0,0 @@ -package proxy - -import ( - "context" - "os" - "syscall" - - abciclient "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" -) - -// AppConns is the Tendermint's interface to the application that consists of -// multiple connections. -type AppConns interface { - service.Service - - // Mempool connection - Mempool() AppConnMempool - // Consensus connection - Consensus() AppConnConsensus - // Query connection - Query() AppConnQuery - // Snapshot connection - Snapshot() AppConnSnapshot -} - -// NewAppConns calls NewMultiAppConn. -func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { - return NewMultiAppConn(clientCreator, logger, metrics) -} - -// multiAppConn implements AppConns. -// -// A multiAppConn is made of a few appConns and manages their underlying abci -// clients. -// TODO: on app restart, clients must reboot together -type multiAppConn struct { - service.BaseService - logger log.Logger - - metrics *Metrics - consensusConn AppConnConsensus - mempoolConn AppConnMempool - queryConn AppConnQuery - snapshotConn AppConnSnapshot - - client stoppableClient - - clientCreator abciclient.Creator -} - -// TODO: this is a totally internal and quasi permanent shim for -// clients. eventually we can have a single client and have some kind -// of reasonable lifecycle witout needing an explicit stop method. -type stoppableClient interface { - abciclient.Client - Stop() -} - -// NewMultiAppConn makes all necessary abci connections to the application. -func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { - multiAppConn := &multiAppConn{ - logger: logger, - metrics: metrics, - clientCreator: clientCreator, - } - multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn) - return multiAppConn -} - -func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn } -func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn } -func (app *multiAppConn) Query() AppConnQuery { return app.queryConn } -func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn } - -func (app *multiAppConn) OnStart(ctx context.Context) error { - var err error - defer func() { - if err != nil { - app.client.Stop() - } - }() - - var client abciclient.Client - client, err = app.clientCreator(app.logger) - if err != nil { - return err - } - - app.queryConn = NewAppConnQuery(client, app.metrics) - app.snapshotConn = NewAppConnSnapshot(client, app.metrics) - app.mempoolConn = NewAppConnMempool(client, app.metrics) - app.consensusConn = NewAppConnConsensus(client, app.metrics) - - app.client = client.(stoppableClient) - - // Kill Tendermint if the ABCI application crashes. - go func() { - if !client.IsRunning() { - return - } - app.client.Wait() - if ctx.Err() != nil { - return - } - - if err := app.client.Error(); err != nil { - app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", - "err", err) - if killErr := kill(); killErr != nil { - app.logger.Error("Failed to kill this process - please do so manually", - "err", killErr) - } - } - - }() - - return client.Start(ctx) -} - -func (app *multiAppConn) OnStop() { app.client.Stop() } - -func kill() error { - p, err := os.FindProcess(os.Getpid()) - if err != nil { - return err - } - - return p.Signal(syscall.SIGTERM) -} diff --git a/internal/proxy/multi_app_conn_test.go b/internal/proxy/multi_app_conn_test.go deleted file mode 100644 index efbb3f56f..000000000 --- a/internal/proxy/multi_app_conn_test.go +++ /dev/null @@ -1,99 +0,0 @@ -package proxy - -import ( - "context" - "errors" - "os" - "os/signal" - "syscall" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - abciclient "github.com/tendermint/tendermint/abci/client" - abcimocks "github.com/tendermint/tendermint/abci/client/mocks" - "github.com/tendermint/tendermint/libs/log" -) - -type noopStoppableClientImpl struct { - abciclient.Client - count int -} - -func (c *noopStoppableClientImpl) Stop() { c.count++ } - -func TestAppConns_Start_Stop(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clientMock := &abcimocks.Client{} - clientMock.On("Start", mock.Anything).Return(nil) - clientMock.On("Error").Return(nil) - clientMock.On("IsRunning").Return(true) - clientMock.On("Wait").Return(nil).Times(1) - cl := &noopStoppableClientImpl{Client: clientMock} - - creatorCallCount := 0 - creator := func(logger log.Logger) (abciclient.Client, error) { - creatorCallCount++ - return cl, nil - } - - appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) - - err := appConns.Start(ctx) - require.NoError(t, err) - - time.Sleep(200 * time.Millisecond) - - cancel() - appConns.Wait() - - clientMock.AssertExpectations(t) - assert.Equal(t, 1, cl.count) - assert.Equal(t, 1, creatorCallCount) -} - -// Upon failure, we call tmos.Kill -func TestAppConns_Failure(t *testing.T) { - ok := make(chan struct{}) - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGTERM) - go func() { - for range c { - close(ok) - return - } - }() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clientMock := &abcimocks.Client{} - clientMock.On("SetLogger", mock.Anything).Return() - clientMock.On("Start", mock.Anything).Return(nil) - clientMock.On("IsRunning").Return(true) - clientMock.On("Wait").Return(nil) - clientMock.On("Error").Return(errors.New("EOF")) - cl := &noopStoppableClientImpl{Client: clientMock} - - creator := func(log.Logger) (abciclient.Client, error) { - return cl, nil - } - - appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) - - err := appConns.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { cancel(); appConns.Wait() }) - - select { - case <-ok: - t.Log("SIGTERM successfully received") - case <-time.After(5 * time.Second): - t.Fatal("expected process to receive SIGTERM signal") - } -} diff --git a/internal/rpc/core/abci.go b/internal/rpc/core/abci.go index cbd27a09d..8f5e61d55 100644 --- a/internal/rpc/core/abci.go +++ b/internal/rpc/core/abci.go @@ -18,7 +18,7 @@ func (env *Environment) ABCIQuery( height int64, prove bool, ) (*coretypes.ResultABCIQuery, error) { - resQuery, err := env.ProxyAppQuery.Query(ctx, abci.RequestQuery{ + resQuery, err := env.ProxyApp.Query(ctx, abci.RequestQuery{ Path: path, Data: data, Height: height, @@ -34,7 +34,7 @@ func (env *Environment) ABCIQuery( // ABCIInfo gets some info about the application. // More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info func (env *Environment) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) { - resInfo, err := env.ProxyAppQuery.Info(ctx, proxy.RequestInfo) + resInfo, err := env.ProxyApp.Info(ctx, proxy.RequestInfo) if err != nil { return nil, err } diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 8f590298b..24f43a4a7 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -11,6 +11,7 @@ import ( "github.com/rs/cors" + abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/blocksync" @@ -19,7 +20,6 @@ import ( "github.com/tendermint/tendermint/internal/eventlog" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" - "github.com/tendermint/tendermint/internal/proxy" tmpubsub "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/pubsub/query" sm "github.com/tendermint/tendermint/internal/state" @@ -67,8 +67,7 @@ type peerManager interface { // to be setup once during startup. type Environment struct { // external, thread safe interfaces - ProxyAppQuery proxy.AppConnQuery - ProxyAppMempool proxy.AppConnMempool + ProxyApp abciclient.Client // interfaces defined in types and above StateStore sm.Store diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 92df22052..c2a9084db 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -158,7 +158,7 @@ func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.Resul // be added to the mempool either. // More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx func (env *Environment) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) { - res, err := env.ProxyAppMempool.CheckTx(ctx, abci.RequestCheckTx{Tx: tx}) + res, err := env.ProxyApp.CheckTx(ctx, abci.RequestCheckTx{Tx: tx}) if err != nil { return nil, err } diff --git a/internal/state/execution.go b/internal/state/execution.go index 5da2a1f49..f7c9e02d1 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -5,11 +5,11 @@ import ( "fmt" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/mempool" - "github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/libs/log" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" tmtypes "github.com/tendermint/tendermint/proto/tendermint/types" @@ -30,7 +30,7 @@ type BlockExecutor struct { blockStore BlockStore // execute the app against this - proxyApp proxy.AppConnConsensus + appClient abciclient.Client // events eventBus types.BlockEventPublisher @@ -60,7 +60,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption { func NewBlockExecutor( stateStore Store, logger log.Logger, - proxyApp proxy.AppConnConsensus, + appClient abciclient.Client, pool mempool.Mempool, evpool EvidencePool, blockStore BlockStore, @@ -68,7 +68,7 @@ func NewBlockExecutor( ) *BlockExecutor { res := &BlockExecutor{ store: stateStore, - proxyApp: proxyApp, + appClient: appClient, eventBus: eventbus.NopEventBus{}, mempool: pool, evpool: evpool, @@ -119,7 +119,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock( txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) - preparedProposal, err := blockExec.proxyApp.PrepareProposal( + preparedProposal, err := blockExec.appClient.PrepareProposal( ctx, abci.RequestPrepareProposal{ BlockData: txs.ToSliceOfBytes(), @@ -166,7 +166,7 @@ func (blockExec *BlockExecutor) ProcessProposal( ByzantineValidators: block.Evidence.ToABCI(), } - resp, err := blockExec.proxyApp.ProcessProposal(ctx, req) + resp, err := blockExec.appClient.ProcessProposal(ctx, req) if err != nil { return false, ErrInvalidBlock(err) } @@ -214,7 +214,7 @@ func (blockExec *BlockExecutor) ApplyBlock( } startTime := time.Now().UnixNano() pbh := block.Header.ToProto() - finalizeBlockResponse, err := blockExec.proxyApp.FinalizeBlock( + finalizeBlockResponse, err := blockExec.appClient.FinalizeBlock( ctx, abci.RequestFinalizeBlock{ Hash: block.Hash(), @@ -299,7 +299,7 @@ func (blockExec *BlockExecutor) ExtendVote(ctx context.Context, vote *types.Vote Vote: vote.ToProto(), } - resp, err := blockExec.proxyApp.ExtendVote(ctx, req) + resp, err := blockExec.appClient.ExtendVote(ctx, req) if err != nil { return types.VoteExtension{}, err } @@ -311,7 +311,7 @@ func (blockExec *BlockExecutor) VerifyVoteExtension(ctx context.Context, vote *t Vote: vote.ToProto(), } - resp, err := blockExec.proxyApp.VerifyVoteExtension(ctx, req) + resp, err := blockExec.appClient.VerifyVoteExtension(ctx, req) if err != nil { return err } @@ -347,7 +347,7 @@ func (blockExec *BlockExecutor) Commit( } // Commit block, get hash back - res, err := blockExec.proxyApp.Commit(ctx) + res, err := blockExec.appClient.Commit(ctx) if err != nil { blockExec.logger.Error("client error during proxyAppConn.Commit", "err", err) return nil, 0, err @@ -580,7 +580,7 @@ func fireEvents( func ExecCommitBlock( ctx context.Context, be *BlockExecutor, - appConnConsensus proxy.AppConnConsensus, + appConnConsensus abciclient.Client, block *types.Block, logger log.Logger, store Store, diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index 5a37b304f..8dd384b61 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -38,9 +38,9 @@ var ( func TestApplyBlock(t *testing.T) { app := &testApp{} - cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -51,7 +51,7 @@ func TestApplyBlock(t *testing.T) { state, stateDB, _ := makeState(t, 1, 1) stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) block, err := sf.MakeBlock(state, 1, new(types.Commit)) @@ -74,11 +74,12 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := log.TestingLogger() app := &testApp{} - cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, app) + appClient := proxy.New(cc, logger, proxy.NopMetrics()) - err := proxyApp.Start(ctx) + err := appClient.Start(ctx) require.NoError(t, err) state, stateDB, privVals := makeState(t, 7, 1) @@ -102,7 +103,7 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) { evpool.On("Update", ctx, mock.Anything, mock.Anything).Return() evpool.On("CheckEvidence", ctx, mock.Anything).Return(nil) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mmock.Mempool{}, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mmock.Mempool{}, evpool, blockStore) state, _, lastCommit := makeAndCommitGoodBlock(ctx, t, state, 1, new(types.Commit), state.NextValidators.Validators[0].Address, blockExec, privVals, nil) for idx, isAbsent := range tc.absentCommitSigs { @@ -135,8 +136,9 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) { defer cancel() app := &testApp{} - cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) + logger := log.TestingLogger() + cc := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -212,7 +214,7 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) { blockStore := store.NewBlockStore(dbm.NewMemDB()) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mmock.Mempool{}, evpool, blockStore) block, err := sf.MakeBlock(state, 1, new(types.Commit)) @@ -238,9 +240,9 @@ func TestProcessProposal(t *testing.T) { defer cancel() app := abcimocks.NewBaseMock() - cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -251,7 +253,7 @@ func TestProcessProposal(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -441,9 +443,9 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) { defer cancel() app := &testApp{} - cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -454,7 +456,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -516,9 +518,9 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { defer cancel() app := &testApp{} - cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -528,7 +530,7 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - proxyApp.Consensus(), + proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, diff --git a/internal/state/helpers_test.go b/internal/state/helpers_test.go index 4df5b874f..6df958908 100644 --- a/internal/state/helpers_test.go +++ b/internal/state/helpers_test.go @@ -11,16 +11,13 @@ import ( "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" - abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/encoding" - "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" sf "github.com/tendermint/tendermint/internal/state/test/factory" "github.com/tendermint/tendermint/internal/test/factory" - "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" tmtime "github.com/tendermint/tendermint/libs/time" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" @@ -33,12 +30,6 @@ type paramsChangeTestCase struct { params types.ConsensusParams } -func newTestApp() proxy.AppConns { - app := &testApp{} - cc := abciclient.NewLocalCreator(app) - return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics()) -} - func makeAndCommitGoodBlock( ctx context.Context, t *testing.T, diff --git a/internal/state/validation_test.go b/internal/state/validation_test.go index 4d78fde74..d24b5098a 100644 --- a/internal/state/validation_test.go +++ b/internal/state/validation_test.go @@ -10,10 +10,12 @@ import ( "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" memmock "github.com/tendermint/tendermint/internal/mempool/mock" + "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/state/mocks" statefactory "github.com/tendermint/tendermint/internal/state/test/factory" @@ -30,8 +32,8 @@ const validationTestsStopHeight int64 = 10 func TestValidateBlockHeader(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - proxyApp := newTestApp() + logger := log.TestingLogger() + proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) state, stateDB, privVals := makeState(t, 3, 1) @@ -39,8 +41,8 @@ func TestValidateBlockHeader(t *testing.T) { blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( stateStore, - log.TestingLogger(), - proxyApp.Consensus(), + logger, + proxyApp, memmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -119,7 +121,8 @@ func TestValidateBlockCommit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - proxyApp := newTestApp() + logger := log.TestingLogger() + proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) state, stateDB, privVals := makeState(t, 1, 1) @@ -127,8 +130,8 @@ func TestValidateBlockCommit(t *testing.T) { blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( stateStore, - log.TestingLogger(), - proxyApp.Consensus(), + logger, + proxyApp, memmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -245,7 +248,8 @@ func TestValidateBlockEvidence(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - proxyApp := newTestApp() + logger := log.TestingLogger() + proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) state, stateDB, privVals := makeState(t, 4, 1) @@ -263,7 +267,7 @@ func TestValidateBlockEvidence(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - proxyApp.Consensus(), + proxyApp, memmock.Mempool{}, evpool, blockStore, diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 1f65a8c0c..51f626027 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -11,11 +11,11 @@ import ( "sync" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/p2p" - "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/libs/log" @@ -135,8 +135,7 @@ type Reactor struct { stateStore sm.Store blockStore *store.BlockStore - conn proxy.AppConnSnapshot - connQuery proxy.AppConnQuery + conn abciclient.Client tempDir string snapshotCh *p2p.Channel chunkCh *p2p.Channel @@ -173,8 +172,7 @@ func NewReactor( initialHeight int64, cfg config.StateSyncConfig, logger log.Logger, - conn proxy.AppConnSnapshot, - connQuery proxy.AppConnQuery, + conn abciclient.Client, channelCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, stateStore sm.Store, @@ -209,7 +207,6 @@ func NewReactor( initialHeight: initialHeight, cfg: cfg, conn: conn, - connQuery: connQuery, snapshotCh: snapshotCh, chunkCh: chunkCh, blockCh: blockCh, @@ -287,7 +284,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.cfg, r.logger, r.conn, - r.connQuery, r.stateProvider, r.snapshotCh, r.chunkCh, diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index f259cfa58..dc7a73bf9 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -13,11 +13,11 @@ import ( "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" + clientmocks "github.com/tendermint/tendermint/abci/client/mocks" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/proxy" - proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks" smmocks "github.com/tendermint/tendermint/internal/state/mocks" "github.com/tendermint/tendermint/internal/statesync/mocks" "github.com/tendermint/tendermint/internal/store" @@ -37,8 +37,7 @@ type reactorTestSuite struct { reactor *Reactor syncer *syncer - conn *proxymocks.AppConnSnapshot - connQuery *proxymocks.AppConnQuery + conn *clientmocks.Client stateProvider *mocks.StateProvider snapshotChannel *p2p.Channel @@ -71,21 +70,14 @@ type reactorTestSuite struct { func setup( ctx context.Context, t *testing.T, - conn *proxymocks.AppConnSnapshot, - connQuery *proxymocks.AppConnQuery, + conn *clientmocks.Client, stateProvider *mocks.StateProvider, chBuf uint, ) *reactorTestSuite { t.Helper() if conn == nil { - conn = &proxymocks.AppConnSnapshot{} - } - if connQuery == nil { - connQuery = &proxymocks.AppConnQuery{} - } - if stateProvider == nil { - stateProvider = &mocks.StateProvider{} + conn = &clientmocks.Client{} } rts := &reactorTestSuite{ @@ -102,7 +94,6 @@ func setup( paramsOutCh: make(chan p2p.Envelope, chBuf), paramsPeerErrCh: make(chan p2p.PeerError, chBuf), conn: conn, - connQuery: connQuery, stateProvider: stateProvider, } @@ -171,7 +162,6 @@ func setup( *cfg, logger.With("component", "reactor"), conn, - connQuery, chCreator, rts.peerUpdates, rts.stateStore, @@ -186,7 +176,6 @@ func setup( *cfg, logger.With("component", "syncer"), conn, - connQuery, stateProvider, rts.snapshotChannel, rts.chunkChannel, @@ -211,7 +200,7 @@ func TestReactor_Sync(t *testing.T) { defer cancel() const snapshotHeight = 7 - rts := setup(ctx, t, nil, nil, nil, 2) + rts := setup(ctx, t, nil, nil, 2) chain := buildLightBlockChain(ctx, t, 1, 10, time.Now()) // app accepts any snapshot rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")). @@ -222,7 +211,7 @@ func TestReactor_Sync(t *testing.T) { Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) // app query returns valid state app hash - rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{ + rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{ AppVersion: testAppVersion, LastBlockHeight: snapshotHeight, LastBlockAppHash: chain[snapshotHeight+1].AppHash, @@ -237,8 +226,8 @@ func TestReactor_Sync(t *testing.T) { defer close(closeCh) go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0) - go graduallyAddPeers(t, rts.peerUpdateCh, closeCh, 1*time.Second) - go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{ + go graduallyAddPeers(ctx, t, rts.peerUpdateCh, closeCh, 1*time.Second) + go handleSnapshotRequests(ctx, t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{ { Height: uint64(snapshotHeight), Format: 1, @@ -246,7 +235,7 @@ func TestReactor_Sync(t *testing.T) { }, }) - go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc")) + go handleChunkRequests(ctx, t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc")) go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh) @@ -265,7 +254,7 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, nil, 2) + rts := setup(ctx, t, nil, nil, 2) rts.chunkInCh <- p2p.Envelope{ From: types.NodeID("aa"), @@ -316,14 +305,14 @@ func TestReactor_ChunkRequest(t *testing.T) { defer cancel() // mock ABCI connection to return local snapshots - conn := &proxymocks.AppConnSnapshot{} + conn := &clientmocks.Client{} conn.On("LoadSnapshotChunk", mock.Anything, abci.RequestLoadSnapshotChunk{ Height: tc.request.Height, Format: tc.request.Format, Chunk: tc.request.Index, }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil) - rts := setup(ctx, t, conn, nil, nil, 2) + rts := setup(ctx, t, conn, nil, 2) rts.chunkInCh <- p2p.Envelope{ From: types.NodeID("aa"), @@ -343,7 +332,7 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, nil, 2) + rts := setup(ctx, t, nil, nil, 2) rts.snapshotInCh <- p2p.Envelope{ From: types.NodeID("aa"), @@ -403,12 +392,12 @@ func TestReactor_SnapshotsRequest(t *testing.T) { defer cancel() // mock ABCI connection to return local snapshots - conn := &proxymocks.AppConnSnapshot{} + conn := &clientmocks.Client{} conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{ Snapshots: tc.snapshots, }, nil) - rts := setup(ctx, t, conn, nil, nil, 100) + rts := setup(ctx, t, conn, nil, 100) rts.snapshotInCh <- p2p.Envelope{ From: types.NodeID("aa"), @@ -435,7 +424,7 @@ func TestReactor_LightBlockResponse(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, nil, 2) + rts := setup(ctx, t, nil, nil, 2) var height int64 = 10 // generates a random header @@ -492,7 +481,7 @@ func TestReactor_BlockProviders(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, nil, 2) + rts := setup(ctx, t, nil, nil, 2) rts.peerUpdateCh <- p2p.PeerUpdate{ NodeID: types.NodeID("aa"), Status: p2p.PeerStatusUp, @@ -559,7 +548,7 @@ func TestReactor_StateProviderP2P(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, nil, 2) + rts := setup(ctx, t, nil, nil, 2) // make syncer non nil else test won't think we are state syncing rts.reactor.syncer = rts.syncer peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength)) @@ -636,7 +625,7 @@ func TestReactor_Backfill(t *testing.T) { defer cancel() t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute)) - rts := setup(ctx, t, nil, nil, nil, 21) + rts := setup(ctx, t, nil, nil, 21) var ( startHeight int64 = 20 @@ -860,6 +849,7 @@ func mockLB(ctx context.Context, t *testing.T, height int64, time time.Time, las // graduallyAddPeers delivers a new randomly-generated peer update on peerUpdateCh once // per interval, until closeCh is closed. Each peer update is assigned a random node ID. func graduallyAddPeers( + ctx context.Context, t *testing.T, peerUpdateCh chan p2p.PeerUpdate, closeCh chan struct{}, @@ -868,6 +858,10 @@ func graduallyAddPeers( ticker := time.NewTicker(interval) for { select { + case <-ctx.Done(): + return + case <-closeCh: + return case <-ticker.C: peerUpdateCh <- p2p.PeerUpdate{ NodeID: factory.RandomNodeID(t), @@ -879,13 +873,12 @@ func graduallyAddPeers( ParamsChannel: struct{}{}, }, } - case <-closeCh: - return } } } func handleSnapshotRequests( + ctx context.Context, t *testing.T, receivingCh chan p2p.Envelope, sendingCh chan p2p.Envelope, @@ -895,6 +888,10 @@ func handleSnapshotRequests( t.Helper() for { select { + case <-ctx.Done(): + return + case <-closeCh: + return case envelope := <-receivingCh: _, ok := envelope.Message.(*ssproto.SnapshotsRequest) require.True(t, ok) @@ -910,13 +907,12 @@ func handleSnapshotRequests( }, } } - case <-closeCh: - return } } } func handleChunkRequests( + ctx context.Context, t *testing.T, receivingCh chan p2p.Envelope, sendingCh chan p2p.Envelope, @@ -926,6 +922,10 @@ func handleChunkRequests( t.Helper() for { select { + case <-ctx.Done(): + return + case <-closeCh: + return case envelope := <-receivingCh: msg, ok := envelope.Message.(*ssproto.ChunkRequest) require.True(t, ok) @@ -940,8 +940,6 @@ func handleChunkRequests( }, } - case <-closeCh: - return } } } diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index e2e41586c..78eb8d53a 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -8,6 +8,7 @@ import ( "sync" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/p2p" @@ -54,8 +55,7 @@ var ( type syncer struct { logger log.Logger stateProvider StateProvider - conn proxy.AppConnSnapshot - connQuery proxy.AppConnQuery + conn abciclient.Client snapshots *snapshotPool snapshotCh *p2p.Channel chunkCh *p2p.Channel @@ -76,8 +76,7 @@ type syncer struct { func newSyncer( cfg config.StateSyncConfig, logger log.Logger, - conn proxy.AppConnSnapshot, - connQuery proxy.AppConnQuery, + conn abciclient.Client, stateProvider StateProvider, snapshotCh *p2p.Channel, chunkCh *p2p.Channel, @@ -88,7 +87,6 @@ func newSyncer( logger: logger, stateProvider: stateProvider, conn: conn, - connQuery: connQuery, snapshots: newSnapshotPool(), snapshotCh: snapshotCh, chunkCh: chunkCh, @@ -547,7 +545,7 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin // verifyApp verifies the sync, checking the app hash, last block height and app version func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot, appVersion uint64) error { - resp, err := s.connQuery.Info(ctx, proxy.RequestInfo) + resp, err := s.conn.Info(ctx, proxy.RequestInfo) if err != nil { return fmt.Errorf("failed to query ABCI app for appHash: %w", err) } diff --git a/internal/statesync/syncer_test.go b/internal/statesync/syncer_test.go index b199fc982..e3bf49259 100644 --- a/internal/statesync/syncer_test.go +++ b/internal/statesync/syncer_test.go @@ -11,9 +11,9 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + clientmocks "github.com/tendermint/tendermint/abci/client/mocks" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/proxy" - proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/statesync/mocks" ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" @@ -62,13 +62,12 @@ func TestSyncer_SyncAny(t *testing.T) { stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil) stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil) stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil) - connSnapshot := &proxymocks.AppConnSnapshot{} - connQuery := &proxymocks.AppConnQuery{} + conn := &clientmocks.Client{} peerAID := types.NodeID("aa") peerBID := types.NodeID("bb") peerCID := types.NodeID("cc") - rts := setup(ctx, t, connSnapshot, connQuery, stateProvider, 4) + rts := setup(ctx, t, conn, stateProvider, 4) rts.reactor.syncer = rts.syncer @@ -110,7 +109,7 @@ func TestSyncer_SyncAny(t *testing.T) { // We start a sync, with peers sending back chunks when requested. We first reject the snapshot // with height 2 format 2, and accept the snapshot at height 1. - connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{ + conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{ Snapshot: &abci.Snapshot{ Height: 2, Format: 2, @@ -119,7 +118,7 @@ func TestSyncer_SyncAny(t *testing.T) { }, AppHash: []byte("app_hash_2"), }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil) - connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{ + conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{ Snapshot: &abci.Snapshot{ Height: s.Height, Format: s.Format, @@ -171,7 +170,7 @@ func TestSyncer_SyncAny(t *testing.T) { // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1, // which should cause it to keep the existing chunk 0 and 2, and restart restoration from // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks(). - connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ + conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ Index: 2, Chunk: []byte{1, 1, 2}, }).Once().Run(func(args mock.Arguments) { time.Sleep(1 * time.Second) }).Return( &abci.ResponseApplySnapshotChunk{ @@ -179,16 +178,16 @@ func TestSyncer_SyncAny(t *testing.T) { RefetchChunks: []uint32{1}, }, nil) - connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ + conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ Index: 0, Chunk: []byte{1, 1, 0}, }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) - connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ + conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ Index: 1, Chunk: []byte{1, 1, 1}, }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) - connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ + conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{ Index: 2, Chunk: []byte{1, 1, 2}, }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) - connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{ + conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{ AppVersion: testAppVersion, LastBlockHeight: 1, LastBlockAppHash: []byte("app_hash"), @@ -217,8 +216,7 @@ func TestSyncer_SyncAny(t *testing.T) { require.Equal(t, int64(len(rts.syncer.snapshots.snapshots)), rts.reactor.TotalSnapshots()) require.Equal(t, int64(0), rts.reactor.SnapshotChunksCount()) - connSnapshot.AssertExpectations(t) - connQuery.AssertExpectations(t) + conn.AssertExpectations(t) } func TestSyncer_SyncAny_noSnapshots(t *testing.T) { @@ -228,7 +226,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) _, _, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil }) require.Equal(t, errNoSnapshots, err) @@ -241,7 +239,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} peerID := types.NodeID("aa") @@ -265,7 +263,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) // s22 is tried first, then s12, then s11, then errNoSnapshots s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} @@ -307,7 +305,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) // s22 is tried first, which reject s22 and s12, then s11 will abort. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} @@ -345,7 +343,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) peerAID := types.NodeID("aa") peerBID := types.NodeID("bb") @@ -394,7 +392,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) errBoom := errors.New("boom") s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} @@ -444,7 +442,7 @@ func TestSyncer_offerSnapshot(t *testing.T) { stateProvider := &mocks.StateProvider{} stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")} rts.conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{ @@ -497,7 +495,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) { stateProvider := &mocks.StateProvider{} stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) body := []byte{1, 2, 3} chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, t.TempDir()) @@ -557,7 +555,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) { stateProvider := &mocks.StateProvider{} stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, t.TempDir()) require.NoError(t, err) @@ -628,7 +626,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { stateProvider := &mocks.StateProvider{} stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) - rts := setup(ctx, t, nil, nil, stateProvider, 2) + rts := setup(ctx, t, nil, stateProvider, 2) // Set up three peers across two snapshots, and ask for one of them to be banned. // It should be banned from all snapshots. @@ -761,9 +759,9 @@ func TestSyncer_verifyApp(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - rts := setup(ctx, t, nil, nil, nil, 2) + rts := setup(ctx, t, nil, nil, 2) - rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err) + rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err) err := rts.syncer.verifyApp(ctx, s, appVersion) unwrapped := errors.Unwrap(err) if unwrapped != nil { diff --git a/node/node.go b/node/node.go index 18f592796..e57b7c4db 100644 --- a/node/node.go +++ b/node/node.go @@ -100,7 +100,10 @@ func newDefaultNode( return nil, err } - appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + appClient, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) + if err != nil { + return nil, err + } return makeNode( ctx, @@ -120,7 +123,7 @@ func makeNode( cfg *config.Config, filePrivval *privval.FilePV, nodeKey types.NodeKey, - clientCreator abciclient.Creator, + client abciclient.Client, genesisDocProvider genesisDocProvider, dbProvider config.DBProvider, logger log.Logger, @@ -155,7 +158,7 @@ func makeNode( nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). - proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy) + proxyApp := proxy.New(client, logger.With("module", "proxy"), nodeMetrics.proxy) if err := proxyApp.Start(ctx); err != nil { return nil, fmt.Errorf("error starting proxy app connections: %w", err) } @@ -281,7 +284,7 @@ func makeNode( blockExec := sm.NewBlockExecutor( stateStore, logger.With("module", "state"), - proxyApp.Consensus(), + proxyApp, mp, evPool, blockStore, @@ -339,8 +342,7 @@ func makeNode( genDoc.InitialHeight, *cfg.StateSync, logger.With("module", "statesync"), - proxyApp.Snapshot(), - proxyApp.Query(), + proxyApp, router.OpenChannel, peerManager.Subscribe(ctx), stateStore, @@ -390,14 +392,13 @@ func makeNode( shutdownOps: makeCloser(closers), rpcEnv: &rpccore.Environment{ - ProxyAppQuery: proxyApp.Query(), - ProxyAppMempool: proxyApp.Mempool(), - - StateStore: stateStore, - BlockStore: blockStore, + ProxyApp: proxyApp, EvidencePool: evPool, ConsensusState: csState, + StateStore: stateStore, + BlockStore: blockStore, + ConsensusReactor: csReactor, BlockSyncReactor: bcReactor, @@ -752,14 +753,14 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene return state, nil } -func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions { +func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions { opts := p2p.RouterOptions{ QueueType: conf.P2P.QueueType, } - if conf.FilterPeers && proxyApp != nil { + if conf.FilterPeers && appClient != nil { opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error { - res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{ + res, err := appClient.Query(ctx, abci.RequestQuery{ Path: fmt.Sprintf("/p2p/filter/id/%s", id), }) if err != nil { @@ -773,7 +774,7 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt } opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error { - res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{ + res, err := appClient.Query(ctx, abci.RequestQuery{ Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))), }) if err != nil { diff --git a/node/node_test.go b/node/node_test.go index 5fbf80e00..690da1a4d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -273,8 +273,8 @@ func TestCreateProposalBlock(t *testing.T) { logger := log.NewNopLogger() - cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, kvstore.NewApplication()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) require.NoError(t, err) @@ -291,7 +291,7 @@ func TestCreateProposalBlock(t *testing.T) { mp := mempool.NewTxMempool( logger.With("module", "mempool"), cfg.Mempool, - proxyApp.Mempool(), + proxyApp, ) // Make EvidencePool @@ -327,7 +327,7 @@ func TestCreateProposalBlock(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mp, evidencePool, blockStore, @@ -371,8 +371,8 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { logger := log.NewNopLogger() - cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, kvstore.NewApplication()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) require.NoError(t, err) @@ -390,7 +390,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { mp := mempool.NewTxMempool( logger.With("module", "mempool"), cfg.Mempool, - proxyApp.Mempool(), + proxyApp, ) // fill the mempool with one txs just below the maximum size @@ -402,7 +402,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mp, sm.EmptyEvidencePool{}, blockStore, @@ -438,8 +438,8 @@ func TestMaxProposalBlockSize(t *testing.T) { logger := log.NewNopLogger() - cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + cc := abciclient.NewLocalClient(logger, kvstore.NewApplication()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) require.NoError(t, err) @@ -454,7 +454,7 @@ func TestMaxProposalBlockSize(t *testing.T) { mp := mempool.NewTxMempool( logger.With("module", "mempool"), cfg.Mempool, - proxyApp.Mempool(), + proxyApp, ) // fill the mempool with one txs just below the maximum size @@ -473,7 +473,7 @@ func TestMaxProposalBlockSize(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mp, sm.EmptyEvidencePool{}, blockStore, diff --git a/node/public.go b/node/public.go index 0d6f1d93e..af3aece8e 100644 --- a/node/public.go +++ b/node/public.go @@ -35,7 +35,7 @@ func New( ctx context.Context, conf *config.Config, logger log.Logger, - cf abciclient.Creator, + cf abciclient.Client, gen *types.GenesisDoc, ) (service.Service, error) { nodeKey, err := types.LoadOrGenNodeKey(conf.NodeKeyFile()) diff --git a/node/setup.go b/node/setup.go index bc071f3cf..7a473fae9 100644 --- a/node/setup.go +++ b/node/setup.go @@ -10,6 +10,7 @@ import ( dbm "github.com/tendermint/tm-db" + abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/blocksync" @@ -20,7 +21,6 @@ import ( "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/internal/p2p/pex" - "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/state/indexer/sink" @@ -171,7 +171,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { func createMempoolReactor( ctx context.Context, cfg *config.Config, - proxyApp proxy.AppConns, + appClient abciclient.Client, store sm.Store, memplMetrics *mempool.Metrics, peerManager *p2p.PeerManager, @@ -183,7 +183,7 @@ func createMempoolReactor( mp := mempool.NewTxMempool( logger, cfg.Mempool, - proxyApp.Mempool(), + appClient, mempool.WithMetrics(memplMetrics), mempool.WithPreCheck(sm.TxPreCheckFromStore(store)), mempool.WithPostCheck(sm.TxPostCheckFromStore(store)), @@ -387,7 +387,7 @@ func createRouter( nodeKey types.NodeKey, peerManager *p2p.PeerManager, cfg *config.Config, - proxyApp proxy.AppConns, + appClient abciclient.Client, ) (*p2p.Router, error) { p2pLogger := logger.With("module", "p2p") @@ -418,7 +418,7 @@ func createRouter( peerManager, []p2p.Transport{transport}, []p2p.Endpoint{ep}, - getRouterConfig(cfg, proxyApp), + getRouterConfig(cfg, appClient), ) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 19deac607..4c13b322a 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -98,7 +98,7 @@ func StartTendermint( } } - papp := abciclient.NewLocalCreator(app) + papp := abciclient.NewLocalClient(logger, app) tmNode, err := node.New(ctx, conf, logger, papp, nil) if err != nil { return nil, func(_ context.Context) error { cancel(); return nil }, err diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go index 704cc06bb..7f46eea89 100644 --- a/test/e2e/node/main.go +++ b/test/e2e/node/main.go @@ -136,7 +136,7 @@ func startNode(ctx context.Context, cfg *Config) error { ctx, tmcfg, nodeLogger, - abciclient.NewLocalCreator(app), + abciclient.NewLocalClient(nodeLogger, app), nil, ) if err != nil { diff --git a/test/fuzz/mempool/checktx.go b/test/fuzz/mempool/checktx.go index a6e7006d0..8be90f0c2 100644 --- a/test/fuzz/mempool/checktx.go +++ b/test/fuzz/mempool/checktx.go @@ -15,9 +15,9 @@ var getMp func() mempool.Mempool func init() { app := kvstore.NewApplication() - cc := abciclient.NewLocalCreator(app) - appConnMem, _ := cc(log.NewNopLogger()) - err := appConnMem.Start(context.TODO()) + logger := log.NewNopLogger() + conn := abciclient.NewLocalClient(logger, app) + err := conn.Start(context.TODO()) if err != nil { panic(err) } @@ -27,12 +27,7 @@ func init() { getMp = func() mempool.Mempool { if mp == nil { - mp = mempool.NewTxMempool( - log.NewNopLogger(), - cfg, - appConnMem, - ) - + mp = mempool.NewTxMempool(logger, cfg, conn) } return mp }