Browse Source

proxy: collapse triforcated abci.Client (#8067)

pull/8076/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
8df7b6103f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 500 additions and 789 deletions
  1. +0
    -33
      abci/client/creators.go
  2. +6
    -2
      go.mod
  3. +4
    -4
      internal/blocksync/reactor_test.go
  4. +19
    -15
      internal/consensus/replay.go
  5. +7
    -4
      internal/consensus/replay_file.go
  6. +3
    -14
      internal/consensus/replay_stubs.go
  7. +24
    -23
      internal/consensus/replay_test.go
  8. +3
    -3
      internal/consensus/wal_generator.go
  9. +3
    -4
      internal/mempool/mempool.go
  10. +7
    -7
      internal/mempool/mempool_test.go
  11. +0
    -249
      internal/proxy/app_conn.go
  12. +182
    -11
      internal/proxy/client.go
  13. +88
    -18
      internal/proxy/client_test.go
  14. +0
    -131
      internal/proxy/multi_app_conn.go
  15. +0
    -99
      internal/proxy/multi_app_conn_test.go
  16. +2
    -2
      internal/rpc/core/abci.go
  17. +2
    -3
      internal/rpc/core/env.go
  18. +1
    -1
      internal/rpc/core/mempool.go
  19. +11
    -11
      internal/state/execution.go
  20. +21
    -19
      internal/state/execution_test.go
  21. +0
    -9
      internal/state/helpers_test.go
  22. +13
    -9
      internal/state/validation_test.go
  23. +3
    -7
      internal/statesync/reactor.go
  24. +34
    -36
      internal/statesync/reactor_test.go
  25. +4
    -6
      internal/statesync/syncer.go
  26. +23
    -25
      internal/statesync/syncer_test.go
  27. +16
    -15
      node/node.go
  28. +12
    -12
      node/node_test.go
  29. +1
    -1
      node/public.go
  30. +5
    -5
      node/setup.go
  31. +1
    -1
      rpc/test/helpers.go
  32. +1
    -1
      test/e2e/node/main.go
  33. +4
    -9
      test/fuzz/mempool/checktx.go

+ 0
- 33
abci/client/creators.go View File

@ -1,33 +0,0 @@
package abciclient
import (
"fmt"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
)
// Creator creates new ABCI clients.
type Creator func(log.Logger) (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
return func(logger log.Logger) (Client, error) {
return NewLocalClient(logger, app), nil
}
}
// NewRemoteCreator returns a Creator for the given address (e.g.
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
// want the client to connect before reporting success.
func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator {
return func(log.Logger) (Client, error) {
remoteApp, err := NewClient(logger, addr, transport, mustConnect)
if err != nil {
return nil, fmt.Errorf("failed to connect to proxy: %w", err)
}
return remoteApp, nil
}
}

+ 6
- 2
go.mod View File

@ -39,6 +39,12 @@ require (
pgregory.net/rapid v0.4.7 pgregory.net/rapid v0.4.7
) )
require (
github.com/creachadair/atomicfile v0.2.4
github.com/google/go-cmp v0.5.7
gotest.tools v2.2.0+incompatible
)
require ( require (
4d63.com/gochecknoglobals v0.1.0 // indirect 4d63.com/gochecknoglobals v0.1.0 // indirect
github.com/Antonboom/errname v0.1.5 // indirect github.com/Antonboom/errname v0.1.5 // indirect
@ -67,7 +73,6 @@ require (
github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect
github.com/containerd/continuity v0.2.1 // indirect github.com/containerd/continuity v0.2.1 // indirect
github.com/daixiang0/gci v0.3.1-0.20220208004058-76d765e3ab48 // indirect github.com/daixiang0/gci v0.3.1-0.20220208004058-76d765e3ab48 // indirect
github.com/creachadair/atomicfile v0.2.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingajkin/go-header v0.4.2 // indirect github.com/denis-tingajkin/go-header v0.4.2 // indirect
github.com/dgraph-io/badger/v2 v2.2007.2 // indirect github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
@ -107,7 +112,6 @@ require (
github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect
github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
github.com/google/btree v1.0.0 // indirect github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect github.com/gostaticanalysis/comment v1.4.2 // indirect


+ 4
- 4
internal/blocksync/reactor_test.go View File

@ -33,7 +33,7 @@ type reactorTestSuite struct {
nodes []types.NodeID nodes []types.NodeID
reactors map[types.NodeID]*Reactor reactors map[types.NodeID]*Reactor
app map[types.NodeID]proxy.AppConns
app map[types.NodeID]abciclient.Client
blockSyncChannels map[types.NodeID]*p2p.Channel blockSyncChannels map[types.NodeID]*p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate peerChans map[types.NodeID]chan p2p.PeerUpdate
@ -64,7 +64,7 @@ func setup(
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}), network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
nodes: make([]types.NodeID, 0, numNodes), nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes), reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]proxy.AppConns, numNodes),
app: make(map[types.NodeID]abciclient.Client, numNodes),
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes), blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
@ -109,7 +109,7 @@ func (rts *reactorTestSuite) addNode(
logger := log.TestingLogger() logger := log.TestingLogger()
rts.nodes = append(rts.nodes, nodeID) rts.nodes = append(rts.nodes, nodeID)
rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics())
rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics())
require.NoError(t, rts.app[nodeID].Start(ctx)) require.NoError(t, rts.app[nodeID].Start(ctx))
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()
@ -124,7 +124,7 @@ func (rts *reactorTestSuite) addNode(
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(), log.TestingLogger(),
rts.app[nodeID].Consensus(),
rts.app[nodeID],
mock.Mempool{}, mock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,


+ 19
- 15
internal/consensus/replay.go View File

@ -10,6 +10,7 @@ import (
"reflect" "reflect"
"time" "time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/eventbus"
@ -237,10 +238,10 @@ func (h *Handshaker) NBlocks() int {
} }
// TODO: retry the handshake/replay if it fails ? // TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {
func (h *Handshaker) Handshake(ctx context.Context, appClient abciclient.Client) error {
// Handshake is done via ABCI Info on the query conn. // Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
res, err := appClient.Info(ctx, proxy.RequestInfo)
if err != nil { if err != nil {
return fmt.Errorf("error calling Info: %w", err) return fmt.Errorf("error calling Info: %w", err)
} }
@ -264,7 +265,7 @@ func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) err
} }
// Replay blocks up to the latest in the blockstore. // Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, proxyApp)
_, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, appClient)
if err != nil { if err != nil {
return fmt.Errorf("error on replay: %w", err) return fmt.Errorf("error on replay: %w", err)
} }
@ -285,7 +286,7 @@ func (h *Handshaker) ReplayBlocks(
state sm.State, state sm.State,
appHash []byte, appHash []byte,
appBlockHeight int64, appBlockHeight int64,
proxyApp proxy.AppConns,
appClient abciclient.Client,
) ([]byte, error) { ) ([]byte, error) {
storeBlockBase := h.store.Base() storeBlockBase := h.store.Base()
storeBlockHeight := h.store.Height() storeBlockHeight := h.store.Height()
@ -316,7 +317,7 @@ func (h *Handshaker) ReplayBlocks(
Validators: nextVals, Validators: nextVals,
AppStateBytes: h.genDoc.AppState, AppStateBytes: h.genDoc.AppState,
} }
res, err := proxyApp.Consensus().InitChain(ctx, req)
res, err := appClient.InitChain(ctx, req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -390,7 +391,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up. // Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight { if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We're good! // We're good!
@ -405,7 +406,7 @@ func (h *Handshaker) ReplayBlocks(
case appBlockHeight < stateBlockHeight: case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks // the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL // but leave the last block to go through the WAL
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, true)
case appBlockHeight == stateBlockHeight: case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind), // We haven't run Commit (both the state and app are one block behind),
@ -413,7 +414,7 @@ func (h *Handshaker) ReplayBlocks(
// NOTE: We could instead use the cs.WAL on cs.Start, // NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
h.logger.Info("Replay last block using real app") h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(ctx, state, storeBlockHeight, appClient)
return state.AppHash, err return state.AppHash, err
case appBlockHeight == storeBlockHeight: case appBlockHeight == storeBlockHeight:
@ -426,6 +427,9 @@ func (h *Handshaker) ReplayBlocks(
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := mockApp.Start(ctx); err != nil {
return nil, err
}
h.logger.Info("Replay last block using mock app") h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp) state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp)
@ -445,7 +449,7 @@ func (h *Handshaker) ReplayBlocks(
func (h *Handshaker) replayBlocks( func (h *Handshaker) replayBlocks(
ctx context.Context, ctx context.Context,
state sm.State, state sm.State,
proxyApp proxy.AppConns,
appClient abciclient.Client,
appBlockHeight, appBlockHeight,
storeBlockHeight int64, storeBlockHeight int64,
mutateState bool) ([]byte, error) { mutateState bool) ([]byte, error) {
@ -481,16 +485,16 @@ func (h *Handshaker) replayBlocks(
// We emit events for the index services at the final block due to the sync issue when // We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status. // the node shutdown during the block committing status.
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock(ctx, appHash, err = sm.ExecCommitBlock(ctx,
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
blockExec, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else { } else {
appHash, err = sm.ExecCommitBlock(ctx, appHash, err = sm.ExecCommitBlock(ctx,
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
nil, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -501,7 +505,7 @@ func (h *Handshaker) replayBlocks(
if mutateState { if mutateState {
// sync the final block // sync the final block
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(ctx, state, storeBlockHeight, appClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -517,14 +521,14 @@ func (h *Handshaker) replayBlock(
ctx context.Context, ctx context.Context,
state sm.State, state sm.State,
height int64, height int64,
proxyApp proxy.AppConnConsensus,
appClient abciclient.Client,
) (sm.State, error) { ) (sm.State, error) {
block := h.store.LoadBlock(height) block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height) meta := h.store.LoadBlockMeta(height)
// Use stubs for both mempool and evidence pool since no transactions nor // Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists. // evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
var err error var err error


+ 7
- 4
internal/consensus/replay_file.go View File

@ -326,9 +326,12 @@ func newConsensusStateForReplay(
return nil, err return nil, err
} }
// Create proxyAppConn connection (consensus, mempool, query)
clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
if err != nil {
return nil, err
}
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx) err = proxyApp.Start(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("starting proxy app conns: %w", err) return nil, fmt.Errorf("starting proxy app conns: %w", err)
@ -346,7 +349,7 @@ func newConsensusStateForReplay(
} }
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore)
consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec, consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
blockStore, mempool, evpool) blockStore, mempool, evpool)


+ 3
- 14
internal/consensus/replay_stubs.go View File

@ -61,22 +61,11 @@ func newMockProxyApp(
logger log.Logger, logger log.Logger,
appHash []byte, appHash []byte,
abciResponses *tmstate.ABCIResponses, abciResponses *tmstate.ABCIResponses,
) (proxy.AppConnConsensus, error) {
clientCreator := abciclient.NewLocalCreator(&mockProxyApp{
) (abciclient.Client, error) {
return proxy.New(abciclient.NewLocalClient(logger, &mockProxyApp{
appHash: appHash, appHash: appHash,
abciResponses: abciResponses, abciResponses: abciResponses,
})
cli, err := clientCreator(logger)
if err != nil {
return nil, err
}
if err = cli.Start(ctx); err != nil {
return nil, err
}
return proxy.NewAppConnConsensus(cli, proxy.NopMetrics()), nil
}), logger, proxy.NopMetrics()), nil
} }
type mockProxyApp struct { type mockProxyApp struct {


+ 24
- 23
internal/consensus/replay_test.go View File

@ -748,11 +748,11 @@ func testHandshakeReplay(
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int()))) filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) }) t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
clientCreator2 := abciclient.NewLocalCreator(kvstoreApp)
clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
if nBlocks > 0 { if nBlocks > 0 {
// run nBlocks against a new client to build up the app state. // run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state // use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
stateDB1 := dbm.NewMemDB() stateDB1 := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB1) stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState) err := stateStore.Save(genesisState)
@ -773,9 +773,10 @@ func testHandshakeReplay(
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err) require.NoError(t, err)
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.True(t, proxyApp.IsRunning())
require.NotNil(t, proxyApp)
t.Cleanup(func() { cancel(); proxyApp.Wait() }) t.Cleanup(func() { cancel(); proxyApp.Wait() })
err = handshaker.Handshake(ctx, proxyApp) err = handshaker.Handshake(ctx, proxyApp)
@ -786,7 +787,7 @@ func testHandshakeReplay(
require.NoError(t, err, "Error on abci handshake") require.NoError(t, err, "Error on abci handshake")
// get the latest app hash from the app // get the latest app hash from the app
res, err := proxyApp.Query().Info(ctx, abci.RequestInfo{Version: ""})
res, err := proxyApp.Info(ctx, abci.RequestInfo{Version: ""})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -819,11 +820,11 @@ func applyBlock(
evpool sm.EvidencePool, evpool sm.EvidencePool,
st sm.State, st sm.State,
blk *types.Block, blk *types.Block,
proxyApp proxy.AppConns,
appClient abciclient.Client,
blockStore *mockBlockStore, blockStore *mockBlockStore,
) sm.State { ) sm.State {
testPartSize := types.BlockPartSizeBytes testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore)
bps, err := blk.MakePartSet(testPartSize) bps, err := blk.MakePartSet(testPartSize)
require.NoError(t, err) require.NoError(t, err)
@ -836,7 +837,7 @@ func applyBlock(
func buildAppStateFromChain( func buildAppStateFromChain(
ctx context.Context, ctx context.Context,
t *testing.T, t *testing.T,
proxyApp proxy.AppConns,
appClient abciclient.Client,
stateStore sm.Store, stateStore sm.Store,
mempool mempool.Mempool, mempool mempool.Mempool,
evpool sm.EvidencePool, evpool sm.EvidencePool,
@ -848,11 +849,11 @@ func buildAppStateFromChain(
) { ) {
t.Helper() t.Helper()
// start a new app without handshake, play nBlocks blocks // start a new app without handshake, play nBlocks blocks
require.NoError(t, proxyApp.Start(ctx))
require.NoError(t, appClient.Start(ctx))
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators) validators := types.TM2PB.ValidatorUpdates(state.Validators)
_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
_, err := appClient.InitChain(ctx, abci.RequestInitChain{
Validators: validators, Validators: validators,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -863,18 +864,18 @@ func buildAppStateFromChain(
case 0: case 0:
for i := 0; i < nBlocks; i++ { for i := 0; i < nBlocks; i++ {
block := chain[i] block := chain[i]
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
} }
case 1, 2, 3: case 1, 2, 3:
for i := 0; i < nBlocks-1; i++ { for i := 0; i < nBlocks-1; i++ {
block := chain[i] block := chain[i]
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
} }
if mode == 2 || mode == 3 { if mode == 2 || mode == 3 {
// update the kvstore height and apphash // update the kvstore height and apphash
// as if we ran commit but not // as if we ran commit but not
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore)
} }
default: default:
require.Fail(t, "unknown mode %v", mode) require.Fail(t, "unknown mode %v", mode)
@ -902,14 +903,14 @@ func buildTMStateFromChain(
kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger, kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
defer kvstoreApp.Close() defer kvstoreApp.Close()
clientCreator := abciclient.NewLocalCreator(kvstoreApp)
client := abciclient.NewLocalClient(logger, kvstoreApp)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx)) require.NoError(t, proxyApp.Start(ctx))
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators) validators := types.TM2PB.ValidatorUpdates(state.Validators)
_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
Validators: validators, Validators: validators,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -975,8 +976,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - 0x03 // - 0x03
{ {
app := &badApp{numBlocks: 3, allHashesAreWrong: true} app := &badApp{numBlocks: 3, allHashesAreWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
client := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx) err := proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { cancel(); proxyApp.Wait() }) t.Cleanup(func() { cancel(); proxyApp.Wait() })
@ -995,8 +996,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - RANDOM HASH // - RANDOM HASH
{ {
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true} app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
client := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx) err := proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { cancel(); proxyApp.Wait() }) t.Cleanup(func() { cancel(); proxyApp.Wait() })
@ -1226,12 +1227,13 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
logger := log.NewNopLogger()
votePower := 10 + int64(rand.Uint32()) votePower := 10 + int64(rand.Uint32())
val, _, err := factory.Validator(ctx, votePower) val, _, err := factory.Validator(ctx, votePower)
require.NoError(t, err) require.NoError(t, err)
vals := types.NewValidatorSet([]*types.Validator{val}) vals := types.NewValidatorSet([]*types.Validator{val})
app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)} app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
clientCreator := abciclient.NewLocalCreator(app)
client := abciclient.NewLocalClient(logger, app)
cfg, err := ResetConfig(t.TempDir(), "handshake_test_") cfg, err := ResetConfig(t.TempDir(), "handshake_test_")
require.NoError(t, err) require.NoError(t, err)
@ -1250,9 +1252,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err) require.NoError(t, err)
logger := log.TestingLogger()
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake") require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake")


+ 3
- 3
internal/consensus/wal_generator.go View File

@ -67,8 +67,8 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
} }
blockStore := store.NewBlockStore(blockStoreDB) blockStore := store.NewBlockStore(blockStoreDB)
proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
proxyLogger := logger.With("module", "proxy")
proxyApp := proxy.New(abciclient.NewLocalClient(logger, app), proxyLogger, proxy.NopMetrics())
if err := proxyApp.Start(ctx); err != nil { if err := proxyApp.Start(ctx); err != nil {
t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err)) t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err))
} }
@ -82,7 +82,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
mempool := emptyMempool{} mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{} evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool) consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)


