
remaining

Branch: pull/8062/head
Author: tycho garen, 3 years ago
Commit: 30bdc19601

5 changed files with 56 additions and 47 deletions:
  1. internal/blocksync/reactor.go (+25, -19)
  2. internal/consensus/state.go (+15, -8)
  3. internal/state/tx_filter.go (+6, -11)
  4. node/node.go (+4, -3)
  5. node/setup.go (+6, -6)

internal/blocksync/reactor.go (+25, -19)

@@ -70,6 +70,8 @@ type Reactor struct {
 	// immutable
 	initialState sm.State
+	// store
+	stateStore sm.Store
 	blockExec *sm.BlockExecutor
 	store *store.BlockStore
@@ -101,7 +103,7 @@ type Reactor struct {
 func NewReactor(
 	ctx context.Context,
 	logger log.Logger,
-	state sm.State,
+	stateStore sm.Store,
 	blockExec *sm.BlockExecutor,
 	store *store.BlockStore,
 	consReactor consensusReactor,
@@ -111,19 +113,6 @@ func NewReactor(
 	metrics *consensus.Metrics,
 	eventBus *eventbus.EventBus,
 ) (*Reactor, error) {
-	if state.LastBlockHeight != store.Height() {
-		return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
-	}
-
-	startHeight := store.Height() + 1
-	if startHeight == 1 {
-		startHeight = state.InitialHeight
-	}
-
-	requestsCh := make(chan BlockRequest, maxTotalRequesters)
-	errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
-
 	blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
 	if err != nil {
 		return nil, err
@@ -131,20 +120,16 @@
 	r := &Reactor{
 		logger: logger,
-		initialState: state,
+		stateStore: stateStore,
 		blockExec: blockExec,
 		store: store,
-		pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh),
 		consReactor: consReactor,
 		blockSync: newAtomicBool(blockSync),
-		requestsCh: requestsCh,
-		errorsCh: errorsCh,
 		blockSyncCh: blockSyncCh,
 		blockSyncOutBridgeCh: make(chan p2p.Envelope),
 		peerUpdates: peerUpdates,
 		metrics: metrics,
 		eventBus: eventBus,
 		syncStartTime: time.Time{},
 	}
 	r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
@@ -159,6 +144,27 @@
 // If blockSync is enabled, we also start the pool and the pool processing
 // goroutine. If the pool fails to start, an error is returned.
 func (r *Reactor) OnStart(ctx context.Context) error {
+	state, err := r.stateStore.Load()
+	if err != nil {
+		return err
+	}
+	r.initialState = state
+
+	if state.LastBlockHeight != r.store.Height() {
+		return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height())
+	}
+
+	startHeight := r.store.Height() + 1
+	if startHeight == 1 {
+		startHeight = state.InitialHeight
+	}
+
+	requestsCh := make(chan BlockRequest, maxTotalRequesters)
+	errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
+	r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
+	r.requestsCh = requestsCh
+	r.errorsCh = errorsCh
+
 	if r.blockSync.IsSet() {
 		if err := r.pool.Start(ctx); err != nil {
 			return err

internal/consensus/state.go (+15, -8)

@@ -120,6 +120,7 @@ type State struct {
 	// store blocks and commits
 	blockStore sm.BlockStore
+	stateStore sm.Store
 
 	// create and execute blocks
 	blockExec *sm.BlockExecutor
@@ -189,7 +190,7 @@ func NewState(
 	ctx context.Context,
 	logger log.Logger,
 	cfg *config.ConsensusConfig,
-	state sm.State,
+	store sm.Store,
 	blockExec *sm.BlockExecutor,
 	blockStore sm.BlockStore,
 	txNotifier txNotifier,
@@ -201,6 +202,7 @@
 		config: cfg,
 		blockExec: blockExec,
 		blockStore: blockStore,
+		stateStore: store,
 		txNotifier: txNotifier,
 		peerMsgQueue: make(chan msgInfo, msgQueueSize),
 		internalMsgQueue: make(chan msgInfo, msgQueueSize),
@@ -220,13 +222,6 @@
 	cs.doPrevote = cs.defaultDoPrevote
 	cs.setProposal = cs.defaultSetProposal
 
-	// We have no votes, so reconstruct LastCommit from SeenCommit.
-	if state.LastBlockHeight > 0 {
-		cs.reconstructLastCommit(state)
-	}
-
-	cs.updateToState(ctx, state)
-
 	// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
 	cs.BaseService = *service.NewBaseService(logger, "State", cs)
@@ -365,6 +360,18 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
 // OnStart loads the latest state via the WAL, and starts the timeout and
 // receive routines.
 func (cs *State) OnStart(ctx context.Context) error {
+	state, err := cs.stateStore.Load()
+	if err != nil {
+		return err
+	}
+
+	// We have no votes, so reconstruct LastCommit from SeenCommit.
+	if state.LastBlockHeight > 0 {
+		cs.reconstructLastCommit(state)
+	}
+
+	cs.updateToState(ctx, state)
+
 	// We may set the WAL in testing before calling Start, so only OpenWAL if its
 	// still the nilWAL.
 	if _, ok := cs.wal.(nilWAL); ok {
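The same refactor applied to consensus means reconstructLastCommit and updateToState now run against whatever the store holds at start time rather than at construction time. A small sketch of why that ordering matters, again with stand-in types rather than the real sm.Store:

package main

import (
	"context"
	"fmt"
	"sync"
)

// State and memStore are stand-ins for sm.State and sm.Store.
type State struct{ LastBlockHeight int64 }

type memStore struct {
	mu sync.Mutex
	s  State
}

func (m *memStore) Load() (State, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.s, nil
}

func (m *memStore) Save(s State) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.s = s
}

// consensusState mimics the pattern: remember the store, load on start.
type consensusState struct {
	stateStore *memStore
	state      State
}

func newConsensusState(store *memStore) *consensusState {
	return &consensusState{stateStore: store} // no Load here
}

func (cs *consensusState) OnStart(ctx context.Context) error {
	state, err := cs.stateStore.Load()
	if err != nil {
		return err
	}
	cs.state = state // would drive reconstructLastCommit/updateToState
	return nil
}

func main() {
	store := &memStore{}
	cs := newConsensusState(store)

	// Something else (e.g. state sync) advances the store after
	// construction but before start.
	store.Save(State{LastBlockHeight: 42})

	_ = cs.OnStart(context.Background())
	fmt.Println(cs.state.LastBlockHeight) // 42, not the zero value
}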


internal/state/tx_filter.go (+6, -11)

@@ -10,10 +10,11 @@ import (
 )
 
 func cachingStateFetcher(store Store) func() (State, error) {
+	const ttl = time.Second
+
 	var (
 		last time.Time
 		mutex = &sync.Mutex{}
-		ttl time.Duration
 		cache State
 		err error
 	)
@@ -31,21 +32,15 @@
 			return State{}, err
 		}
 
 		last = time.Now()
-		// at least 100ms but maybe as much as that+
-		// a block interval. This might end up being
-		// too much, but it replaces a mechanism that
-		// cached these values for the entire runtime
-		// of the process
-		ttl = (100 * time.Millisecond) + cache.LastBlockTime.Sub(last)
 
 		return cache, nil
 	}
 }
 
-// TxPreCheck returns a function to filter transactions before processing.
+// TxPreCheckFromStore returns a function to filter transactions before processing.
 // The function limits the size of a transaction to the block's maximum data size.
-func TxPreCheck(store Store) mempool.PreCheckFunc {
+func TxPreCheckFromStore(store Store) mempool.PreCheckFunc {
 	fetch := cachingStateFetcher(store)
 
 	return func(tx types.Tx) error {
@@ -69,9 +64,9 @@ func TxPreCheckForState(state State) mempool.PreCheckFunc {
 }
 
-// TxPostCheck returns a function to filter transactions after processing.
+// TxPostCheckFromStore returns a function to filter transactions after processing.
 // The function limits the gas wanted by a transaction to the block's maximum total gas.
-func TxPostCheck(store Store) mempool.PostCheckFunc {
+func TxPostCheckFromStore(store Store) mempool.PostCheckFunc {
 	fetch := cachingStateFetcher(store)
 
 	return func(tx types.Tx, resp *abci.ResponseCheckTx) error {
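This hunk drops the adaptive TTL introduced earlier on this branch and returns to a fixed one-second TTL, leaving cachingStateFetcher as a plain time-based memoizer around Store.Load; the renamed TxPreCheckFromStore/TxPostCheckFromStore then each wrap one fetcher. The memoizer pattern in isolation, as a hedged sketch (the generic cachedFetcher is illustrative, not the tendermint API, and needs Go 1.18+):

package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedFetcher memoizes fetch for ttl, serializing callers with a
// mutex, in the same shape as cachingStateFetcher above.
func cachedFetcher[T any](fetch func() (T, error), ttl time.Duration) func() (T, error) {
	var (
		mu    sync.Mutex
		last  time.Time
		cache T
	)
	return func() (T, error) {
		mu.Lock()
		defer mu.Unlock()

		if !last.IsZero() && time.Since(last) < ttl {
			return cache, nil // still fresh: skip the underlying load
		}
		v, err := fetch()
		if err != nil {
			var zero T
			return zero, err
		}
		cache, last = v, time.Now()
		return cache, nil
	}
}

func main() {
	calls := 0
	f := cachedFetcher(func() (int, error) { calls++; return calls, nil }, time.Second)

	a, _ := f()
	b, _ := f() // served from cache, no second call
	fmt.Println(a, b, calls) // 1 1 1
}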


node/node.go (+4, -3)

@@ -142,6 +142,7 @@
 	genDoc, err := genesisDocProvider()
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
+	state
 	}
 
 	err = genDoc.ValidateAndComplete()
@@ -279,7 +280,7 @@
 	}
 
 	evReactor, evPool, err := createEvidenceReactor(ctx,
-		cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
+		cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
 	)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
@@ -297,7 +298,7 @@
 	)
 
 	csReactor, csState, err := createConsensusReactor(ctx,
-		cfg, state, blockExec, blockStore, mp, evPool,
+		cfg, stateStore, blockExec, blockStore, mp, evPool,
 		privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
 		peerManager, router, logger,
 	)
@@ -309,7 +310,7 @@
 	// doing a state sync first.
 	bcReactor, err := blocksync.NewReactor(ctx,
 		logger.With("module", "blockchain"),
-		state.Copy(),
+		stateStore,
 		blockExec,
 		blockStore,
 		csReactor,


node/setup.go (+6, -6)

@@ -185,8 +185,8 @@
 		cfg.Mempool,
 		proxyApp.Mempool(),
 		mempool.WithMetrics(memplMetrics),
-		mempool.WithPreCheck(sm.TxPreCheck(store)),
-		mempool.WithPostCheck(sm.TxPostCheck(store)),
+		mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
+		mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
 	)
 
 	reactor, err := mempool.NewReactor(
@@ -213,7 +213,7 @@
 	ctx context.Context,
 	cfg *config.Config,
 	dbProvider config.DBProvider,
-	stateDB dbm.DB,
+	store sm.Store,
 	blockStore *store.BlockStore,
 	peerManager *p2p.PeerManager,
 	router *p2p.Router,
@@ -228,7 +228,7 @@
 	logger = logger.With("module", "evidence")
 
-	evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics)
+	evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
 	if err != nil {
 		return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
 	}
@@ -252,7 +252,7 @@
 func createConsensusReactor(
 	ctx context.Context,
 	cfg *config.Config,
-	state sm.State,
+	store sm.Store,
 	blockExec *sm.BlockExecutor,
 	blockStore sm.BlockStore,
 	mp mempool.Mempool,
@@ -270,7 +270,7 @@
 	consensusState := consensus.NewState(ctx,
 		logger,
 		cfg.Consensus,
-		state.Copy(),
+		store,
 		blockExec,
 		blockStore,
 		mp,
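Across setup.go the constructors now accept the sm.Store interface where they previously took either a raw dbm.DB or a point-in-time sm.State copy, which pushes the sm.NewStore call out to the caller and lets tests inject any Store implementation. A toy illustration of that dependency-inversion move, with all names hypothetical:

package main

import "fmt"

// Store is the narrow dependency the constructor actually needs.
type Store interface {
	Load() (string, error)
}

// diskStore stands in for the production implementation that the
// caller builds from its database handle (cf. sm.NewStore(stateDB)).
type diskStore struct{}

func (diskStore) Load() (string, error) { return "state-from-disk", nil }

// stubStore is what a test can now inject directly.
type stubStore struct{ s string }

func (s stubStore) Load() (string, error) { return s.s, nil }

// newEvidencePool depends only on the interface, not on the DB type.
func newEvidencePool(store Store) (string, error) {
	return store.Load()
}

func main() {
	prod, _ := newEvidencePool(diskStore{})
	test, _ := newEvidencePool(stubStore{s: "state-from-stub"})
	fmt.Println(prod, test)
}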

