diff --git a/internal/statesync/block_queue.go b/internal/statesync/block_queue.go
index fd8cac278..946875490 100644
--- a/internal/statesync/block_queue.go
+++ b/internal/statesync/block_queue.go
@@ -211,8 +211,8 @@ func (q *blockQueue) error() error {
 	q.mtx.Lock()
 	defer q.mtx.Unlock()
 	if q.retries >= q.maxRetries {
-		return fmt.Errorf("failed to backfill blocks following reverse sync. Max retries exceeded (%d). "+
-			"Target height: %d, height reached: %d", q.maxRetries, q.stopHeight, q.verifyHeight)
+		return fmt.Errorf("max retries to fetch valid blocks exceeded (%d); "+
+			"target height: %d, height reached: %d", q.maxRetries, q.stopHeight, q.verifyHeight)
 	}
 	return nil
 }
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index 703cbeedb..b5436d1ab 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -98,6 +98,10 @@ const (
 	// maxLightBlockRequestRetries is the amount of retries acceptable before
 	// the backfill process aborts
 	maxLightBlockRequestRetries = 20
+
+	// the number of processes fetching light blocks - this should be roughly calculated
+	// as the time to fetch a block / time to verify a block
+	lightBlockFetchers = 4
 )
 
 // Reactor handles state sync, both restoring snapshots for the local node and
@@ -206,11 +210,11 @@ func (r *Reactor) OnStop() {
 // application. It also saves tendermint state and runs a backfill process to
 // retrieve the necessary amount of headers, commits and validators sets to be
 // able to process evidence and participate in consensus.
-func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) error {
+func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, error) {
 	r.mtx.Lock()
 	if r.syncer != nil {
 		r.mtx.Unlock()
-		return errors.New("a state sync is already in progress")
+		return sm.State{}, errors.New("a state sync is already in progress")
 	}
 
 	r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir)
@@ -229,7 +233,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 
 	state, commit, err := r.syncer.SyncAny(discoveryTime, hook)
 	if err != nil {
-		return err
+		return sm.State{}, err
 	}
 
 	r.mtx.Lock()
@@ -238,24 +242,41 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 
 	err = r.stateStore.Bootstrap(state)
 	if err != nil {
-		return fmt.Errorf("failed to bootstrap node with new state: %w", err)
+		return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err)
 	}
 
 	err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
 	if err != nil {
-		return fmt.Errorf("failed to store last seen commit: %w", err)
+		return sm.State{}, fmt.Errorf("failed to store last seen commit: %w", err)
 	}
 
-	// start backfill process to retrieve the necessary headers, commits and
-	// validator sets
-	return r.backfill(state)
+	return state, nil
 }
 
 // Backfill sequentially fetches, verifies and stores light blocks in reverse
 // order. It does not stop verifying blocks until reaching a block with a height
 // and time that is less or equal to the stopHeight and stopTime. The
 // trustedBlockID should be of the header at startHeight.
-func (r *Reactor) Backfill(
+func (r *Reactor) Backfill(state sm.State) error {
+	params := state.ConsensusParams.Evidence
+	stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks
+	stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration)
+	// ensure that stop height doesn't go below the initial height
+	if stopHeight < state.InitialHeight {
+		stopHeight = state.InitialHeight
+		// this essentially makes stop time a void criterion for termination
+		stopTime = state.LastBlockTime
+	}
+	return r.backfill(
+		context.Background(),
+		state.ChainID,
+		state.LastBlockHeight, stopHeight,
+		state.LastBlockID,
+		stopTime,
+	)
+}
+
+func (r *Reactor) backfill(
 	ctx context.Context,
 	chainID string,
 	startHeight, stopHeight int64,
@@ -265,6 +286,7 @@ func (r *Reactor) Backfill(
 	r.Logger.Info("starting backfill process...", "startHeight", startHeight,
 		"stopHeight", stopHeight, "trustedBlockID", trustedBlockID)
 
+	const sleepTime = 1 * time.Second
 	var (
 		lastValidatorSet *types.ValidatorSet
 		lastChangeHeight int64 = startHeight
@@ -277,7 +299,7 @@ func (r *Reactor) Backfill(
 	// time. Ideally we want the verification process to never have to be
 	// waiting on blocks. If it takes 4s to retrieve a block and 1s to verify
 	// it, then steady state involves four workers.
-	for i := 0; i < 4; i++ {
+	for i := 0; i < lightBlockFetchers; i++ {
 		go func() {
 			for {
 				select {
@@ -285,30 +307,33 @@ func (r *Reactor) Backfill(
 					r.Logger.Debug("fetching next block", "height", height)
 					lb, peer, err := r.dispatcher.LightBlock(ctx, height)
 					if err != nil {
-						// we don't punish the peer as it might just not have the block
-						// at that height
-						r.Logger.Info("error with fetching light block",
-							"height", height, "err", err)
 						queue.retry(height)
+						if errors.Is(err, errNoConnectedPeers) {
+							r.Logger.Info("backfill: no connected peers to fetch light blocks from; sleeping...",
+								"sleepTime", sleepTime)
+							time.Sleep(sleepTime)
+						} else {
+							// we don't punish the peer as it might just have not responded in time
+							r.Logger.Info("backfill: error with fetching light block",
+								"height", height, "err", err)
+						}
 						continue
 					}
 					if lb == nil {
-						r.Logger.Info("peer didn't have block, fetching from another peer", "height", height)
-						queue.retry(height)
-						continue
-					}
-
-					if lb.Height != height {
-						r.Logger.Info("peer provided wrong height, retrying...", "height", height)
+						r.Logger.Info("backfill: peer didn't have block, fetching from another peer", "height", height)
 						queue.retry(height)
+						// as we are fetching blocks backwards, if this peer doesn't have the block it likely doesn't
+						// have any prior ones, thus we remove it from the peer list
+						r.dispatcher.removePeer(peer)
 						continue
 					}
 
 					// run a validate basic. This checks the validator set and commit
 					// hashes line up
 					err = lb.ValidateBasic(chainID)
-					if err != nil {
-						r.Logger.Info("fetched light block failed validate basic, removing peer...", "err", err)
+					if err != nil || lb.Height != height {
+						r.Logger.Info("backfill: fetched light block failed validate basic, removing peer...",
+							"err", err, "height", height)
 						queue.retry(height)
 						r.blockCh.Error <- p2p.PeerError{
 							NodeID: peer,
@@ -322,7 +347,7 @@ func (r *Reactor) Backfill(
 						block: lb,
 						peer:  peer,
 					})
-					r.Logger.Debug("added light block to processing queue", "height", height)
+					r.Logger.Debug("backfill: added light block to processing queue", "height", height)
 
 				case <-queue.done():
 					return
@@ -376,7 +401,7 @@ func (r *Reactor) Backfill(
 
 			trustedBlockID = resp.block.LastBlockID
 			queue.success(resp.block.Height)
-			r.Logger.Info("verified and stored light block", "height", resp.block.Height)
+			r.Logger.Info("backfill: verified and stored light block", "height", resp.block.Height)
 
 			lastValidatorSet = resp.block.ValidatorSet
 
@@ -386,7 +411,12 @@ func (r *Reactor) Backfill(
 			}
 
 			// save the final batch of validators
-			return r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet)
+			if err := r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet); err != nil {
+				return err
+			}
+
+			r.Logger.Info("successfully completed backfill process", "endHeight", queue.terminal.Height)
+			return nil
 		}
 	}
 }
@@ -777,24 +807,3 @@ func (r *Reactor) fetchLightBlock(height uint64) (*types.LightBlock, error) {
 	}, nil
 }
-
-// backfill is a convenience wrapper around the backfill function. It takes
-// state to work out how many prior blocks need to be verified
-func (r *Reactor) backfill(state sm.State) error {
-	params := state.ConsensusParams.Evidence
-	stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks
-	stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration)
-	// ensure that stop height doesn't go below the initial height
-	if stopHeight < state.InitialHeight {
-		stopHeight = state.InitialHeight
-		// this essentially makes stop time a void criteria for termination
-		stopTime = state.LastBlockTime
-	}
-	return r.Backfill(
-		context.Background(),
-		state.ChainID,
-		state.LastBlockHeight, stopHeight,
-		state.LastBlockID,
-		stopTime,
-	)
-}
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index 36a8f9075..fab6e30c7 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -454,7 +454,7 @@ func TestReactor_Backfill(t *testing.T) {
 
 	go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, failureRate)
 
-	err := rts.reactor.Backfill(
+	err := rts.reactor.backfill(
 		context.Background(),
 		factory.DefaultTestChainID,
 		startHeight,
diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go
index 47f640aa3..d58c27d61 100644
--- a/internal/statesync/syncer.go
+++ b/internal/statesync/syncer.go
@@ -143,7 +143,7 @@ func (s *syncer) RemovePeer(peerID p2p.NodeID) {
 // which the caller must use to bootstrap the node.
 func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) {
 	if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
-		discoveryTime = 5 * minimumDiscoveryTime
+		discoveryTime = minimumDiscoveryTime
 	}
 
 	if discoveryTime > 0 {
diff --git a/node/node.go b/node/node.go
index 3a84fb2b4..45f72fd74 100644
--- a/node/node.go
+++ b/node/node.go
@@ -1054,20 +1054,21 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto
 	}
 
 	go func() {
-		err := ssR.Sync(stateProvider, config.DiscoveryTime)
+		state, err := ssR.Sync(stateProvider, config.DiscoveryTime)
 		if err != nil {
 			ssR.Logger.Error("state sync failed", "err", err)
 			return
 		}
 
-		state, err := stateStore.Load()
+		err = ssR.Backfill(state)
 		if err != nil {
-			ssR.Logger.Error("failed to load state after statesync", "err", err)
+			ssR.Logger.Error("backfill failed; node has insufficient history to verify all evidence;"+
+				" proceeding optimistically...", "err", err)
 		}
 
+		conR.Metrics.StateSyncing.Set(0)
 		if fastSync {
 			// FIXME Very ugly to have these metrics bleed through here.
-			conR.Metrics.StateSyncing.Set(0)
 			conR.Metrics.FastSyncing.Set(1)
 			err = bcR.SwitchToFastSync(state)
 			if err != nil {
diff --git a/test/e2e/runner/evidence.go b/test/e2e/runner/evidence.go
index a9373e355..d2fa0bc56 100644
--- a/test/e2e/runner/evidence.go
+++ b/test/e2e/runner/evidence.go
@@ -65,7 +65,7 @@ func InjectEvidence(testnet *e2e.Testnet, amount int) error {
 
 	// wait for the node to reach the height above the forged height so that
 	// it is able to validate the evidence
-	status, err := waitForNode(targetNode, waitHeight, 10*time.Second)
+	status, err := waitForNode(targetNode, waitHeight, 15*time.Second)
 	if err != nil {
 		return err
 	}
diff --git a/test/e2e/runner/perturb.go b/test/e2e/runner/perturb.go
index 50c9f67a7..81f3d77ac 100644
--- a/test/e2e/runner/perturb.go
+++ b/test/e2e/runner/perturb.go
@@ -72,7 +72,7 @@ func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.Resul
 		return nil, nil
 	}
 
-	status, err := waitForNode(node, 0, 10*time.Second)
+	status, err := waitForNode(node, 0, 15*time.Second)
 	if err != nil {
 		return nil, err
 	}
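Reviewer note (not part of the patch): the new exported Reactor.Backfill derives the verification window from the evidence parameters (stop at LastBlockHeight - MaxAgeNumBlocks and LastBlockTime - MaxAgeDuration, clamped to InitialHeight), and the context-taking backfill stays unexported so the test can drive it directly. Below is a minimal, self-contained Go sketch of that derivation for sanity-checking the math; the helper name backfillWindow and all numeric values are illustrative assumptions, not code from this change.

// Sketch only: mirrors the stop-height/stop-time derivation in the new
// Reactor.Backfill wrapper. backfillWindow and the values in main are
// hypothetical and exist purely for illustration.
package main

import (
	"fmt"
	"time"
)

// backfillWindow returns the height and time at which backfill may stop:
// far enough back that all evidence within the max age can still be
// verified, but never below the chain's initial height. When clamped to
// the initial height, stopTime is set to lastBlockTime so the time
// criterion can no longer terminate the process early.
func backfillWindow(
	lastBlockHeight, initialHeight, maxAgeNumBlocks int64,
	lastBlockTime time.Time, maxAgeDuration time.Duration,
) (int64, time.Time) {
	stopHeight := lastBlockHeight - maxAgeNumBlocks
	stopTime := lastBlockTime.Add(-maxAgeDuration)
	if stopHeight < initialHeight {
		stopHeight = initialHeight
		stopTime = lastBlockTime
	}
	return stopHeight, stopTime
}

func main() {
	now := time.Now()

	// Long-running chain: stop 100000 blocks and 48h behind the snapshot height.
	h, t := backfillWindow(500000, 1, 100000, now, 48*time.Hour)
	fmt.Println(h, now.Sub(t)) // 400000 48h0m0s

	// Young chain: clamped to the initial height; the time criterion is voided.
	h, t = backfillWindow(500, 1, 100000, now, 48*time.Hour)
	fmt.Println(h, now.Sub(t)) // 1 0s
}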