
node: nodes should fetch state on startup (#8062)

pull/8070/head
Sam Kleinman, 2 years ago, committed by GitHub
commit 0167f0d527
17 changed files with 221 additions and 105 deletions
1. +25 -19  internal/blocksync/reactor.go
2. +1 -1  internal/blocksync/reactor_test.go
3. +21 -11  internal/consensus/byzantine_test.go
4. +6 -3  internal/consensus/common_test.go
5. +4 -3  internal/consensus/reactor_test.go
6. +17 -12  internal/consensus/replay_file.go
7. +36 -9  internal/consensus/state.go
8. +5 -1  internal/consensus/wal_generator.go
9. +1 -2  internal/mempool/mempool.go
10. +1 -1  internal/mempool/mempool_test.go
11. +2 -2  internal/state/execution.go
12. +73 -10  internal/state/tx_filter.go
13. +1 -1  internal/state/tx_filter_test.go
14. +17 -17  node/node.go
15. +0 -3  node/node_test.go
16. +11 -9  node/setup.go
17. +0 -1  test/fuzz/mempool/checktx.go

internal/blocksync/reactor.go  +25 -19

@@ -70,6 +70,8 @@ type Reactor struct {
// immutable
initialState sm.State
// store
stateStore sm.Store
blockExec *sm.BlockExecutor
store *store.BlockStore
@@ -101,7 +103,7 @@ type Reactor struct {
func NewReactor(
ctx context.Context,
logger log.Logger,
state sm.State,
stateStore sm.Store,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
@@ -111,19 +113,6 @@ func NewReactor(
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
if state.LastBlockHeight != store.Height() {
return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
}
startHeight := store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
if err != nil {
return nil, err
@@ -131,20 +120,16 @@ func NewReactor(
r := &Reactor{
logger: logger,
initialState: state,
stateStore: stateStore,
blockExec: blockExec,
store: store,
pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh),
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
requestsCh: requestsCh,
errorsCh: errorsCh,
blockSyncCh: blockSyncCh,
blockSyncOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
syncStartTime: time.Time{},
}
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
@@ -159,6 +144,27 @@ func NewReactor(
// If blockSync is enabled, we also start the pool and the pool processing
// goroutine. If the pool fails to start, an error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
state, err := r.stateStore.Load()
if err != nil {
return err
}
r.initialState = state
if state.LastBlockHeight != r.store.Height() {
return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height())
}
startHeight := r.store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
r.requestsCh = requestsCh
r.errorsCh = errorsCh
if r.blockSync.IsSet() {
if err := r.pool.Start(ctx); err != nil {
return err


internal/blocksync/reactor_test.go  +1 -1

@@ -176,7 +176,7 @@ func (rts *reactorTestSuite) addNode(
rts.reactors[nodeID], err = NewReactor(
ctx,
rts.logger.With("nodeID", nodeID),
state.Copy(),
stateStore,
blockExec,
blockStore,
nil,


internal/consensus/byzantine_test.go  +21 -11

@@ -82,7 +82,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
log.TestingLogger().With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
)
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
@@ -95,7 +94,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
require.NoError(t, err)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(ctx, pv)
@@ -105,14 +105,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
require.NoError(t, err)
cs.SetEventBus(eventBus)
evpool.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())
states[i] = cs
}()
}
rts := setup(ctx, t, nValidators, states, 100) // buffer must be large enough to not deadlock
rts := setup(ctx, t, nValidators, states, 512) // buffer must be large enough to not deadlock
var bzNodeID types.NodeID
@@ -238,8 +237,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false)
}
// Evidence should be submitted and committed at the third height but
@@ -248,20 +246,26 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
var wg sync.WaitGroup
i := 0
subctx, subcancel := context.WithCancel(ctx)
defer subcancel()
for _, sub := range rts.subs {
wg.Add(1)
go func(j int, s eventbus.Subscription) {
defer wg.Done()
for {
if ctx.Err() != nil {
if subctx.Err() != nil {
return
}
msg, err := s.Next(subctx)
if subctx.Err() != nil {
return
}
msg, err := s.Next(ctx)
assert.NoError(t, err)
if err != nil {
cancel()
t.Errorf("waiting for subscription: %v", err)
subcancel()
return
}
@@ -273,12 +277,18 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
}(i, sub)
i++
}
wg.Wait()
// don't run more assertions if we've encountered a timeout
select {
case <-subctx.Done():
t.Fatal("encountered timeout")
default:
}
pubkey, err := bzNodeState.privValidator.GetPubKey(ctx)
require.NoError(t, err)
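
The subscription loop above now waits on a derived child context, so one failing subscriber cancels its siblings, and after wg.Wait() the test inspects that context before running further assertions. A self-contained sketch of that fail-fast fan-out shape, with generic worker names rather than the test's actual types:

package fanout

import (
	"context"
	"fmt"
	"sync"
)

// waitAll runs one goroutine per worker. The first failure cancels the shared
// child context so the siblings stop early, and the caller distinguishes a
// clean finish from an abort by checking that context after the wait.
func waitAll(ctx context.Context, workers []func(context.Context) error) error {
	subctx, subcancel := context.WithCancel(ctx)
	defer subcancel()

	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(work func(context.Context) error) {
			defer wg.Done()
			if subctx.Err() != nil {
				return // another worker already failed
			}
			if err := work(subctx); err != nil {
				subcancel() // fail fast: stop the sibling goroutines too
			}
		}(w)
	}
	wg.Wait()

	select {
	case <-subctx.Done():
		return fmt.Errorf("aborted: %w", subctx.Err())
	default:
		return nil
	}
}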


internal/consensus/common_test.go  +6 -3

@@ -469,7 +469,6 @@ func newStateWithConfigAndBlockStore(
logger.With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
)
if thisConfig.Consensus.WaitForTxs() {
@@ -484,15 +483,19 @@ func newStateWithConfigAndBlockStore(
require.NoError(t, stateStore.Save(state))
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx,
cs, err := NewState(ctx,
logger.With("module", "consensus"),
thisConfig.Consensus,
state,
stateStore,
blockExec,
blockStore,
mempool,
evpool,
)
if err != nil {
t.Fatal(err)
}
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(logger.With("module", "events"))


internal/consensus/reactor_test.go  +4 -3

@@ -461,6 +461,7 @@ func TestReactorWithEvidence(t *testing.T) {
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
require.NoError(t, stateStore.Save(state))
thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i))
require.NoError(t, err)
@@ -483,7 +484,6 @@ func TestReactorWithEvidence(t *testing.T) {
log.TestingLogger().With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
)
if thisConfig.Consensus.WaitForTxs() {
@@ -506,8 +506,9 @@ func TestReactorWithEvidence(t *testing.T) {
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2)
require.NoError(t, err)
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))


internal/consensus/replay_file.go  +17 -12

@@ -84,7 +84,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro
return err
}
pb := newPlayback(file, fp, cs, cs.state.Copy())
pb := newPlayback(file, fp, cs, cs.stateStore)
defer pb.fp.Close()
var nextN int // apply N msgs in a row
@@ -126,17 +126,17 @@ type playback struct {
count int // how many lines/msgs into the file are we
// replays can be reset to beginning
fileName string // so we can close/reopen the file
genesisState sm.State // so the replay session knows where to restart from
fileName string // so we can close/reopen the file
stateStore sm.Store
}
func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback {
func newPlayback(fileName string, fp *os.File, cs *State, store sm.Store) *playback {
return &playback{
cs: cs,
fp: fp,
fileName: fileName,
genesisState: genState,
dec: NewWALDecoder(fp),
cs: cs,
fp: fp,
fileName: fileName,
stateStore: store,
dec: NewWALDecoder(fp),
}
}
@@ -145,8 +145,11 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event
pb.cs.Stop()
pb.cs.Wait()
newCS := NewState(ctx, pb.cs.logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
if err != nil {
return err
}
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()
@@ -345,9 +348,11 @@ func newConsensusStateForReplay(
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec,
consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
blockStore, mempool, evpool)
if err != nil {
return nil, err
}
consensusState.SetEventBus(eventBus)
return consensusState, nil
}

internal/consensus/state.go  +36 -9

@@ -121,6 +121,9 @@ type State struct {
// store blocks and commits
blockStore sm.BlockStore
stateStore sm.Store
initialStatePopulated bool
// create and execute blocks
blockExec *sm.BlockExecutor
@@ -189,18 +192,19 @@ func NewState(
ctx context.Context,
logger log.Logger,
cfg *config.ConsensusConfig,
state sm.State,
store sm.Store,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
evpool evidencePool,
options ...StateOption,
) *State {
) (*State, error) {
cs := &State{
logger: logger,
config: cfg,
blockExec: blockExec,
blockStore: blockStore,
stateStore: store,
txNotifier: txNotifier,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
@@ -220,21 +224,40 @@ func NewState(
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
if err := cs.updateStateFromStore(ctx); err != nil {
return nil, err
}
cs.updateToState(ctx, state)
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(logger, "State", cs)
for _, option := range options {
option(cs)
}
return cs
return cs, nil
}
func (cs *State) updateStateFromStore(ctx context.Context) error {
if cs.initialStatePopulated {
return nil
}
state, err := cs.stateStore.Load()
if err != nil {
return fmt.Errorf("loading state: %w", err)
}
if state.IsEmpty() {
return nil
}
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
}
cs.updateToState(ctx, state)
cs.initialStatePopulated = true
return nil
}
// SetEventBus sets event bus.
@@ -365,6 +388,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
func (cs *State) OnStart(ctx context.Context) error {
if err := cs.updateStateFromStore(ctx); err != nil {
return err
}
// We may set the WAL in testing before calling Start, so only OpenWAL if it's
// still the nilWAL.
if _, ok := cs.wal.(nilWAL); ok {
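
NewState now takes an sm.Store instead of a ready-made sm.State, which is why it can return an error, and updateStateFromStore makes the population idempotent: it runs once in the constructor, where an empty store is tolerated, and again in OnStart, so a store that is only filled later (for example by state sync) is still picked up before consensus runs. A reduced sketch of that run-once lazy initialization, with stand-in types:

package conssketch

import (
	"context"
	"fmt"
)

// Stand-ins for sm.State and sm.Store, for this sketch only.
type State struct{ ChainID string }

func (s State) IsEmpty() bool { return s.ChainID == "" }

type Store interface{ Load() (State, error) }

type ConsState struct {
	store     Store
	state     State
	populated bool
}

// NewConsState tolerates an empty store (it may be filled later) but surfaces
// real load errors, which is why the constructor now returns an error.
func NewConsState(ctx context.Context, store Store) (*ConsState, error) {
	cs := &ConsState{store: store}
	if err := cs.updateStateFromStore(ctx); err != nil {
		return nil, err
	}
	return cs, nil
}

// updateStateFromStore is idempotent: once a state has been applied it is a
// no-op, so calling it from both the constructor and OnStart is safe.
func (cs *ConsState) updateStateFromStore(ctx context.Context) error {
	if cs.populated {
		return nil
	}
	state, err := cs.store.Load()
	if err != nil {
		return fmt.Errorf("loading state: %w", err)
	}
	if state.IsEmpty() {
		return nil // nothing persisted yet; OnStart will try again
	}
	cs.state = state
	cs.populated = true
	return nil
}

func (cs *ConsState) OnStart(ctx context.Context) error {
	return cs.updateStateFromStore(ctx)
}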


internal/consensus/wal_generator.go  +5 -1

@@ -83,7 +83,11 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
if err != nil {
t.Fatal(err)
}
consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
consensusState.SetPrivValidator(ctx, privValidator)


internal/mempool/mempool.go  +1 -2

@@ -94,7 +94,6 @@ func NewTxMempool(
logger log.Logger,
cfg *config.MempoolConfig,
proxyAppConn proxy.AppConnMempool,
height int64,
options ...TxMempoolOption,
) *TxMempool {
@@ -102,7 +101,7 @@ func NewTxMempool(
logger: logger,
config: cfg,
proxyAppConn: proxyAppConn,
height: height,
height: -1,
cache: NopTxCache{},
metrics: NopMetrics(),
txStore: NewTxStore(),


internal/mempool/mempool_test.go  +1 -1

@@ -95,7 +95,7 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo
appConnMem.Wait()
})
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...)
}
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {


internal/state/execution.go  +2 -2

@@ -360,8 +360,8 @@ func (blockExec *BlockExecutor) Commit(
block.Height,
block.Txs,
deliverTxResponses,
TxPreCheck(state),
TxPostCheck(state),
TxPreCheckForState(state),
TxPostCheckForState(state),
)
return res.Data, res.RetainHeight, err


internal/state/tx_filter.go  +73 -10

@@ -1,22 +1,85 @@
package state
import (
"sync"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/types"
)
// TxPreCheck returns a function to filter transactions before processing.
// The function limits the size of a transaction to the block's maximum data size.
func TxPreCheck(state State) mempool.PreCheckFunc {
maxDataBytes := types.MaxDataBytesNoEvidence(
state.ConsensusParams.Block.MaxBytes,
state.Validators.Size(),
func cachingStateFetcher(store Store) func() (State, error) {
const ttl = time.Second
var (
last time.Time
mutex = &sync.Mutex{}
cache State
err error
)
return mempool.PreCheckMaxBytes(maxDataBytes)
return func() (State, error) {
mutex.Lock()
defer mutex.Unlock()
if time.Since(last) < ttl && cache.ChainID != "" {
return cache, nil
}
cache, err = store.Load()
if err != nil {
return State{}, err
}
last = time.Now()
return cache, nil
}
}
// TxPostCheck returns a function to filter transactions after processing.
// TxPreCheckFromStore returns a function to filter transactions before processing.
// The function limits the size of a transaction to the block's maximum data size.
func TxPreCheckFromStore(store Store) mempool.PreCheckFunc {
fetch := cachingStateFetcher(store)
return func(tx types.Tx) error {
state, err := fetch()
if err != nil {
return err
}
return TxPreCheckForState(state)(tx)
}
}
func TxPreCheckForState(state State) mempool.PreCheckFunc {
return func(tx types.Tx) error {
maxDataBytes := types.MaxDataBytesNoEvidence(
state.ConsensusParams.Block.MaxBytes,
state.Validators.Size(),
)
return mempool.PreCheckMaxBytes(maxDataBytes)(tx)
}
}
// TxPostCheckFromStore returns a function to filter transactions after processing.
// The function limits the gas wanted by a transaction to the block's maximum total gas.
func TxPostCheck(state State) mempool.PostCheckFunc {
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)
func TxPostCheckFromStore(store Store) mempool.PostCheckFunc {
fetch := cachingStateFetcher(store)
return func(tx types.Tx, resp *abci.ResponseCheckTx) error {
state, err := fetch()
if err != nil {
return err
}
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp)
}
}
func TxPostCheckForState(state State) mempool.PostCheckFunc {
return func(tx types.Tx, resp *abci.ResponseCheckTx) error {
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp)
}
}
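
TxPreCheck and TxPostCheck are split into *ForState variants, which keep the old behavior for an already-loaded State, and *FromStore variants, which re-read the state through a one-second cache so long-lived consumers such as the mempool follow ConsensusParams updates without hitting the store on every CheckTx. An illustrative wiring fragment, mirroring the createMempoolReactor hunk in node/setup.go further down (variable names such as stateStore are placeholders):

// Illustrative only; not runnable on its own.
mp := mempool.NewTxMempool(
	logger,
	cfg.Mempool,
	proxyApp.Mempool(),
	mempool.WithMetrics(memplMetrics),
	// Store-backed checks instead of checks bound to one State snapshot:
	mempool.WithPreCheck(sm.TxPreCheckFromStore(stateStore)),
	mempool.WithPostCheck(sm.TxPostCheckFromStore(stateStore)),
)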

internal/state/tx_filter_test.go  +1 -1

@@ -31,7 +31,7 @@ func TestTxFilter(t *testing.T) {
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
f := sm.TxPreCheck(state)
f := sm.TxPreCheckForState(state)
if tc.isErr {
assert.NotNil(t, f(tc.tx), "#%v", i)
} else {


node/node.go  +17 -17

@@ -143,11 +143,8 @@ func makeNode(
return nil, combineCloseError(err, makeCloser(closers))
}
err = genDoc.ValidateAndComplete()
if err != nil {
return nil, combineCloseError(
fmt.Errorf("error in genesis doc: %w", err),
makeCloser(closers))
if err = genDoc.ValidateAndComplete(); err != nil {
return nil, combineCloseError(fmt.Errorf("error in genesis doc: %w", err), makeCloser(closers))
}
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
@@ -241,10 +238,6 @@ func makeNode(
}
}
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
@@ -271,14 +264,14 @@ func makeNode(
}
mpReactor, mp, err := createMempoolReactor(ctx,
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
evReactor, evPool, err := createEvidenceReactor(ctx,
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@@ -295,8 +288,12 @@ func makeNode(
sm.BlockExecutorWithMetrics(nodeMetrics.state),
)
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
csReactor, csState, err := createConsensusReactor(ctx,
cfg, state, blockExec, blockStore, mp, evPool,
cfg, stateStore, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
peerManager, router, logger,
)
@@ -308,7 +305,7 @@ func makeNode(
// doing a state sync first.
bcReactor, err := blocksync.NewReactor(ctx,
logger.With("module", "blockchain"),
state.Copy(),
stateStore,
blockExec,
blockStore,
csReactor,
@@ -730,10 +727,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
// loadStateFromDBOrGenesisDocProvider attempts to load the state from the
// database, or creates one using the given genesisDocProvider. On success this also
// returns the genesis doc loaded through the given provider.
func loadStateFromDBOrGenesisDocProvider(
stateStore sm.Store,
genDoc *types.GenesisDoc,
) (sm.State, error) {
func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.GenesisDoc) (sm.State, error) {
// 1. Attempt to load state form the database
state, err := stateStore.Load()
@@ -747,6 +741,12 @@ func loadStateFromDBOrGenesisDocProvider(
if err != nil {
return sm.State{}, err
}
// 3. save the genesis document to the state store so
// it's fetchable by other callers.
if err := stateStore.Save(state); err != nil {
return sm.State{}, err
}
}
return state, nil
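
Because components now read state through sm.Store instead of receiving a State value, the genesis path has to persist what it derives: loadStateFromDBOrGenesisDocProvider saves the genesis-derived state back to the store so that later Load() calls by the mempool, evidence, consensus, and blocksync reactors see it. A self-contained sketch of that load-or-initialize-and-save shape, with stand-in types (the real code derives the state via sm.MakeGenesisState):

package nodesketch

// Stand-ins for sm.State, sm.Store and types.GenesisDoc, for this sketch only.
type State struct{ ChainID string }

func (s State) IsEmpty() bool { return s.ChainID == "" }

type Store interface {
	Load() (State, error)
	Save(State) error
}

type GenesisDoc struct{ ChainID string }

// loadOrInit returns the persisted state if there is one; otherwise it derives
// a state from the genesis doc and writes it back, so every subsequent Load()
// by other components observes the same state.
func loadOrInit(store Store, gen GenesisDoc) (State, error) {
	state, err := store.Load()
	if err != nil {
		return State{}, err
	}
	if state.IsEmpty() {
		state = State{ChainID: gen.ChainID} // stand-in for sm.MakeGenesisState
		if err := store.Save(state); err != nil {
			return State{}, err
		}
	}
	return state, nil
}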


node/node_test.go  +0 -3

@@ -292,7 +292,6 @@ func TestCreateProposalBlock(t *testing.T) {
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
)
// Make EvidencePool
@@ -392,7 +391,6 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
)
// fill the mempool with one txs just below the maximum size
@@ -457,7 +455,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
)
// fill the mempool with one txs just below the maximum size


node/setup.go  +11 -9

@@ -172,7 +172,7 @@ func createMempoolReactor(
ctx context.Context,
cfg *config.Config,
proxyApp proxy.AppConns,
state sm.State,
store sm.Store,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
router *p2p.Router,
@@ -184,10 +184,9 @@ func createMempoolReactor(
logger,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempool.WithMetrics(memplMetrics),
mempool.WithPreCheck(sm.TxPreCheck(state)),
mempool.WithPostCheck(sm.TxPostCheck(state)),
mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
)
reactor, err := mempool.NewReactor(
@@ -214,7 +213,7 @@ func createEvidenceReactor(
ctx context.Context,
cfg *config.Config,
dbProvider config.DBProvider,
stateDB dbm.DB,
store sm.Store,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
@@ -229,7 +228,7 @@ func createEvidenceReactor(
logger = logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics)
evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
if err != nil {
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
}
@@ -253,7 +252,7 @@ func createEvidenceReactor(
func createConsensusReactor(
ctx context.Context,
cfg *config.Config,
state sm.State,
store sm.Store,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mp mempool.Mempool,
@@ -268,16 +267,19 @@ func createConsensusReactor(
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
consensusState := consensus.NewState(ctx,
consensusState, err := consensus.NewState(ctx,
logger,
cfg.Consensus,
state.Copy(),
store,
blockExec,
blockStore,
mp,
evidencePool,
consensus.StateMetrics(csMetrics),
)
if err != nil {
return nil, nil, err
}
if privValidator != nil && cfg.Mode == config.ModeValidator {
consensusState.SetPrivValidator(ctx, privValidator)


test/fuzz/mempool/checktx.go  +0 -1

@@ -31,7 +31,6 @@ func init() {
log.NewNopLogger(),
cfg,
appConnMem,
0,
)
}