+ 3
- 4
internal/mempool/mempool.go View File

@ -9,10 +9,10 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/internal/proxy"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math" tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -31,7 +31,7 @@ type TxMempool struct {
logger log.Logger logger log.Logger
metrics *Metrics metrics *Metrics
config *config.MempoolConfig config *config.MempoolConfig
proxyAppConn proxy.AppConnMempool
proxyAppConn abciclient.Client
// txsAvailable fires once for each height when the mempool is not empty // txsAvailable fires once for each height when the mempool is not empty
txsAvailable chan struct{} txsAvailable chan struct{}
@ -93,7 +93,7 @@ type TxMempool struct {
func NewTxMempool( func NewTxMempool(
logger log.Logger, logger log.Logger,
cfg *config.MempoolConfig, cfg *config.MempoolConfig,
proxyAppConn proxy.AppConnMempool,
proxyAppConn abciclient.Client,
options ...TxMempoolOption, options ...TxMempoolOption,
) *TxMempool { ) *TxMempool {
@ -421,7 +421,6 @@ func (txmp *TxMempool) Update(
newPreFn PreCheckFunc, newPreFn PreCheckFunc,
newPostFn PostCheckFunc, newPostFn PostCheckFunc,
) error { ) error {
txmp.height = blockHeight txmp.height = blockHeight
txmp.notifiedTxsAvailable = false txmp.notifiedTxsAvailable = false


+ 7
- 7
internal/mempool/mempool_test.go View File

@ -78,24 +78,24 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx) ctx, cancel = context.WithCancel(ctx)
app := &application{kvstore.NewApplication()}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger() logger := log.TestingLogger()
conn := abciclient.NewLocalClient(logger, &application{
kvstore.NewApplication(),
})
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
require.NoError(t, err) require.NoError(t, err)
cfg.Mempool.CacheSize = cacheSize cfg.Mempool.CacheSize = cacheSize
appConnMem, err := cc(logger)
require.NoError(t, err)
require.NoError(t, appConnMem.Start(ctx))
require.NoError(t, conn.Start(ctx))
t.Cleanup(func() { t.Cleanup(func() {
os.RemoveAll(cfg.RootDir) os.RemoveAll(cfg.RootDir)
cancel() cancel()
appConnMem.Wait()
conn.Wait()
}) })
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...)
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, conn, options...)
} }
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {


+ 0
- 249
internal/proxy/app_conn.go View File

@ -1,249 +0,0 @@
package proxy
import (
"context"
"time"
"github.com/go-kit/kit/metrics"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/types"
)
//go:generate ../../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot
//----------------------------------------------------------------------------------------
// Enforce which abci msgs can be sent on a connection at the type level
type AppConnConsensus interface {
Error() error
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
ProcessProposal(context.Context, types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
ExtendVote(context.Context, types.RequestExtendVote) (*types.ResponseExtendVote, error)
VerifyVoteExtension(context.Context, types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error)
FinalizeBlock(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error)
Commit(context.Context) (*types.ResponseCommit, error)
}
type AppConnMempool interface {
Error() error
CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error)
Flush(context.Context) error
}
type AppConnQuery interface {
Error() error
Echo(context.Context, string) (*types.ResponseEcho, error)
Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error)
Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error)
}
type AppConnSnapshot interface {
Error() error
ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
OfferSnapshot(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunk(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunk(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
}
//-----------------------------------------------------------------------------------------
// Implements AppConnConsensus (subset of abciclient.Client)
type appConnConsensus struct {
metrics *Metrics
appConn abciclient.Client
}
var _ AppConnConsensus = (*appConnConsensus)(nil)
func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus {
return &appConnConsensus{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
func (app *appConnConsensus) InitChain(
ctx context.Context,
req types.RequestInitChain,
) (*types.ResponseInitChain, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
return app.appConn.InitChain(ctx, req)
}
func (app *appConnConsensus) PrepareProposal(
ctx context.Context,
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
return app.appConn.PrepareProposal(ctx, req)
}
func (app *appConnConsensus) ProcessProposal(
ctx context.Context,
req types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
return app.appConn.ProcessProposal(ctx, req)
}
func (app *appConnConsensus) ExtendVote(
ctx context.Context,
req types.RequestExtendVote,
) (*types.ResponseExtendVote, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
return app.appConn.ExtendVote(ctx, req)
}
func (app *appConnConsensus) VerifyVoteExtension(
ctx context.Context,
req types.RequestVerifyVoteExtension,
) (*types.ResponseVerifyVoteExtension, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
return app.appConn.VerifyVoteExtension(ctx, req)
}
func (app *appConnConsensus) FinalizeBlock(
ctx context.Context,
req types.RequestFinalizeBlock,
) (*types.ResponseFinalizeBlock, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
return app.appConn.FinalizeBlock(ctx, req)
}
func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
return app.appConn.Commit(ctx)
}
//------------------------------------------------
// Implements AppConnMempool (subset of abciclient.Client)
type appConnMempool struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool {
return &appConnMempool{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnMempool) Error() error {
return app.appConn.Error()
}
func (app *appConnMempool) Flush(ctx context.Context) error {
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
return app.appConn.Flush(ctx)
}
func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
return app.appConn.CheckTx(ctx, req)
}
//------------------------------------------------
// Implements AppConnQuery (subset of abciclient.Client)
type appConnQuery struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery {
return &appConnQuery{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnQuery) Error() error {
return app.appConn.Error()
}
func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
return app.appConn.Echo(ctx, msg)
}
func (app *appConnQuery) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
return app.appConn.Info(ctx, req)
}
func (app *appConnQuery) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
return app.appConn.Query(ctx, reqQuery)
}
//------------------------------------------------
// Implements AppConnSnapshot (subset of abciclient.Client)
type appConnSnapshot struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot {
return &appConnSnapshot{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnSnapshot) Error() error {
return app.appConn.Error()
}
func (app *appConnSnapshot) ListSnapshots(
ctx context.Context,
req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
return app.appConn.ListSnapshots(ctx, req)
}
func (app *appConnSnapshot) OfferSnapshot(
ctx context.Context,
req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
return app.appConn.OfferSnapshot(ctx, req)
}
func (app *appConnSnapshot) LoadSnapshotChunk(
ctx context.Context,
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
return app.appConn.LoadSnapshotChunk(ctx, req)
}
func (app *appConnSnapshot) ApplySnapshotChunk(
ctx context.Context,
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
return app.appConn.ApplySnapshotChunk(ctx, req)
}
// addTimeSample returns a function that, when called, adds an observation to m.
// The observation added to m is the number of seconds ellapsed since addTimeSample
// was initially called. addTimeSample is meant to be called in a defer to calculate
// the amount of time a function takes to complete.
func addTimeSample(m metrics.Histogram) func() {
start := time.Now()
return func() { m.Observe(time.Since(start).Seconds()) }
}

+ 182
- 11
internal/proxy/client.go View File

@ -1,42 +1,213 @@
package proxy package proxy
import ( import (
"context"
"io" "io"
"os"
"syscall"
"time"
"github.com/go-kit/kit/metrics"
abciclient "github.com/tendermint/tendermint/abci/client" abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
e2e "github.com/tendermint/tendermint/test/e2e/app" e2e "github.com/tendermint/tendermint/test/e2e/app"
) )
// DefaultClientCreator returns a default ClientCreator, which will create a
// local client if addr is one of: 'kvstore',
// 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client.
// ClientFactory returns a client object, which will create a local
// client if addr is one of: 'kvstore', 'persistent_kvstore', 'e2e',
// or 'noop', otherwise - a remote client.
// //
// The Closer is a noop except for persistent_kvstore applications, // The Closer is a noop except for persistent_kvstore applications,
// which will clean up the store. // which will clean up the store.
func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) {
func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient.Client, io.Closer, error) {
switch addr { switch addr {
case "kvstore": case "kvstore":
return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{}
return abciclient.NewLocalClient(logger, kvstore.NewApplication()), noopCloser{}, nil
case "persistent_kvstore": case "persistent_kvstore":
app := kvstore.NewPersistentKVStoreApplication(logger, dbDir) app := kvstore.NewPersistentKVStoreApplication(logger, dbDir)
return abciclient.NewLocalCreator(app), app
return abciclient.NewLocalClient(logger, app), app, nil
case "e2e": case "e2e":
app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir)) app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
if err != nil { if err != nil {
panic(err)
return nil, noopCloser{}, err
} }
return abciclient.NewLocalCreator(app), noopCloser{}
return abciclient.NewLocalClient(logger, app), noopCloser{}, nil
case "noop": case "noop":
return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{}
return abciclient.NewLocalClient(logger, types.NewBaseApplication()), noopCloser{}, nil
default: default:
mustConnect := false // loop retrying
return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{}
const mustConnect = false // loop retrying
client, err := abciclient.NewClient(logger, addr, transport, mustConnect)
if err != nil {
return nil, noopCloser{}, err
}
return client, noopCloser{}, nil
} }
} }
type noopCloser struct{} type noopCloser struct{}
func (noopCloser) Close() error { return nil } func (noopCloser) Close() error { return nil }
// proxyClient provides the application connection.
type proxyClient struct {
service.BaseService
logger log.Logger
client abciclient.Client
metrics *Metrics
}
// New creates a proxy application interface.
func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client {
conn := &proxyClient{
logger: logger,
metrics: metrics,
client: client,
}
conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn)
return conn
}
func (app *proxyClient) OnStop() { tryCallStop(app.client) }
func (app *proxyClient) Error() error { return app.client.Error() }
func tryCallStop(client abciclient.Client) {
if c, ok := client.(interface{ Stop() }); ok {
c.Stop()
}
}
func (app *proxyClient) OnStart(ctx context.Context) error {
var err error
defer func() {
if err != nil {
tryCallStop(app.client)
}
}()
// Kill Tendermint if the ABCI application crashes.
go func() {
if !app.client.IsRunning() {
return
}
app.client.Wait()
if ctx.Err() != nil {
return
}
if err := app.client.Error(); err != nil {
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
"err", err)
if killErr := kill(); killErr != nil {
app.logger.Error("Failed to kill this process - please do so manually",
"err", killErr)
}
}
}()
return app.client.Start(ctx)
}
func kill() error {
p, err := os.FindProcess(os.Getpid())
if err != nil {
return err
}
return p.Signal(syscall.SIGABRT)
}
func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
return app.client.InitChain(ctx, req)
}
func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
return app.client.PrepareProposal(ctx, req)
}
func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
return app.client.ProcessProposal(ctx, req)
}
func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
return app.client.ExtendVote(ctx, req)
}
func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
return app.client.VerifyVoteExtension(ctx, req)
}
func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
return app.client.FinalizeBlock(ctx, req)
}
func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
return app.client.Commit(ctx)
}
func (app *proxyClient) Flush(ctx context.Context) error {
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
return app.client.Flush(ctx)
}
func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
return app.client.CheckTx(ctx, req)
}
func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
return app.client.Echo(ctx, msg)
}
func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
return app.client.Info(ctx, req)
}
func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
return app.client.Query(ctx, reqQuery)
}
func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
return app.client.ListSnapshots(ctx, req)
}
func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
return app.client.OfferSnapshot(ctx, req)
}
func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
return app.client.LoadSnapshotChunk(ctx, req)
}
func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
return app.client.ApplySnapshotChunk(ctx, req)
}
// addTimeSample returns a function that, when called, adds an observation to m.
// The observation added to m is the number of seconds ellapsed since addTimeSample
// was initially called. addTimeSample is meant to be called in a defer to calculate
// the amount of time a function takes to complete.
func addTimeSample(m metrics.Histogram) func() {
start := time.Now()
return func() { m.Observe(time.Since(start).Seconds()) }
}

