Browse Source

corrections

pull/8067/head
tycho garen 3 years ago
parent
commit
be7b2ec2bd
7 changed files with 43 additions and 44 deletions
  1. +15
    -15
      internal/consensus/replay.go
  2. +0
    -1
      internal/consensus/replay_file.go
  3. +8
    -8
      internal/consensus/replay_test.go
  4. +3
    -3
      internal/proxy/client.go
  5. +9
    -9
      internal/state/execution.go
  6. +4
    -4
      node/node.go
  7. +4
    -4
      node/setup.go

+ 15
- 15
internal/consensus/replay.go View File

@ -238,10 +238,10 @@ func (h *Handshaker) NBlocks() int {
} }
// TODO: retry the handshake/replay if it fails ? // TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(ctx context.Context, proxyApp abciclient.Client) error {
func (h *Handshaker) Handshake(ctx context.Context, appClient abciclient.Client) error {
// Handshake is done via ABCI Info on the query conn. // Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Info(ctx, proxy.RequestInfo)
res, err := appClient.Info(ctx, proxy.RequestInfo)
if err != nil { if err != nil {
return fmt.Errorf("error calling Info: %w", err) return fmt.Errorf("error calling Info: %w", err)
} }
@ -265,7 +265,7 @@ func (h *Handshaker) Handshake(ctx context.Context, proxyApp abciclient.Client)
} }
// Replay blocks up to the latest in the blockstore. // Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, proxyApp)
_, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, appClient)
if err != nil { if err != nil {
return fmt.Errorf("error on replay: %w", err) return fmt.Errorf("error on replay: %w", err)
} }
@ -286,7 +286,7 @@ func (h *Handshaker) ReplayBlocks(
state sm.State, state sm.State,
appHash []byte, appHash []byte,
appBlockHeight int64, appBlockHeight int64,
proxyApp abciclient.Client,
appClient abciclient.Client,
) ([]byte, error) { ) ([]byte, error) {
storeBlockBase := h.store.Base() storeBlockBase := h.store.Base()
storeBlockHeight := h.store.Height() storeBlockHeight := h.store.Height()
@ -317,7 +317,7 @@ func (h *Handshaker) ReplayBlocks(
Validators: nextVals, Validators: nextVals,
AppStateBytes: h.genDoc.AppState, AppStateBytes: h.genDoc.AppState,
} }
res, err := proxyApp.InitChain(ctx, req)
res, err := appClient.InitChain(ctx, req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -391,7 +391,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up. // Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight { if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We're good! // We're good!
@ -406,7 +406,7 @@ func (h *Handshaker) ReplayBlocks(
case appBlockHeight < stateBlockHeight: case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks // the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL // but leave the last block to go through the WAL
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, true)
case appBlockHeight == stateBlockHeight: case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind), // We haven't run Commit (both the state and app are one block behind),
@ -414,7 +414,7 @@ func (h *Handshaker) ReplayBlocks(
// NOTE: We could instead use the cs.WAL on cs.Start, // NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
h.logger.Info("Replay last block using real app") h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
state, err = h.replayBlock(ctx, state, storeBlockHeight, appClient)
return state.AppHash, err return state.AppHash, err
case appBlockHeight == storeBlockHeight: case appBlockHeight == storeBlockHeight:
@ -449,7 +449,7 @@ func (h *Handshaker) ReplayBlocks(
func (h *Handshaker) replayBlocks( func (h *Handshaker) replayBlocks(
ctx context.Context, ctx context.Context,
state sm.State, state sm.State,
proxyApp abciclient.Client,
appClient abciclient.Client,
appBlockHeight, appBlockHeight,
storeBlockHeight int64, storeBlockHeight int64,
mutateState bool) ([]byte, error) { mutateState bool) ([]byte, error) {
@ -485,16 +485,16 @@ func (h *Handshaker) replayBlocks(
// We emit events for the index services at the final block due to the sync issue when // We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status. // the node shutdown during the block committing status.
blockExec := sm.NewBlockExecutor( blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock(ctx, appHash, err = sm.ExecCommitBlock(ctx,
blockExec, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
blockExec, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else { } else {
appHash, err = sm.ExecCommitBlock(ctx, appHash, err = sm.ExecCommitBlock(ctx,
nil, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
nil, appClient, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -505,7 +505,7 @@ func (h *Handshaker) replayBlocks(
if mutateState { if mutateState {
// sync the final block // sync the final block
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
state, err = h.replayBlock(ctx, state, storeBlockHeight, appClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -521,14 +521,14 @@ func (h *Handshaker) replayBlock(
ctx context.Context, ctx context.Context,
state sm.State, state sm.State,
height int64, height int64,
proxyApp abciclient.Client,
appClient abciclient.Client,
) (sm.State, error) { ) (sm.State, error) {
block := h.store.LoadBlock(height) block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height) meta := h.store.LoadBlockMeta(height)
// Use stubs for both mempool and evidence pool since no transactions nor // Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists. // evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, appClient, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus) blockExec.SetEventBus(h.eventBus)
var err error var err error


+ 0
- 1
internal/consensus/replay_file.go View File

@ -326,7 +326,6 @@ func newConsensusStateForReplay(
return nil, err return nil, err
} }
// Create proxyAppConn connection (consensus, mempool, query)
client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
if err != nil { if err != nil {
return nil, err return nil, err


+ 8
- 8
internal/consensus/replay_test.go View File

@ -876,11 +876,11 @@ func applyBlock(
evpool sm.EvidencePool, evpool sm.EvidencePool,
st sm.State, st sm.State,
blk *types.Block, blk *types.Block,
proxyApp abciclient.Client,
appClient abciclient.Client,
blockStore *mockBlockStore, blockStore *mockBlockStore,
) sm.State { ) sm.State {
testPartSize := types.BlockPartSizeBytes testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore)
bps, err := blk.MakePartSet(testPartSize) bps, err := blk.MakePartSet(testPartSize)
require.NoError(t, err) require.NoError(t, err)
@ -893,7 +893,7 @@ func applyBlock(
func buildAppStateFromChain( func buildAppStateFromChain(
ctx context.Context, ctx context.Context,
t *testing.T, t *testing.T,
proxyApp abciclient.Client,
appClient abciclient.Client,
stateStore sm.Store, stateStore sm.Store,
mempool mempool.Mempool, mempool mempool.Mempool,
evpool sm.EvidencePool, evpool sm.EvidencePool,
@ -905,11 +905,11 @@ func buildAppStateFromChain(
) { ) {
t.Helper() t.Helper()
// start a new app without handshake, play nBlocks blocks // start a new app without handshake, play nBlocks blocks
require.NoError(t, proxyApp.Start(ctx))
require.NoError(t, appClient.Start(ctx))
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators) validators := types.TM2PB.ValidatorUpdates(state.Validators)
_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
_, err := appClient.InitChain(ctx, abci.RequestInitChain{
Validators: validators, Validators: validators,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -920,18 +920,18 @@ func buildAppStateFromChain(
case 0: case 0:
for i := 0; i < nBlocks; i++ { for i := 0; i < nBlocks; i++ {
block := chain[i] block := chain[i]
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
} }
case 1, 2, 3: case 1, 2, 3:
for i := 0; i < nBlocks-1; i++ { for i := 0; i < nBlocks-1; i++ {
block := chain[i] block := chain[i]
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, block, appClient, blockStore)
} }
if mode == 2 || mode == 3 { if mode == 2 || mode == 3 {
// update the kvstore height and apphash // update the kvstore height and apphash
// as if we ran commit but not // as if we ran commit but not
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], proxyApp, blockStore)
state = applyBlock(ctx, t, stateStore, mempool, evpool, state, chain[nBlocks-1], appClient, blockStore)
} }
default: default:
require.Fail(t, "unknown mode %v", mode) require.Fail(t, "unknown mode %v", mode)


+ 3
- 3
internal/proxy/client.go View File

@ -16,9 +16,9 @@ import (
e2e "github.com/tendermint/tendermint/test/e2e/app" e2e "github.com/tendermint/tendermint/test/e2e/app"
) )
// ClientFactory returns a default ClientCreator, which will create a
// local client if addr is one of: 'kvstore',
// 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client.
// ClientFactory returns a client object, which will create a local
// client if addr is one of: 'kvstore', 'persistent_kvstore', 'e2e',
// or 'noop', otherwise - a remote client.
// //
// The Closer is a noop except for persistent_kvstore applications, // The Closer is a noop except for persistent_kvstore applications,
// which will clean up the store. // which will clean up the store.


+ 9
- 9
internal/state/execution.go View File

@ -30,7 +30,7 @@ type BlockExecutor struct {
blockStore BlockStore blockStore BlockStore
// execute the app against this // execute the app against this
proxyApp abciclient.Client
appClient abciclient.Client
// events // events
eventBus types.BlockEventPublisher eventBus types.BlockEventPublisher
@ -60,7 +60,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
func NewBlockExecutor( func NewBlockExecutor(
stateStore Store, stateStore Store,
logger log.Logger, logger log.Logger,
proxyApp abciclient.Client,
appClient abciclient.Client,
pool mempool.Mempool, pool mempool.Mempool,
evpool EvidencePool, evpool EvidencePool,
blockStore BlockStore, blockStore BlockStore,
@ -68,7 +68,7 @@ func NewBlockExecutor(
) *BlockExecutor { ) *BlockExecutor {
res := &BlockExecutor{ res := &BlockExecutor{
store: stateStore, store: stateStore,
proxyApp: proxyApp,
appClient: appClient,
eventBus: eventbus.NopEventBus{}, eventBus: eventbus.NopEventBus{},
mempool: pool, mempool: pool,
evpool: evpool, evpool: evpool,
@ -119,7 +119,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
preparedProposal, err := blockExec.proxyApp.PrepareProposal(
preparedProposal, err := blockExec.appClient.PrepareProposal(
ctx, ctx,
abci.RequestPrepareProposal{ abci.RequestPrepareProposal{
BlockData: txs.ToSliceOfBytes(), BlockData: txs.ToSliceOfBytes(),
@ -166,7 +166,7 @@ func (blockExec *BlockExecutor) ProcessProposal(
ByzantineValidators: block.Evidence.ToABCI(), ByzantineValidators: block.Evidence.ToABCI(),
} }
resp, err := blockExec.proxyApp.ProcessProposal(ctx, req)
resp, err := blockExec.appClient.ProcessProposal(ctx, req)
if err != nil { if err != nil {
return false, ErrInvalidBlock(err) return false, ErrInvalidBlock(err)
} }
@ -218,7 +218,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
startTime := time.Now().UnixNano() startTime := time.Now().UnixNano()
abciResponses, err := execBlockOnProxyApp(ctx, abciResponses, err := execBlockOnProxyApp(ctx,
blockExec.logger, blockExec.proxyApp, block, blockExec.store, state.InitialHeight,
blockExec.logger, blockExec.appClient, block, blockExec.store, state.InitialHeight,
) )
endTime := time.Now().UnixNano() endTime := time.Now().UnixNano()
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000) blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
@ -292,7 +292,7 @@ func (blockExec *BlockExecutor) ExtendVote(ctx context.Context, vote *types.Vote
Vote: vote.ToProto(), Vote: vote.ToProto(),
} }
resp, err := blockExec.proxyApp.ExtendVote(ctx, req)
resp, err := blockExec.appClient.ExtendVote(ctx, req)
if err != nil { if err != nil {
return types.VoteExtension{}, err return types.VoteExtension{}, err
} }
@ -304,7 +304,7 @@ func (blockExec *BlockExecutor) VerifyVoteExtension(ctx context.Context, vote *t
Vote: vote.ToProto(), Vote: vote.ToProto(),
} }
resp, err := blockExec.proxyApp.VerifyVoteExtension(ctx, req)
resp, err := blockExec.appClient.VerifyVoteExtension(ctx, req)
if err != nil { if err != nil {
return err return err
} }
@ -340,7 +340,7 @@ func (blockExec *BlockExecutor) Commit(
} }
// Commit block, get hash back // Commit block, get hash back
res, err := blockExec.proxyApp.Commit(ctx)
res, err := blockExec.appClient.Commit(ctx)
if err != nil { if err != nil {
blockExec.logger.Error("client error during proxyAppConn.Commit", "err", err) blockExec.logger.Error("client error during proxyAppConn.Commit", "err", err)
return nil, 0, err return nil, 0, err


+ 4
- 4
node/node.go View File

@ -753,14 +753,14 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
return state, nil return state, nil
} }
func getRouterConfig(conf *config.Config, proxyApp abciclient.Client) p2p.RouterOptions {
func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions {
opts := p2p.RouterOptions{ opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType, QueueType: conf.P2P.QueueType,
} }
if conf.FilterPeers && proxyApp != nil {
if conf.FilterPeers && appClient != nil {
opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error { opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error {
res, err := proxyApp.Query(ctx, abci.RequestQuery{
res, err := appClient.Query(ctx, abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", id), Path: fmt.Sprintf("/p2p/filter/id/%s", id),
}) })
if err != nil { if err != nil {
@ -774,7 +774,7 @@ func getRouterConfig(conf *config.Config, proxyApp abciclient.Client) p2p.Router
} }
opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error { opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error {
res, err := proxyApp.Query(ctx, abci.RequestQuery{
res, err := appClient.Query(ctx, abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))), Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))),
}) })
if err != nil { if err != nil {


+ 4
- 4
node/setup.go View File

@ -171,7 +171,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
func createMempoolReactor( func createMempoolReactor(
ctx context.Context, ctx context.Context,
cfg *config.Config, cfg *config.Config,
proxyApp abciclient.Client,
appClient abciclient.Client,
store sm.Store, store sm.Store,
memplMetrics *mempool.Metrics, memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager, peerManager *p2p.PeerManager,
@ -183,7 +183,7 @@ func createMempoolReactor(
mp := mempool.NewTxMempool( mp := mempool.NewTxMempool(
logger, logger,
cfg.Mempool, cfg.Mempool,
proxyApp,
appClient,
mempool.WithMetrics(memplMetrics), mempool.WithMetrics(memplMetrics),
mempool.WithPreCheck(sm.TxPreCheckFromStore(store)), mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)), mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
@ -387,7 +387,7 @@ func createRouter(
nodeKey types.NodeKey, nodeKey types.NodeKey,
peerManager *p2p.PeerManager, peerManager *p2p.PeerManager,
cfg *config.Config, cfg *config.Config,
proxyApp abciclient.Client,
appClient abciclient.Client,
) (*p2p.Router, error) { ) (*p2p.Router, error) {
p2pLogger := logger.With("module", "p2p") p2pLogger := logger.With("module", "p2p")
@ -418,7 +418,7 @@ func createRouter(
peerManager, peerManager,
[]p2p.Transport{transport}, []p2p.Transport{transport},
[]p2p.Endpoint{ep}, []p2p.Endpoint{ep},
getRouterConfig(cfg, proxyApp),
getRouterConfig(cfg, appClient),
) )
} }


Loading…
Cancel
Save