From d515bbcf1daf386b48247b76bb60f1adf889ecb2 Mon Sep 17 00:00:00 2001
From: Callum Waters
Date: Fri, 18 Jun 2021 09:59:52 +0200
Subject: [PATCH] statesync: increase chunk priority and robustness (#6582)

---
 CHANGELOG_PENDING.md           |  5 ++--
 config/config.go               |  4 +--
 statesync/reactor.go           |  4 +--
 statesync/syncer.go            | 47 +++++++++++++++++++++-------------
 test/e2e/generator/generate.go |  6 ++---
 test/e2e/runner/load.go        |  2 +-
 test/e2e/runner/rpc.go         |  2 +-
 7 files changed, 41 insertions(+), 29 deletions(-)

diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index d20458a43..5ba207354 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -25,8 +25,9 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint).
 
 ### IMPROVEMENTS
 
 - [statesync] \#6566 Allow state sync fetchers and request timeout to be configurable. (@alexanderbez)
-- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots.
+- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots. (@tychoish)
+- [statesync] \#6582 Increase chunk priority and allow chunk requests to be retried. (@cmwaters)
 
 ### BUG FIXES
-- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters)
+- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing. (@cmwaters)
diff --git a/config/config.go b/config/config.go
index 421957907..bc769e857 100644
--- a/config/config.go
+++ b/config/config.go
@@ -789,8 +789,8 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 		return fmt.Errorf("invalid trusted_hash: %w", err)
 	}
 
-	if cfg.ChunkRequestTimeout < time.Second {
-		return errors.New("chunk_request_timeout must be least a one second")
+	if cfg.ChunkRequestTimeout < 5*time.Second {
+		return errors.New("chunk_request_timeout must be at least 5 seconds")
 	}
 
 	if cfg.ChunkFetchers <= 0 {
diff --git a/statesync/reactor.go b/statesync/reactor.go
index 37ccebed1..427fe6653 100644
--- a/statesync/reactor.go
+++ b/statesync/reactor.go
@@ -69,8 +69,8 @@ func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
 		},
 		{
 			ID:                  ChunkChannel,
-			Priority:            1,
-			SendQueueCapacity:   4,
+			Priority:            3,
+			SendQueueCapacity:   10,
 			RecvMessageCapacity: chunkMsgSize,
 		},
 	}
diff --git a/statesync/syncer.go b/statesync/syncer.go
index 4e792bcd9..9cf7f99ec 100644
--- a/statesync/syncer.go
+++ b/statesync/syncer.go
@@ -50,13 +50,14 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
-	cfg           config.StateSyncConfig
 	logger        log.Logger
 	stateProvider StateProvider
 	conn          proxy.AppConnSnapshot
 	connQuery     proxy.AppConnQuery
 	snapshots     *snapshotPool
 	tempDir       string
+	chunkFetchers int32
+	retryTimeout  time.Duration
 
 	mtx    tmsync.RWMutex
 	chunks *chunkQueue
@@ -73,13 +74,14 @@ func newSyncer(
 ) *syncer {
 
 	return &syncer{
-		cfg:           cfg,
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,
 		connQuery:     connQuery,
 		snapshots:     newSnapshotPool(stateProvider),
 		tempDir:       tempDir,
+		chunkFetchers: cfg.ChunkFetchers,
+		retryTimeout:  cfg.ChunkRequestTimeout,
 	}
 }
 
@@ -250,7 +252,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
+	for i := int32(0); i < s.chunkFetchers; i++ {
 		go s.fetchChunks(ctx, snapshot, chunks)
 	}
 
@@ -383,36 +385,45 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
 // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
 // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
 func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
+	var (
+		next  = true
+		index uint32
+		err   error
+	)
+
 	for {
-		index, err := chunks.Allocate()
-		if err == errDone {
-			// Keep checking until the context is cancelled (restore is done), in case any
-			// chunks need to be refetched.
-			select {
-			case <-ctx.Done():
+		if next {
+			index, err = chunks.Allocate()
+			if errors.Is(err, errDone) {
+				// Keep checking until the context is canceled (restore is done), in case any
+				// chunks need to be refetched.
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				time.Sleep(2 * time.Second)
+				continue
+			}
+			if err != nil {
+				s.logger.Error("Failed to allocate chunk from queue", "err", err)
 				return
-			default:
 			}
-			time.Sleep(2 * time.Second)
-			continue
-		}
-		if err != nil {
-			s.logger.Error("Failed to allocate chunk from queue", "err", err)
-			return
 		}
 
 		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
 			"format", snapshot.Format, "chunk", index, "total", chunks.Size())
 
-		ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
+		ticker := time.NewTicker(s.retryTimeout)
 		defer ticker.Stop()
 
 		s.requestChunk(snapshot, index)
 
 		select {
 		case <-chunks.WaitFor(index):
+			next = true
 		case <-ticker.C:
-			s.requestChunk(snapshot, index)
+			next = false
 		case <-ctx.Done():
 			return
 		}
diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go
index 827170235..6a126f79d 100644
--- a/test/e2e/generator/generate.go
+++ b/test/e2e/generator/generate.go
@@ -82,10 +82,10 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, error) {
 		numValidators = 4
 	case "large":
 		// FIXME Networks are kept small since large ones use too much CPU.
-		numSeeds = r.Intn(3)
+		numSeeds = r.Intn(2)
 		numLightClients = r.Intn(3)
-		numValidators = 4 + r.Intn(7)
-		numFulls = r.Intn(5)
+		numValidators = 4 + r.Intn(4)
+		numFulls = r.Intn(4)
 	default:
 		return manifest, fmt.Errorf("unknown topology %q", opt["topology"])
 	}
diff --git a/test/e2e/runner/load.go b/test/e2e/runner/load.go
index 4b372f5a0..4e1a6ebba 100644
--- a/test/e2e/runner/load.go
+++ b/test/e2e/runner/load.go
@@ -81,7 +81,7 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
 
 		select {
 		case chTx <- tx:
-			time.Sleep(time.Duration(100/multiplier) * time.Millisecond)
+			time.Sleep(time.Second / time.Duration(multiplier))
 
 		case <-ctx.Done():
 			close(chTx)
diff --git a/test/e2e/runner/rpc.go b/test/e2e/runner/rpc.go
index adba68781..39792ba07 100644
--- a/test/e2e/runner/rpc.go
+++ b/test/e2e/runner/rpc.go
@@ -84,7 +84,7 @@ func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes.ResultStatus, error) {
 			return status, nil
 		}
 
-		time.Sleep(200 * time.Millisecond)
+		time.Sleep(300 * time.Millisecond)
 	}
 }
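---

Note on the fetchChunks change: the loop now allocates a chunk index only when the previous one has been resolved, and re-requests the same index each time the retry ticker fires instead of abandoning it. The standalone sketch below illustrates just that loop shape under simplified assumptions; it is not Tendermint code, and allocate, request, waitFor, and the simulated peer behaviour are hypothetical stand-ins. One deliberate difference: the ticker is stopped at the end of each iteration, since a defer inside the loop would only run when the function returns.

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchLoop mirrors the control flow of the patched fetchChunks: allocate a
// work item only when `next` is true, re-request the same item whenever the
// retry ticker fires, and move on once the item actually arrives.
func fetchLoop(
	ctx context.Context,
	allocate func() (int, bool), // hands out the next chunk index; false when done
	request func(int), // asks a (simulated) peer for a chunk
	waitFor func(int) <-chan struct{}, // closed when that chunk has arrived
	retryTimeout time.Duration,
) {
	next := true
	var index int
	for {
		if next {
			var ok bool
			if index, ok = allocate(); !ok {
				return // no more chunks to fetch
			}
		}

		ticker := time.NewTicker(retryTimeout)
		request(index)

		select {
		case <-waitFor(index):
			next = true // chunk arrived: allocate a fresh index next pass
		case <-ticker.C:
			next = false // timed out: retry the same index next pass
		case <-ctx.Done():
			ticker.Stop()
			return
		}
		ticker.Stop() // stop per iteration; a deferred Stop here would pile up
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	arrived := map[int]chan struct{}{}
	attempts := map[int]int{}
	nextIndex := 0

	allocate := func() (int, bool) {
		if nextIndex >= 3 {
			return 0, false
		}
		i := nextIndex
		nextIndex++
		arrived[i] = make(chan struct{})
		return i, true
	}
	request := func(i int) {
		attempts[i]++
		fmt.Printf("requesting chunk %d (attempt %d)\n", i, attempts[i])
		if attempts[i] == 2 { // simulate a peer that only answers the retry
			close(arrived[i])
		}
	}
	waitFor := func(i int) <-chan struct{} { return arrived[i] }

	fetchLoop(ctx, allocate, request, waitFor, 50*time.Millisecond)
}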