internal/proxy/app_conn_test.go → internal/proxy/client_test.go View File


+ 0
- 131
internal/proxy/multi_app_conn.go View File

@ -1,131 +0,0 @@
package proxy
import (
"context"
"os"
"syscall"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
// AppConns is the Tendermint's interface to the application that consists of
// multiple connections.
type AppConns interface {
service.Service
// Mempool connection
Mempool() AppConnMempool
// Consensus connection
Consensus() AppConnConsensus
// Query connection
Query() AppConnQuery
// Snapshot connection
Snapshot() AppConnSnapshot
}
// NewAppConns calls NewMultiAppConn.
func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
return NewMultiAppConn(clientCreator, logger, metrics)
}
// multiAppConn implements AppConns.
//
// A multiAppConn is made of a few appConns and manages their underlying abci
// clients.
// TODO: on app restart, clients must reboot together
type multiAppConn struct {
service.BaseService
logger log.Logger
metrics *Metrics
consensusConn AppConnConsensus
mempoolConn AppConnMempool
queryConn AppConnQuery
snapshotConn AppConnSnapshot
client stoppableClient
clientCreator abciclient.Creator
}
// TODO: this is a totally internal and quasi permanent shim for
// clients. eventually we can have a single client and have some kind
// of reasonable lifecycle witout needing an explicit stop method.
type stoppableClient interface {
abciclient.Client
Stop()
}
// NewMultiAppConn makes all necessary abci connections to the application.
func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
multiAppConn := &multiAppConn{
logger: logger,
metrics: metrics,
clientCreator: clientCreator,
}
multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn)
return multiAppConn
}
func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn }
func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn }
func (app *multiAppConn) Query() AppConnQuery { return app.queryConn }
func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn }
func (app *multiAppConn) OnStart(ctx context.Context) error {
var err error
defer func() {
if err != nil {
app.client.Stop()
}
}()
var client abciclient.Client
client, err = app.clientCreator(app.logger)
if err != nil {
return err
}
app.queryConn = NewAppConnQuery(client, app.metrics)
app.snapshotConn = NewAppConnSnapshot(client, app.metrics)
app.mempoolConn = NewAppConnMempool(client, app.metrics)
app.consensusConn = NewAppConnConsensus(client, app.metrics)
app.client = client.(stoppableClient)
// Kill Tendermint if the ABCI application crashes.
go func() {
if !client.IsRunning() {
return
}
app.client.Wait()
if ctx.Err() != nil {
return
}
if err := app.client.Error(); err != nil {
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
"err", err)
if killErr := kill(); killErr != nil {
app.logger.Error("Failed to kill this process - please do so manually",
"err", killErr)
}
}
}()
return client.Start(ctx)
}
func (app *multiAppConn) OnStop() { app.client.Stop() }
func kill() error {
p, err := os.FindProcess(os.Getpid())
if err != nil {
return err
}
return p.Signal(syscall.SIGTERM)
}

