From 74af343f282651f63cbb157a4bfaed7658a339f8 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Fri, 11 Jun 2021 17:26:18 +0200 Subject: [PATCH] statesync: tune backfill process (#6565) This PR make some tweaks to backfill after running e2e tests: - Separates sync and backfill as two distinct processes that the node calls. The reason is because if sync fails then the node should fail but if backfill fails it is still possible to proceed. - Removes peers who don't have the block at a height from the local peer list. As the process goes backwards if a node doesn't have a block at a height they're likely pruning blocks and thus they won't have any prior ones either. - Sleep when we've run out of peers, then try again. --- internal/statesync/block_queue.go | 4 +- internal/statesync/reactor.go | 103 ++++++++++++++++------------- internal/statesync/reactor_test.go | 2 +- internal/statesync/syncer.go | 2 +- node/node.go | 9 +-- test/e2e/runner/evidence.go | 2 +- test/e2e/runner/perturb.go | 2 +- 7 files changed, 67 insertions(+), 57 deletions(-) diff --git a/internal/statesync/block_queue.go b/internal/statesync/block_queue.go index fd8cac278..946875490 100644 --- a/internal/statesync/block_queue.go +++ b/internal/statesync/block_queue.go @@ -211,8 +211,8 @@ func (q *blockQueue) error() error { q.mtx.Lock() defer q.mtx.Unlock() if q.retries >= q.maxRetries { - return fmt.Errorf("failed to backfill blocks following reverse sync. Max retries exceeded (%d). "+ - "Target height: %d, height reached: %d", q.maxRetries, q.stopHeight, q.verifyHeight) + return fmt.Errorf("max retries to fetch valid blocks exceeded (%d); "+ + "target height: %d, height reached: %d", q.maxRetries, q.stopHeight, q.verifyHeight) } return nil } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 703cbeedb..b5436d1ab 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -98,6 +98,10 @@ const ( // maxLightBlockRequestRetries is the amount of retries acceptable before // the backfill process aborts maxLightBlockRequestRetries = 20 + + // the amount of processes fetching light blocks - this should be roughly calculated + // as the time to fetch a block / time to verify a block + lightBlockFetchers = 4 ) // Reactor handles state sync, both restoring snapshots for the local node and @@ -206,11 +210,11 @@ func (r *Reactor) OnStop() { // application. It also saves tendermint state and runs a backfill process to // retrieve the necessary amount of headers, commits and validators sets to be // able to process evidence and participate in consensus. -func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) error { +func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, error) { r.mtx.Lock() if r.syncer != nil { r.mtx.Unlock() - return errors.New("a state sync is already in progress") + return sm.State{}, errors.New("a state sync is already in progress") } r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir) @@ -229,7 +233,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) state, commit, err := r.syncer.SyncAny(discoveryTime, hook) if err != nil { - return err + return sm.State{}, err } r.mtx.Lock() @@ -238,24 +242,41 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) err = r.stateStore.Bootstrap(state) if err != nil { - return fmt.Errorf("failed to bootstrap node with new state: %w", err) + return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err) } err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit) if err != nil { - return fmt.Errorf("failed to store last seen commit: %w", err) + return sm.State{}, fmt.Errorf("failed to store last seen commit: %w", err) } - // start backfill process to retrieve the necessary headers, commits and - // validator sets - return r.backfill(state) + return state, nil } // Backfill sequentially fetches, verifies and stores light blocks in reverse // order. It does not stop verifying blocks until reaching a block with a height // and time that is less or equal to the stopHeight and stopTime. The // trustedBlockID should be of the header at startHeight. -func (r *Reactor) Backfill( +func (r *Reactor) Backfill(state sm.State) error { + params := state.ConsensusParams.Evidence + stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks + stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration) + // ensure that stop height doesn't go below the initial height + if stopHeight < state.InitialHeight { + stopHeight = state.InitialHeight + // this essentially makes stop time a void criteria for termination + stopTime = state.LastBlockTime + } + return r.backfill( + context.Background(), + state.ChainID, + state.LastBlockHeight, stopHeight, + state.LastBlockID, + stopTime, + ) +} + +func (r *Reactor) backfill( ctx context.Context, chainID string, startHeight, stopHeight int64, @@ -265,6 +286,7 @@ func (r *Reactor) Backfill( r.Logger.Info("starting backfill process...", "startHeight", startHeight, "stopHeight", stopHeight, "trustedBlockID", trustedBlockID) + const sleepTime = 1 * time.Second var ( lastValidatorSet *types.ValidatorSet lastChangeHeight int64 = startHeight @@ -277,7 +299,7 @@ func (r *Reactor) Backfill( // time. Ideally we want the verification process to never have to be // waiting on blocks. If it takes 4s to retrieve a block and 1s to verify // it, then steady state involves four workers. - for i := 0; i < 4; i++ { + for i := 0; i < lightBlockFetchers; i++ { go func() { for { select { @@ -285,30 +307,33 @@ func (r *Reactor) Backfill( r.Logger.Debug("fetching next block", "height", height) lb, peer, err := r.dispatcher.LightBlock(ctx, height) if err != nil { - // we don't punish the peer as it might just not have the block - // at that height - r.Logger.Info("error with fetching light block", - "height", height, "err", err) queue.retry(height) + if errors.Is(err, errNoConnectedPeers) { + r.Logger.Info("backfill: no connected peers to fetch light blocks from; sleeping...", + "sleepTime", sleepTime) + time.Sleep(sleepTime) + } else { + // we don't punish the peer as it might just have not responded in time + r.Logger.Info("backfill: error with fetching light block", + "height", height, "err", err) + } continue } if lb == nil { - r.Logger.Info("peer didn't have block, fetching from another peer", "height", height) - queue.retry(height) - continue - } - - if lb.Height != height { - r.Logger.Info("peer provided wrong height, retrying...", "height", height) + r.Logger.Info("backfill: peer didn't have block, fetching from another peer", "height", height) queue.retry(height) + // as we are fetching blocks backwards, if this node doesn't have the block it likely doesn't + // have any prior ones, thus we remove it from the peer list + r.dispatcher.removePeer(peer) continue } // run a validate basic. This checks the validator set and commit // hashes line up err = lb.ValidateBasic(chainID) - if err != nil { - r.Logger.Info("fetched light block failed validate basic, removing peer...", "err", err) + if err != nil || lb.Height != height { + r.Logger.Info("backfill: fetched light block failed validate basic, removing peer...", + "err", err, "height", height) queue.retry(height) r.blockCh.Error <- p2p.PeerError{ NodeID: peer, @@ -322,7 +347,7 @@ func (r *Reactor) Backfill( block: lb, peer: peer, }) - r.Logger.Debug("added light block to processing queue", "height", height) + r.Logger.Debug("backfill: added light block to processing queue", "height", height) case <-queue.done(): return @@ -376,7 +401,7 @@ func (r *Reactor) Backfill( trustedBlockID = resp.block.LastBlockID queue.success(resp.block.Height) - r.Logger.Info("verified and stored light block", "height", resp.block.Height) + r.Logger.Info("backfill: verified and stored light block", "height", resp.block.Height) lastValidatorSet = resp.block.ValidatorSet @@ -386,7 +411,12 @@ func (r *Reactor) Backfill( } // save the final batch of validators - return r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet) + if err := r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet); err != nil { + return err + } + + r.Logger.Info("successfully completed backfill process", "endHeight", queue.terminal.Height) + return nil } } } @@ -777,24 +807,3 @@ func (r *Reactor) fetchLightBlock(height uint64) (*types.LightBlock, error) { }, nil } - -// backfill is a convenience wrapper around the backfill function. It takes -// state to work out how many prior blocks need to be verified -func (r *Reactor) backfill(state sm.State) error { - params := state.ConsensusParams.Evidence - stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks - stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration) - // ensure that stop height doesn't go below the initial height - if stopHeight < state.InitialHeight { - stopHeight = state.InitialHeight - // this essentially makes stop time a void criteria for termination - stopTime = state.LastBlockTime - } - return r.Backfill( - context.Background(), - state.ChainID, - state.LastBlockHeight, stopHeight, - state.LastBlockID, - stopTime, - ) -} diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 36a8f9075..fab6e30c7 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -454,7 +454,7 @@ func TestReactor_Backfill(t *testing.T) { go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, failureRate) - err := rts.reactor.Backfill( + err := rts.reactor.backfill( context.Background(), factory.DefaultTestChainID, startHeight, diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 47f640aa3..d58c27d61 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -143,7 +143,7 @@ func (s *syncer) RemovePeer(peerID p2p.NodeID) { // which the caller must use to bootstrap the node. func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) { if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime { - discoveryTime = 5 * minimumDiscoveryTime + discoveryTime = minimumDiscoveryTime } if discoveryTime > 0 { diff --git a/node/node.go b/node/node.go index 3a84fb2b4..45f72fd74 100644 --- a/node/node.go +++ b/node/node.go @@ -1054,20 +1054,21 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto } go func() { - err := ssR.Sync(stateProvider, config.DiscoveryTime) + state, err := ssR.Sync(stateProvider, config.DiscoveryTime) if err != nil { ssR.Logger.Error("state sync failed", "err", err) return } - state, err := stateStore.Load() + err = ssR.Backfill(state) if err != nil { - ssR.Logger.Error("failed to load state after statesync", "err", err) + ssR.Logger.Error("backfill failed; node has insufficient history to verify all evidence;"+ + " proceeding optimistically...", "err", err) } + conR.Metrics.StateSyncing.Set(0) if fastSync { // FIXME Very ugly to have these metrics bleed through here. - conR.Metrics.StateSyncing.Set(0) conR.Metrics.FastSyncing.Set(1) err = bcR.SwitchToFastSync(state) if err != nil { diff --git a/test/e2e/runner/evidence.go b/test/e2e/runner/evidence.go index a9373e355..d2fa0bc56 100644 --- a/test/e2e/runner/evidence.go +++ b/test/e2e/runner/evidence.go @@ -65,7 +65,7 @@ func InjectEvidence(testnet *e2e.Testnet, amount int) error { // wait for the node to reach the height above the forged height so that // it is able to validate the evidence - status, err := waitForNode(targetNode, waitHeight, 10*time.Second) + status, err := waitForNode(targetNode, waitHeight, 15*time.Second) if err != nil { return err } diff --git a/test/e2e/runner/perturb.go b/test/e2e/runner/perturb.go index 50c9f67a7..81f3d77ac 100644 --- a/test/e2e/runner/perturb.go +++ b/test/e2e/runner/perturb.go @@ -72,7 +72,7 @@ func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.Resul return nil, nil } - status, err := waitForNode(node, 0, 10*time.Second) + status, err := waitForNode(node, 0, 15*time.Second) if err != nil { return nil, err }