+ 0
- 99
internal/proxy/multi_app_conn_test.go View File

@ -1,99 +0,0 @@
package proxy
import (
"context"
"errors"
"os"
"os/signal"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
"github.com/tendermint/tendermint/libs/log"
)
type noopStoppableClientImpl struct {
abciclient.Client
count int
}
func (c *noopStoppableClientImpl) Stop() { c.count++ }
func TestAppConns_Start_Stop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clientMock := &abcimocks.Client{}
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("Error").Return(nil)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil).Times(1)
cl := &noopStoppableClientImpl{Client: clientMock}
creatorCallCount := 0
creator := func(logger log.Logger) (abciclient.Client, error) {
creatorCallCount++
return cl, nil
}
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
err := appConns.Start(ctx)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
cancel()
appConns.Wait()
clientMock.AssertExpectations(t)
assert.Equal(t, 1, cl.count)
assert.Equal(t, 1, creatorCallCount)
}
// Upon failure, we call tmos.Kill
func TestAppConns_Failure(t *testing.T) {
ok := make(chan struct{})
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM)
go func() {
for range c {
close(ok)
return
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clientMock := &abcimocks.Client{}
clientMock.On("SetLogger", mock.Anything).Return()
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil)
clientMock.On("Error").Return(errors.New("EOF"))
cl := &noopStoppableClientImpl{Client: clientMock}
creator := func(log.Logger) (abciclient.Client, error) {
return cl, nil
}
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
err := appConns.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); appConns.Wait() })
select {
case <-ok:
t.Log("SIGTERM successfully received")
case <-time.After(5 * time.Second):
t.Fatal("expected process to receive SIGTERM signal")
}
}

+ 2
- 2
internal/rpc/core/abci.go View File

@ -18,7 +18,7 @@ func (env *Environment) ABCIQuery(
height int64, height int64,
prove bool, prove bool,
) (*coretypes.ResultABCIQuery, error) { ) (*coretypes.ResultABCIQuery, error) {
resQuery, err := env.ProxyAppQuery.Query(ctx, abci.RequestQuery{
resQuery, err := env.ProxyApp.Query(ctx, abci.RequestQuery{
Path: path, Path: path,
Data: data, Data: data,
Height: height, Height: height,
@ -34,7 +34,7 @@ func (env *Environment) ABCIQuery(
// ABCIInfo gets some info about the application. // ABCIInfo gets some info about the application.
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info // More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
func (env *Environment) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) { func (env *Environment) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) {
resInfo, err := env.ProxyAppQuery.Info(ctx, proxy.RequestInfo)
resInfo, err := env.ProxyApp.Info(ctx, proxy.RequestInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 2
- 3
internal/rpc/core/env.go View File

@ -11,6 +11,7 @@ import (
"github.com/rs/cors" "github.com/rs/cors"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/blocksync" "github.com/tendermint/tendermint/internal/blocksync"
@ -19,7 +20,6 @@ import (
"github.com/tendermint/tendermint/internal/eventlog" "github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub" tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/internal/pubsub/query"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
@ -67,8 +67,7 @@ type peerManager interface {
// to be setup once during startup. // to be setup once during startup.
type Environment struct { type Environment struct {
// external, thread safe interfaces // external, thread safe interfaces
ProxyAppQuery proxy.AppConnQuery
ProxyAppMempool proxy.AppConnMempool
ProxyApp abciclient.Client
// interfaces defined in types and above // interfaces defined in types and above
StateStore sm.Store StateStore sm.Store


+ 1
- 1
internal/rpc/core/mempool.go View File

@ -158,7 +158,7 @@ func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.Resul
// be added to the mempool either. // be added to the mempool either.
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx // More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
func (env *Environment) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) { func (env *Environment) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) {
res, err := env.ProxyAppMempool.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
res, err := env.ProxyApp.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 11
- 11
internal/state/execution.go View File

@ -5,11 +5,11 @@ import (
"fmt" "fmt"
"time" "time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/proxy"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state" tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmtypes "github.com/tendermint/tendermint/proto/tendermint/types" tmtypes "github.com/tendermint/tendermint/proto/tendermint/types"
@ -30,7 +30,7 @@ type BlockExecutor struct {
blockStore BlockStore blockStore BlockStore
// execute the app against this // execute the app against this
proxyApp proxy.AppConnConsensus
appClient abciclient.Client
// events // events
eventBus types.BlockEventPublisher eventBus types.BlockEventPublisher
@ -60,7 +60,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
func NewBlockExecutor( func NewBlockExecutor(
stateStore Store, stateStore Store,
logger log.Logger, logger log.Logger,
proxyApp proxy.AppConnConsensus,
appClient abciclient.Client,
pool mempool.Mempool, pool mempool.Mempool,
evpool EvidencePool, evpool EvidencePool,
blockStore BlockStore, blockStore BlockStore,
@ -68,7 +68,7 @@ func NewBlockExecutor(
) *BlockExecutor { ) *BlockExecutor {
res := &BlockExecutor{ res := &BlockExecutor{
store: stateStore, store: stateStore,
proxyApp: proxyApp,
appClient: appClient,
eventBus: eventbus.NopEventBus{}, eventBus: eventbus.NopEventBus{},
mempool: pool, mempool: pool,
evpool: evpool, evpool: evpool,
@ -119,7 +119,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
preparedProposal, err := blockExec.proxyApp.PrepareProposal(
preparedProposal, err := blockExec.appClient.PrepareProposal(
ctx, ctx,
abci.RequestPrepareProposal{ abci.RequestPrepareProposal{
BlockData: txs.ToSliceOfBytes(), BlockData: txs.ToSliceOfBytes(),
@ -166,7 +166,7 @@ func (blockExec *BlockExecutor) ProcessProposal(
ByzantineValidators: block.Evidence.ToABCI(), ByzantineValidators: block.Evidence.ToABCI(),
} }
resp, err := blockExec.proxyApp.ProcessProposal(ctx, req)
resp, err := blockExec.appClient.ProcessProposal(ctx, req)
if err != nil { if err != nil {
return false, ErrInvalidBlock(err) return false, ErrInvalidBlock(err)
} }
@ -214,7 +214,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
} }
startTime := time.Now().UnixNano() startTime := time.Now().UnixNano()
pbh := block.Header.ToProto() pbh := block.Header.ToProto()
finalizeBlockResponse, err := blockExec.proxyApp.FinalizeBlock(
finalizeBlockResponse, err := blockExec.appClient.FinalizeBlock(
ctx, ctx,
abci.RequestFinalizeBlock{ abci.RequestFinalizeBlock{
Hash: block.Hash(), Hash: block.Hash(),
@ -299,7 +299,7 @@ func (blockExec *BlockExecutor) ExtendVote(ctx context.Context, vote *types.Vote
Vote: vote.ToProto(), Vote: vote.ToProto(),
} }
resp, err := blockExec.proxyApp.ExtendVote(ctx, req)
resp, err := blockExec.appClient.ExtendVote(ctx, req)
if err != nil { if err != nil {
return types.VoteExtension{}, err return types.VoteExtension{}, err
} }
@ -311,7 +311,7 @@ func (blockExec *BlockExecutor) VerifyVoteExtension(ctx context.Context, vote *t
Vote: vote.ToProto(), Vote: vote.ToProto(),
} }
resp, err := blockExec.proxyApp.VerifyVoteExtension(ctx, req)
resp, err := blockExec.appClient.VerifyVoteExtension(ctx, req)
if err != nil { if err != nil {
return err return err
} }
@ -347,7 +347,7 @@ func (blockExec *BlockExecutor) Commit(
} }
// Commit block, get hash back // Commit block, get hash back
res, err := blockExec.proxyApp.Commit(ctx)
res, err := blockExec.appClient.Commit(ctx)
if err != nil { if err != nil {
blockExec.logger.Error("client error during proxyAppConn.Commit", "err", err) blockExec.logger.Error("client error during proxyAppConn.Commit", "err", err)
return nil, 0, err return nil, 0, err
@ -580,7 +580,7 @@ func fireEvents(
func ExecCommitBlock( func ExecCommitBlock(
ctx context.Context, ctx context.Context,
be *BlockExecutor, be *BlockExecutor,
appConnConsensus proxy.AppConnConsensus,
appConnConsensus abciclient.Client,
block *types.Block, block *types.Block,
logger log.Logger, logger log.Logger,
store Store, store Store,


+ 21
- 19
internal/state/execution_test.go View File

@ -38,9 +38,9 @@ var (
func TestApplyBlock(t *testing.T) { func TestApplyBlock(t *testing.T) {
app := &testApp{} app := &testApp{}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger() logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -51,7 +51,7 @@ func TestApplyBlock(t *testing.T) {
state, stateDB, _ := makeState(t, 1, 1) state, stateDB, _ := makeState(t, 1, 1)
stateStore := sm.NewStore(stateDB) stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(),
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp,
mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
block, err := sf.MakeBlock(state, 1, new(types.Commit)) block, err := sf.MakeBlock(state, 1, new(types.Commit))
@ -74,11 +74,12 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
logger := log.TestingLogger()
app := &testApp{} app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
appClient := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
err := appClient.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
state, stateDB, privVals := makeState(t, 7, 1) state, stateDB, privVals := makeState(t, 7, 1)
@ -102,7 +103,7 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) {
evpool.On("Update", ctx, mock.Anything, mock.Anything).Return() evpool.On("Update", ctx, mock.Anything, mock.Anything).Return()
evpool.On("CheckEvidence", ctx, mock.Anything).Return(nil) evpool.On("CheckEvidence", ctx, mock.Anything).Return(nil)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mmock.Mempool{}, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mmock.Mempool{}, evpool, blockStore)
state, _, lastCommit := makeAndCommitGoodBlock(ctx, t, state, 1, new(types.Commit), state.NextValidators.Validators[0].Address, blockExec, privVals, nil) state, _, lastCommit := makeAndCommitGoodBlock(ctx, t, state, 1, new(types.Commit), state.NextValidators.Validators[0].Address, blockExec, privVals, nil)
for idx, isAbsent := range tc.absentCommitSigs { for idx, isAbsent := range tc.absentCommitSigs {
@ -135,8 +136,9 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) {
defer cancel() defer cancel()
app := &testApp{} app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
logger := log.TestingLogger()
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx) err := proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -212,7 +214,7 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) {
blockStore := store.NewBlockStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp,
mmock.Mempool{}, evpool, blockStore) mmock.Mempool{}, evpool, blockStore)
block, err := sf.MakeBlock(state, 1, new(types.Commit)) block, err := sf.MakeBlock(state, 1, new(types.Commit))
@ -238,9 +240,9 @@ func TestProcessProposal(t *testing.T) {
defer cancel() defer cancel()
app := abcimocks.NewBaseMock() app := abcimocks.NewBaseMock()
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger() logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx) err := proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -251,7 +253,7 @@ func TestProcessProposal(t *testing.T) {
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
logger, logger,
proxyApp.Consensus(),
proxyApp,
mmock.Mempool{}, mmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,
@ -441,9 +443,9 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) {
defer cancel() defer cancel()
app := &testApp{} app := &testApp{}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger() logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx) err := proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -454,7 +456,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) {
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
logger, logger,
proxyApp.Consensus(),
proxyApp,
mmock.Mempool{}, mmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,
@ -516,9 +518,9 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
defer cancel() defer cancel()
app := &testApp{} app := &testApp{}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger() logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx) err := proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -528,7 +530,7 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(), log.TestingLogger(),
proxyApp.Consensus(),
proxyApp,
mmock.Mempool{}, mmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,


+ 0
- 9
internal/state/helpers_test.go View File

@ -11,16 +11,13 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
sf "github.com/tendermint/tendermint/internal/state/test/factory" sf "github.com/tendermint/tendermint/internal/state/test/factory"
"github.com/tendermint/tendermint/internal/test/factory" "github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand" tmrand "github.com/tendermint/tendermint/libs/rand"
tmtime "github.com/tendermint/tendermint/libs/time" tmtime "github.com/tendermint/tendermint/libs/time"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state" tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
@ -33,12 +30,6 @@ type paramsChangeTestCase struct {
params types.ConsensusParams params types.ConsensusParams
} }
func newTestApp() proxy.AppConns {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics())
}
func makeAndCommitGoodBlock( func makeAndCommitGoodBlock(
ctx context.Context, ctx context.Context,
t *testing.T, t *testing.T,


+ 13
- 9
internal/state/validation_test.go View File

@ -10,10 +10,12 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/crypto/tmhash"
memmock "github.com/tendermint/tendermint/internal/mempool/mock" memmock "github.com/tendermint/tendermint/internal/mempool/mock"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/mocks" "github.com/tendermint/tendermint/internal/state/mocks"
statefactory "github.com/tendermint/tendermint/internal/state/test/factory" statefactory "github.com/tendermint/tendermint/internal/state/test/factory"
@ -30,8 +32,8 @@ const validationTestsStopHeight int64 = 10
func TestValidateBlockHeader(t *testing.T) { func TestValidateBlockHeader(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
proxyApp := newTestApp()
logger := log.TestingLogger()
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx)) require.NoError(t, proxyApp.Start(ctx))
state, stateDB, privVals := makeState(t, 3, 1) state, stateDB, privVals := makeState(t, 3, 1)
@ -39,8 +41,8 @@ func TestValidateBlockHeader(t *testing.T) {
blockStore := store.NewBlockStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(),
proxyApp.Consensus(),
logger,
proxyApp,
memmock.Mempool{}, memmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,
@ -119,7 +121,8 @@ func TestValidateBlockCommit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
proxyApp := newTestApp()
logger := log.TestingLogger()
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx)) require.NoError(t, proxyApp.Start(ctx))
state, stateDB, privVals := makeState(t, 1, 1) state, stateDB, privVals := makeState(t, 1, 1)
@ -127,8 +130,8 @@ func TestValidateBlockCommit(t *testing.T) {
blockStore := store.NewBlockStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(),
proxyApp.Consensus(),
logger,
proxyApp,
memmock.Mempool{}, memmock.Mempool{},
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,
@ -245,7 +248,8 @@ func TestValidateBlockEvidence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
proxyApp := newTestApp()
logger := log.TestingLogger()
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx)) require.NoError(t, proxyApp.Start(ctx))
state, stateDB, privVals := makeState(t, 4, 1) state, stateDB, privVals := makeState(t, 4, 1)
@ -263,7 +267,7 @@ func TestValidateBlockEvidence(t *testing.T) {
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
log.TestingLogger(), log.TestingLogger(),
proxyApp.Consensus(),
proxyApp,
memmock.Mempool{}, memmock.Mempool{},
evpool, evpool,
blockStore, blockStore,


+ 3
- 7
internal/statesync/reactor.go View File

@ -11,11 +11,11 @@ import (
"sync" "sync"
"time" "time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -135,8 +135,7 @@ type Reactor struct {
stateStore sm.Store stateStore sm.Store
blockStore *store.BlockStore blockStore *store.BlockStore
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
conn abciclient.Client
tempDir string tempDir string
snapshotCh *p2p.Channel snapshotCh *p2p.Channel
chunkCh *p2p.Channel chunkCh *p2p.Channel
@ -173,8 +172,7 @@ func NewReactor(
initialHeight int64, initialHeight int64,
cfg config.StateSyncConfig, cfg config.StateSyncConfig,
logger log.Logger, logger log.Logger,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
conn abciclient.Client,
channelCreator p2p.ChannelCreator, channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates, peerUpdates *p2p.PeerUpdates,
stateStore sm.Store, stateStore sm.Store,
@ -209,7 +207,6 @@ func NewReactor(
initialHeight: initialHeight, initialHeight: initialHeight,
cfg: cfg, cfg: cfg,
conn: conn, conn: conn,
connQuery: connQuery,
snapshotCh: snapshotCh, snapshotCh: snapshotCh,
chunkCh: chunkCh, chunkCh: chunkCh,
blockCh: blockCh, blockCh: blockCh,
@ -287,7 +284,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.cfg, r.cfg,
r.logger, r.logger,
r.conn, r.conn,
r.connQuery,
r.stateProvider, r.stateProvider,
r.snapshotCh, r.snapshotCh,
r.chunkCh, r.chunkCh,


+ 34
- 36
internal/statesync/reactor_test.go View File

@ -13,11 +13,11 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db"
clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/internal/proxy"
proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
smmocks "github.com/tendermint/tendermint/internal/state/mocks" smmocks "github.com/tendermint/tendermint/internal/state/mocks"
"github.com/tendermint/tendermint/internal/statesync/mocks" "github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/internal/store"
@ -37,8 +37,7 @@ type reactorTestSuite struct {
reactor *Reactor reactor *Reactor
syncer *syncer syncer *syncer
conn *proxymocks.AppConnSnapshot
connQuery *proxymocks.AppConnQuery
conn *clientmocks.Client
stateProvider *mocks.StateProvider stateProvider *mocks.StateProvider
snapshotChannel *p2p.Channel snapshotChannel *p2p.Channel
@ -71,21 +70,14 @@ type reactorTestSuite struct {
func setup( func setup(
ctx context.Context, ctx context.Context,
t *testing.T, t *testing.T,
conn *proxymocks.AppConnSnapshot,
connQuery *proxymocks.AppConnQuery,
conn *clientmocks.Client,
stateProvider *mocks.StateProvider, stateProvider *mocks.StateProvider,
chBuf uint, chBuf uint,
) *reactorTestSuite { ) *reactorTestSuite {
t.Helper() t.Helper()
if conn == nil { if conn == nil {
conn = &proxymocks.AppConnSnapshot{}
}
if connQuery == nil {
connQuery = &proxymocks.AppConnQuery{}
}
if stateProvider == nil {
stateProvider = &mocks.StateProvider{}
conn = &clientmocks.Client{}
} }
rts := &reactorTestSuite{ rts := &reactorTestSuite{
@ -102,7 +94,6 @@ func setup(
paramsOutCh: make(chan p2p.Envelope, chBuf), paramsOutCh: make(chan p2p.Envelope, chBuf),
paramsPeerErrCh: make(chan p2p.PeerError, chBuf), paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
conn: conn, conn: conn,
connQuery: connQuery,
stateProvider: stateProvider, stateProvider: stateProvider,
} }
@ -171,7 +162,6 @@ func setup(
*cfg, *cfg,
logger.With("component", "reactor"), logger.With("component", "reactor"),
conn, conn,
connQuery,
chCreator, chCreator,
rts.peerUpdates, rts.peerUpdates,
rts.stateStore, rts.stateStore,
@ -186,7 +176,6 @@ func setup(
*cfg, *cfg,
logger.With("component", "syncer"), logger.With("component", "syncer"),
conn, conn,
connQuery,
stateProvider, stateProvider,
rts.snapshotChannel, rts.snapshotChannel,
rts.chunkChannel, rts.chunkChannel,
@ -211,7 +200,7 @@ func TestReactor_Sync(t *testing.T) {
defer cancel() defer cancel()
const snapshotHeight = 7 const snapshotHeight = 7
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
chain := buildLightBlockChain(ctx, t, 1, 10, time.Now()) chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
// app accepts any snapshot // app accepts any snapshot
rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")). rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
@ -222,7 +211,7 @@ func TestReactor_Sync(t *testing.T) {
Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
// app query returns valid state app hash // app query returns valid state app hash
rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
AppVersion: testAppVersion, AppVersion: testAppVersion,
LastBlockHeight: snapshotHeight, LastBlockHeight: snapshotHeight,
LastBlockAppHash: chain[snapshotHeight+1].AppHash, LastBlockAppHash: chain[snapshotHeight+1].AppHash,
@ -237,8 +226,8 @@ func TestReactor_Sync(t *testing.T) {
defer close(closeCh) defer close(closeCh)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
rts.blockInCh, closeCh, 0) rts.blockInCh, closeCh, 0)
go graduallyAddPeers(t, rts.peerUpdateCh, closeCh, 1*time.Second)
go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
go graduallyAddPeers(ctx, t, rts.peerUpdateCh, closeCh, 1*time.Second)
go handleSnapshotRequests(ctx, t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
{ {
Height: uint64(snapshotHeight), Height: uint64(snapshotHeight),
Format: 1, Format: 1,
@ -246,7 +235,7 @@ func TestReactor_Sync(t *testing.T) {
}, },
}) })
go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
go handleChunkRequests(ctx, t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh) go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)
@ -265,7 +254,7 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.chunkInCh <- p2p.Envelope{ rts.chunkInCh <- p2p.Envelope{
From: types.NodeID("aa"), From: types.NodeID("aa"),
@ -316,14 +305,14 @@ func TestReactor_ChunkRequest(t *testing.T) {
defer cancel() defer cancel()
// mock ABCI connection to return local snapshots // mock ABCI connection to return local snapshots
conn := &proxymocks.AppConnSnapshot{}
conn := &clientmocks.Client{}
conn.On("LoadSnapshotChunk", mock.Anything, abci.RequestLoadSnapshotChunk{ conn.On("LoadSnapshotChunk", mock.Anything, abci.RequestLoadSnapshotChunk{
Height: tc.request.Height, Height: tc.request.Height,
Format: tc.request.Format, Format: tc.request.Format,
Chunk: tc.request.Index, Chunk: tc.request.Index,
}).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil) }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
rts := setup(ctx, t, conn, nil, nil, 2)
rts := setup(ctx, t, conn, nil, 2)
rts.chunkInCh <- p2p.Envelope{ rts.chunkInCh <- p2p.Envelope{
From: types.NodeID("aa"), From: types.NodeID("aa"),
@ -343,7 +332,7 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.snapshotInCh <- p2p.Envelope{ rts.snapshotInCh <- p2p.Envelope{
From: types.NodeID("aa"), From: types.NodeID("aa"),
@ -403,12 +392,12 @@ func TestReactor_SnapshotsRequest(t *testing.T) {
defer cancel() defer cancel()
// mock ABCI connection to return local snapshots // mock ABCI connection to return local snapshots
conn := &proxymocks.AppConnSnapshot{}
conn := &clientmocks.Client{}
conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{ conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
Snapshots: tc.snapshots, Snapshots: tc.snapshots,
}, nil) }, nil)
rts := setup(ctx, t, conn, nil, nil, 100)
rts := setup(ctx, t, conn, nil, 100)
rts.snapshotInCh <- p2p.Envelope{ rts.snapshotInCh <- p2p.Envelope{
From: types.NodeID("aa"), From: types.NodeID("aa"),
@ -435,7 +424,7 @@ func TestReactor_LightBlockResponse(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
var height int64 = 10 var height int64 = 10
// generates a random header // generates a random header
@ -492,7 +481,7 @@ func TestReactor_BlockProviders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.peerUpdateCh <- p2p.PeerUpdate{ rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("aa"), NodeID: types.NodeID("aa"),
Status: p2p.PeerStatusUp, Status: p2p.PeerStatusUp,
@ -559,7 +548,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
// make syncer non nil else test won't think we are state syncing // make syncer non nil else test won't think we are state syncing
rts.reactor.syncer = rts.syncer rts.reactor.syncer = rts.syncer
peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength)) peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
@ -636,7 +625,7 @@ func TestReactor_Backfill(t *testing.T) {
defer cancel() defer cancel()
t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute)) t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
rts := setup(ctx, t, nil, nil, nil, 21)
rts := setup(ctx, t, nil, nil, 21)
var ( var (
startHeight int64 = 20 startHeight int64 = 20
@ -860,6 +849,7 @@ func mockLB(ctx context.Context, t *testing.T, height int64, time time.Time, las
// graduallyAddPeers delivers a new randomly-generated peer update on peerUpdateCh once // graduallyAddPeers delivers a new randomly-generated peer update on peerUpdateCh once
// per interval, until closeCh is closed. Each peer update is assigned a random node ID. // per interval, until closeCh is closed. Each peer update is assigned a random node ID.
func graduallyAddPeers( func graduallyAddPeers(
ctx context.Context,
t *testing.T, t *testing.T,
peerUpdateCh chan p2p.PeerUpdate, peerUpdateCh chan p2p.PeerUpdate,
closeCh chan struct{}, closeCh chan struct{},
@ -868,6 +858,10 @@ func graduallyAddPeers(
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
for { for {
select { select {
case <-ctx.Done():
return
case <-closeCh:
return
case <-ticker.C: case <-ticker.C:
peerUpdateCh <- p2p.PeerUpdate{ peerUpdateCh <- p2p.PeerUpdate{
NodeID: factory.RandomNodeID(t), NodeID: factory.RandomNodeID(t),
@ -879,13 +873,12 @@ func graduallyAddPeers(
ParamsChannel: struct{}{}, ParamsChannel: struct{}{},
}, },
} }
case <-closeCh:
return
} }
} }
} }
func handleSnapshotRequests( func handleSnapshotRequests(
ctx context.Context,
t *testing.T, t *testing.T,
receivingCh chan p2p.Envelope, receivingCh chan p2p.Envelope,
sendingCh chan p2p.Envelope, sendingCh chan p2p.Envelope,
@ -895,6 +888,10 @@ func handleSnapshotRequests(
t.Helper() t.Helper()
for { for {
select { select {
case <-ctx.Done():
return
case <-closeCh:
return
case envelope := <-receivingCh: case envelope := <-receivingCh:
_, ok := envelope.Message.(*ssproto.SnapshotsRequest) _, ok := envelope.Message.(*ssproto.SnapshotsRequest)
require.True(t, ok) require.True(t, ok)
@ -910,13 +907,12 @@ func handleSnapshotRequests(
}, },
} }
} }
case <-closeCh:
return
} }
} }
} }
func handleChunkRequests( func handleChunkRequests(
ctx context.Context,
t *testing.T, t *testing.T,
receivingCh chan p2p.Envelope, receivingCh chan p2p.Envelope,
sendingCh chan p2p.Envelope, sendingCh chan p2p.Envelope,
@ -926,6 +922,10 @@ func handleChunkRequests(
t.Helper() t.Helper()
for { for {
select { select {
case <-ctx.Done():
return
case <-closeCh:
return
case envelope := <-receivingCh: case envelope := <-receivingCh:
msg, ok := envelope.Message.(*ssproto.ChunkRequest) msg, ok := envelope.Message.(*ssproto.ChunkRequest)
require.True(t, ok) require.True(t, ok)
@ -940,8 +940,6 @@ func handleChunkRequests(
}, },
} }
case <-closeCh:
return
} }
} }
} }

+ 4
- 6
internal/statesync/syncer.go View File

@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
@ -54,8 +55,7 @@ var (
type syncer struct { type syncer struct {
logger log.Logger logger log.Logger
stateProvider StateProvider stateProvider StateProvider
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
conn abciclient.Client
snapshots *snapshotPool snapshots *snapshotPool
snapshotCh *p2p.Channel snapshotCh *p2p.Channel
chunkCh *p2p.Channel chunkCh *p2p.Channel
@ -76,8 +76,7 @@ type syncer struct {
func newSyncer( func newSyncer(
cfg config.StateSyncConfig, cfg config.StateSyncConfig,
logger log.Logger, logger log.Logger,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
conn abciclient.Client,
stateProvider StateProvider, stateProvider StateProvider,
snapshotCh *p2p.Channel, snapshotCh *p2p.Channel,
chunkCh *p2p.Channel, chunkCh *p2p.Channel,
@ -88,7 +87,6 @@ func newSyncer(
logger: logger, logger: logger,
stateProvider: stateProvider, stateProvider: stateProvider,
conn: conn, conn: conn,
connQuery: connQuery,
snapshots: newSnapshotPool(), snapshots: newSnapshotPool(),
snapshotCh: snapshotCh, snapshotCh: snapshotCh,
chunkCh: chunkCh, chunkCh: chunkCh,
@ -547,7 +545,7 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin
// verifyApp verifies the sync, checking the app hash, last block height and app version // verifyApp verifies the sync, checking the app hash, last block height and app version
func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot, appVersion uint64) error { func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot, appVersion uint64) error {
resp, err := s.connQuery.Info(ctx, proxy.RequestInfo)
resp, err := s.conn.Info(ctx, proxy.RequestInfo)
if err != nil { if err != nil {
return fmt.Errorf("failed to query ABCI app for appHash: %w", err) return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
} }


+ 23
- 25
internal/statesync/syncer_test.go View File

@ -11,9 +11,9 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/internal/proxy"
proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/statesync/mocks" "github.com/tendermint/tendermint/internal/statesync/mocks"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@ -62,13 +62,12 @@ func TestSyncer_SyncAny(t *testing.T) {
stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil) stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil)
stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil) stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil)
stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil) stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil)
connSnapshot := &proxymocks.AppConnSnapshot{}
connQuery := &proxymocks.AppConnQuery{}
conn := &clientmocks.Client{}
peerAID := types.NodeID("aa") peerAID := types.NodeID("aa")
peerBID := types.NodeID("bb") peerBID := types.NodeID("bb")
peerCID := types.NodeID("cc") peerCID := types.NodeID("cc")
rts := setup(ctx, t, connSnapshot, connQuery, stateProvider, 4)
rts := setup(ctx, t, conn, stateProvider, 4)
rts.reactor.syncer = rts.syncer rts.reactor.syncer = rts.syncer
@ -110,7 +109,7 @@ func TestSyncer_SyncAny(t *testing.T) {
// We start a sync, with peers sending back chunks when requested. We first reject the snapshot // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
// with height 2 format 2, and accept the snapshot at height 1. // with height 2 format 2, and accept the snapshot at height 1.
connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
Snapshot: &abci.Snapshot{ Snapshot: &abci.Snapshot{
Height: 2, Height: 2,
Format: 2, Format: 2,
@ -119,7 +118,7 @@ func TestSyncer_SyncAny(t *testing.T) {
}, },
AppHash: []byte("app_hash_2"), AppHash: []byte("app_hash_2"),
}).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil) }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
Snapshot: &abci.Snapshot{ Snapshot: &abci.Snapshot{
Height: s.Height, Height: s.Height,
Format: s.Format, Format: s.Format,
@ -171,7 +170,7 @@ func TestSyncer_SyncAny(t *testing.T) {
// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1, // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
// which should cause it to keep the existing chunk 0 and 2, and restart restoration from // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
// beginning. We also wait for a little while, to exercise the retry logic in fetchChunks(). // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 2, Chunk: []byte{1, 1, 2}, Index: 2, Chunk: []byte{1, 1, 2},
}).Once().Run(func(args mock.Arguments) { time.Sleep(1 * time.Second) }).Return( }).Once().Run(func(args mock.Arguments) { time.Sleep(1 * time.Second) }).Return(
&abci.ResponseApplySnapshotChunk{ &abci.ResponseApplySnapshotChunk{
@ -179,16 +178,16 @@ func TestSyncer_SyncAny(t *testing.T) {
RefetchChunks: []uint32{1}, RefetchChunks: []uint32{1},
}, nil) }, nil)
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 0, Chunk: []byte{1, 1, 0}, Index: 0, Chunk: []byte{1, 1, 0},
}).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 1, Chunk: []byte{1, 1, 1}, Index: 1, Chunk: []byte{1, 1, 1},
}).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 2, Chunk: []byte{1, 1, 2}, Index: 2, Chunk: []byte{1, 1, 2},
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
AppVersion: testAppVersion, AppVersion: testAppVersion,
LastBlockHeight: 1, LastBlockHeight: 1,
LastBlockAppHash: []byte("app_hash"), LastBlockAppHash: []byte("app_hash"),
@ -217,8 +216,7 @@ func TestSyncer_SyncAny(t *testing.T) {
require.Equal(t, int64(len(rts.syncer.snapshots.snapshots)), rts.reactor.TotalSnapshots()) require.Equal(t, int64(len(rts.syncer.snapshots.snapshots)), rts.reactor.TotalSnapshots())
require.Equal(t, int64(0), rts.reactor.SnapshotChunksCount()) require.Equal(t, int64(0), rts.reactor.SnapshotChunksCount())
connSnapshot.AssertExpectations(t)
connQuery.AssertExpectations(t)
conn.AssertExpectations(t)
} }
func TestSyncer_SyncAny_noSnapshots(t *testing.T) { func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
@ -228,7 +226,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
_, _, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil }) _, _, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil })
require.Equal(t, errNoSnapshots, err) require.Equal(t, errNoSnapshots, err)
@ -241,7 +239,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
peerID := types.NodeID("aa") peerID := types.NodeID("aa")
@ -265,7 +263,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
// s22 is tried first, then s12, then s11, then errNoSnapshots // s22 is tried first, then s12, then s11, then errNoSnapshots
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
@ -307,7 +305,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
// s22 is tried first, which reject s22 and s12, then s11 will abort. // s22 is tried first, which reject s22 and s12, then s11 will abort.
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
@ -345,7 +343,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
peerAID := types.NodeID("aa") peerAID := types.NodeID("aa")
peerBID := types.NodeID("bb") peerBID := types.NodeID("bb")
@ -394,7 +392,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
errBoom := errors.New("boom") errBoom := errors.New("boom")
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
@ -444,7 +442,7 @@ func TestSyncer_offerSnapshot(t *testing.T) {
stateProvider := &mocks.StateProvider{} stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")} s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
rts.conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{ rts.conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
@ -497,7 +495,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
stateProvider := &mocks.StateProvider{} stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
body := []byte{1, 2, 3} body := []byte{1, 2, 3}
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, t.TempDir()) chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, t.TempDir())
@ -557,7 +555,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
stateProvider := &mocks.StateProvider{} stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, t.TempDir()) chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, t.TempDir())
require.NoError(t, err) require.NoError(t, err)
@ -628,7 +626,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
stateProvider := &mocks.StateProvider{} stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
// Set up three peers across two snapshots, and ask for one of them to be banned. // Set up three peers across two snapshots, and ask for one of them to be banned.
// It should be banned from all snapshots. // It should be banned from all snapshots.
@ -761,9 +759,9 @@ func TestSyncer_verifyApp(t *testing.T) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err)
rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err)
err := rts.syncer.verifyApp(ctx, s, appVersion) err := rts.syncer.verifyApp(ctx, s, appVersion)
unwrapped := errors.Unwrap(err) unwrapped := errors.Unwrap(err)
if unwrapped != nil { if unwrapped != nil {


+ 16
- 15
node/node.go View File

@ -100,7 +100,10 @@ func newDefaultNode(
return nil, err return nil, err
} }
appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
appClient, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
if err != nil {
return nil, err
}
return makeNode( return makeNode(
ctx, ctx,
@ -120,7 +123,7 @@ func makeNode(
cfg *config.Config, cfg *config.Config,
filePrivval *privval.FilePV, filePrivval *privval.FilePV,
nodeKey types.NodeKey, nodeKey types.NodeKey,
clientCreator abciclient.Creator,
client abciclient.Client,
genesisDocProvider genesisDocProvider, genesisDocProvider genesisDocProvider,
dbProvider config.DBProvider, dbProvider config.DBProvider,
logger log.Logger, logger log.Logger,
@ -155,7 +158,7 @@ func makeNode(
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy)
proxyApp := proxy.New(client, logger.With("module", "proxy"), nodeMetrics.proxy)
if err := proxyApp.Start(ctx); err != nil { if err := proxyApp.Start(ctx); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %w", err) return nil, fmt.Errorf("error starting proxy app connections: %w", err)
} }
@ -281,7 +284,7 @@ func makeNode(
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
logger.With("module", "state"), logger.With("module", "state"),
proxyApp.Consensus(),
proxyApp,
mp, mp,
evPool, evPool,
blockStore, blockStore,
@ -339,8 +342,7 @@ func makeNode(
genDoc.InitialHeight, genDoc.InitialHeight,
*cfg.StateSync, *cfg.StateSync,
logger.With("module", "statesync"), logger.With("module", "statesync"),
proxyApp.Snapshot(),
proxyApp.Query(),
proxyApp,
router.OpenChannel, router.OpenChannel,
peerManager.Subscribe(ctx), peerManager.Subscribe(ctx),
stateStore, stateStore,
@ -390,14 +392,13 @@ func makeNode(
shutdownOps: makeCloser(closers), shutdownOps: makeCloser(closers),
rpcEnv: &rpccore.Environment{ rpcEnv: &rpccore.Environment{
ProxyAppQuery: proxyApp.Query(),
ProxyAppMempool: proxyApp.Mempool(),
StateStore: stateStore,
BlockStore: blockStore,
ProxyApp: proxyApp,
EvidencePool: evPool, EvidencePool: evPool,
ConsensusState: csState, ConsensusState: csState,
StateStore: stateStore,
BlockStore: blockStore,
ConsensusReactor: csReactor, ConsensusReactor: csReactor,
BlockSyncReactor: bcReactor, BlockSyncReactor: bcReactor,
@ -752,14 +753,14 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
return state, nil return state, nil
} }
func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions {
opts := p2p.RouterOptions{ opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType, QueueType: conf.P2P.QueueType,
} }
if conf.FilterPeers && proxyApp != nil {
if conf.FilterPeers && appClient != nil {
opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error { opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error {
res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
res, err := appClient.Query(ctx, abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", id), Path: fmt.Sprintf("/p2p/filter/id/%s", id),
}) })
if err != nil { if err != nil {
@ -773,7 +774,7 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
} }
opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error { opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error {
res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
res, err := appClient.Query(ctx, abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))), Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))),
}) })
if err != nil { if err != nil {


+ 12
- 12
node/node_test.go View File

@ -273,8 +273,8 @@ func TestCreateProposalBlock(t *testing.T) {
logger := log.NewNopLogger() logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx) err = proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -291,7 +291,7 @@ func TestCreateProposalBlock(t *testing.T) {
mp := mempool.NewTxMempool( mp := mempool.NewTxMempool(
logger.With("module", "mempool"), logger.With("module", "mempool"),
cfg.Mempool, cfg.Mempool,
proxyApp.Mempool(),
proxyApp,
) )
// Make EvidencePool // Make EvidencePool
@ -327,7 +327,7 @@ func TestCreateProposalBlock(t *testing.T) {
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
logger, logger,
proxyApp.Consensus(),
proxyApp,
mp, mp,
evidencePool, evidencePool,
blockStore, blockStore,
@ -371,8 +371,8 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
logger := log.NewNopLogger() logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx) err = proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -390,7 +390,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
mp := mempool.NewTxMempool( mp := mempool.NewTxMempool(
logger.With("module", "mempool"), logger.With("module", "mempool"),
cfg.Mempool, cfg.Mempool,
proxyApp.Mempool(),
proxyApp,
) )
// fill the mempool with one txs just below the maximum size // fill the mempool with one txs just below the maximum size
@ -402,7 +402,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
logger, logger,
proxyApp.Consensus(),
proxyApp,
mp, mp,
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,
@ -438,8 +438,8 @@ func TestMaxProposalBlockSize(t *testing.T) {
logger := log.NewNopLogger() logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx) err = proxyApp.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -454,7 +454,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
mp := mempool.NewTxMempool( mp := mempool.NewTxMempool(
logger.With("module", "mempool"), logger.With("module", "mempool"),
cfg.Mempool, cfg.Mempool,
proxyApp.Mempool(),
proxyApp,
) )
// fill the mempool with one txs just below the maximum size // fill the mempool with one txs just below the maximum size
@ -473,7 +473,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
stateStore, stateStore,
logger, logger,
proxyApp.Consensus(),
proxyApp,
mp, mp,
sm.EmptyEvidencePool{}, sm.EmptyEvidencePool{},
blockStore, blockStore,


+ 1
- 1
node/public.go View File

@ -35,7 +35,7 @@ func New(
ctx context.Context, ctx context.Context,
conf *config.Config, conf *config.Config,
logger log.Logger, logger log.Logger,
cf abciclient.Creator,
cf abciclient.Client,
gen *types.GenesisDoc, gen *types.GenesisDoc,
) (service.Service, error) { ) (service.Service, error) {
nodeKey, err := types.LoadOrGenNodeKey(conf.NodeKeyFile()) nodeKey, err := types.LoadOrGenNodeKey(conf.NodeKeyFile())


+ 5
- 5
node/setup.go View File

@ -10,6 +10,7 @@ import (
dbm "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/blocksync" "github.com/tendermint/tendermint/internal/blocksync"
@ -20,7 +21,6 @@ import (
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/internal/p2p/pex" "github.com/tendermint/tendermint/internal/p2p/pex"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/state/indexer/sink" "github.com/tendermint/tendermint/internal/state/indexer/sink"
@ -171,7 +171,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
func createMempoolReactor( func createMempoolReactor(
ctx context.Context, ctx context.Context,
cfg *config.Config, cfg *config.Config,
proxyApp proxy.AppConns,
appClient abciclient.Client,
store sm.Store, store sm.Store,
memplMetrics *mempool.Metrics, memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager, peerManager *p2p.PeerManager,
@ -183,7 +183,7 @@ func createMempoolReactor(
mp := mempool.NewTxMempool( mp := mempool.NewTxMempool(
logger, logger,
cfg.Mempool, cfg.Mempool,
proxyApp.Mempool(),
appClient,
mempool.WithMetrics(memplMetrics), mempool.WithMetrics(memplMetrics),
mempool.WithPreCheck(sm.TxPreCheckFromStore(store)), mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)), mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
@ -387,7 +387,7 @@ func createRouter(
nodeKey types.NodeKey, nodeKey types.NodeKey,
peerManager *p2p.PeerManager, peerManager *p2p.PeerManager,
cfg *config.Config, cfg *config.Config,
proxyApp proxy.AppConns,
appClient abciclient.Client,
) (*p2p.Router, error) { ) (*p2p.Router, error) {
p2pLogger := logger.With("module", "p2p") p2pLogger := logger.With("module", "p2p")
@ -418,7 +418,7 @@ func createRouter(
peerManager, peerManager,
[]p2p.Transport{transport}, []p2p.Transport{transport},
[]p2p.Endpoint{ep}, []p2p.Endpoint{ep},
getRouterConfig(cfg, proxyApp),
getRouterConfig(cfg, appClient),
) )
} }


+ 1
- 1
rpc/test/helpers.go View File

@ -98,7 +98,7 @@ func StartTendermint(
} }
} }
papp := abciclient.NewLocalCreator(app)
papp := abciclient.NewLocalClient(logger, app)
tmNode, err := node.New(ctx, conf, logger, papp, nil) tmNode, err := node.New(ctx, conf, logger, papp, nil)
if err != nil { if err != nil {
return nil, func(_ context.Context) error { cancel(); return nil }, err return nil, func(_ context.Context) error { cancel(); return nil }, err


+ 1
- 1
test/e2e/node/main.go View File

@ -136,7 +136,7 @@ func startNode(ctx context.Context, cfg *Config) error {
ctx, ctx,
tmcfg, tmcfg,
nodeLogger, nodeLogger,
abciclient.NewLocalCreator(app),
abciclient.NewLocalClient(nodeLogger, app),
nil, nil,
) )
if err != nil { if err != nil {


+ 4
- 9
test/fuzz/mempool/checktx.go View File

@ -15,9 +15,9 @@ var getMp func() mempool.Mempool
func init() { func init() {
app := kvstore.NewApplication() app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
appConnMem, _ := cc(log.NewNopLogger())
err := appConnMem.Start(context.TODO())
logger := log.NewNopLogger()
conn := abciclient.NewLocalClient(logger, app)
err := conn.Start(context.TODO())
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -27,12 +27,7 @@ func init() {
getMp = func() mempool.Mempool { getMp = func() mempool.Mempool {
if mp == nil { if mp == nil {
mp = mempool.NewTxMempool(
log.NewNopLogger(),
cfg,
appConnMem,
)
mp = mempool.NewTxMempool(logger, cfg, conn)
} }
return mp return mp
} }


Loading…
Cancel
Save