diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index 316436001..1f7ae9a47 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -122,6 +122,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 - [crypto/merkle] \#6513 Optimize HashAlternatives (@marbar3778)
 - [p2p/pex] \#6509 Improve addrBook.hash performance (@cuonglm)
 - [consensus/metrics] \#6549 Change block_size gauge to a histogram for better observability over time (@marbar3778)
+- [statesync] \#6587 Increase chunk priority and re-request chunks that don't arrive (@cmwaters)
 
 ### BUG FIXES
 
diff --git a/config/config.go b/config/config.go
index 73f6b4f66..7c38aabc6 100644
--- a/config/config.go
+++ b/config/config.go
@@ -817,7 +817,7 @@ type StateSyncConfig struct {
 	TrustHash           string        `mapstructure:"trust-hash"`
 	DiscoveryTime       time.Duration `mapstructure:"discovery-time"`
 	ChunkRequestTimeout time.Duration `mapstructure:"chunk-request-timeout"`
-	ChunkFetchers       int32         `mapstructure:"chunk-fetchers"`
+	Fetchers            int32         `mapstructure:"fetchers"`
 }
 
 func (cfg *StateSyncConfig) TrustHashBytes() []byte {
@@ -834,8 +834,8 @@ func DefaultStateSyncConfig() *StateSyncConfig {
 	return &StateSyncConfig{
 		TrustPeriod:         168 * time.Hour,
 		DiscoveryTime:       15 * time.Second,
-		ChunkRequestTimeout: 10 * time.Second,
-		ChunkFetchers:       4,
+		ChunkRequestTimeout: 15 * time.Second,
+		Fetchers:            4,
 	}
 }
 
@@ -882,12 +882,12 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 			return fmt.Errorf("invalid trusted-hash: %w", err)
 		}
 
-		if cfg.ChunkRequestTimeout < time.Second {
-			return errors.New("chunk-request-timeout must be least a one second")
+		if cfg.ChunkRequestTimeout < 5*time.Second {
+			return errors.New("chunk-request-timeout must be at least 5 seconds")
 		}
 
-		if cfg.ChunkFetchers <= 0 {
-			return errors.New("chunk-fetchers is required")
+		if cfg.Fetchers <= 0 {
+			return errors.New("fetchers is required")
 		}
 	}
 
diff --git a/config/toml.go b/config/toml.go
index 638f83f67..015a2f8de 100644
--- a/config/toml.go
+++ b/config/toml.go
@@ -426,11 +426,11 @@ discovery-time = "{{ .StateSync.DiscoveryTime }}"
 temp-dir = "{{ .StateSync.TempDir }}"
 
 # The timeout duration before re-requesting a chunk, possibly from a different
-# peer (default: 1 minute).
+# peer (default: 15 seconds).
 chunk-request-timeout = "{{ .StateSync.ChunkRequestTimeout }}"
 
-# The number of concurrent chunk fetchers to run (default: 1).
-chunk-fetchers = "{{ .StateSync.ChunkFetchers }}"
+# The number of concurrent chunk and block fetchers to run (default: 4).
+fetchers = "{{ .StateSync.Fetchers }}"
 
 #######################################################
 ###       Fast Sync Configuration Connections       ###
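The config change renames `chunk-fetchers` to `fetchers` (it now also governs light-block fetchers during backfill), raises the default retry timeout from 10s to 15s, and tightens validation. A minimal, self-contained sketch of the new validation rules — the type and names here are illustrative stand-ins, not the tendermint config types:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// stateSyncCfg mirrors only the two fields this diff touches.
type stateSyncCfg struct {
	ChunkRequestTimeout time.Duration
	Fetchers            int32
}

// validate reproduces the rules added to ValidateBasic above: a minimum
// 5-second retry timeout and a positive fetcher count.
func (c stateSyncCfg) validate() error {
	if c.ChunkRequestTimeout < 5*time.Second {
		return errors.New("chunk-request-timeout must be at least 5 seconds")
	}
	if c.Fetchers <= 0 {
		return errors.New("fetchers is required")
	}
	return nil
}

func main() {
	// The new defaults: a 15s timeout and 4 fetchers.
	cfg := stateSyncCfg{ChunkRequestTimeout: 15 * time.Second, Fetchers: 4}
	fmt.Println(cfg.validate()) // <nil>
}
```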
 
 #######################################################
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index be88bc545..513000c1d 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -38,7 +38,7 @@ var (
 		MsgType: new(ssproto.Message),
 		Descriptor: &p2p.ChannelDescriptor{
 			ID:                  byte(SnapshotChannel),
-			Priority:            5,
+			Priority:            6,
 			SendQueueCapacity:   10,
 			RecvMessageCapacity: snapshotMsgSize,
 
@@ -49,7 +49,7 @@ var (
 		MsgType: new(ssproto.Message),
 		Descriptor: &p2p.ChannelDescriptor{
 			ID:                  byte(ChunkChannel),
-			Priority:            1,
+			Priority:            3,
 			SendQueueCapacity:   4,
 			RecvMessageCapacity: chunkMsgSize,
 
@@ -60,7 +60,7 @@ var (
 		MsgType: new(ssproto.Message),
 		Descriptor: &p2p.ChannelDescriptor{
 			ID:                  byte(LightBlockChannel),
-			Priority:            1,
+			Priority:            2,
 			SendQueueCapacity:   10,
 			RecvMessageCapacity: lightBlockMsgSize,
 
@@ -99,10 +99,6 @@ const (
 	// maxLightBlockRequestRetries is the amount of retries acceptable before
 	// the backfill process aborts
 	maxLightBlockRequestRetries = 20
-
-	// the amount of processes fetching light blocks - this should be roughly calculated
-	// as the time to fetch a block / time to verify a block
-	lightBlockFetchers = 4
 )
 
 // Reactor handles state sync, both restoring snapshots for the local node and
@@ -214,7 +210,11 @@ func (r *Reactor) OnStop() {
 // application. It also saves tendermint state and runs a backfill process to
 // retrieve the necessary amount of headers, commits and validators sets to be
 // able to process evidence and participate in consensus.
-func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, error) {
+func (r *Reactor) Sync(
+	ctx context.Context,
+	stateProvider StateProvider,
+	discoveryTime time.Duration,
+) (sm.State, error) {
 	r.mtx.Lock()
 	if r.syncer != nil {
 		r.mtx.Unlock()
@@ -233,18 +233,15 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 	)
 	r.mtx.Unlock()
 
-	hook := func() {
+	requestSnapshotsHook := func() {
 		// request snapshots from all currently connected peers
-		r.Logger.Debug("requesting snapshots from known peers")
 		r.snapshotCh.Out <- p2p.Envelope{
 			Broadcast: true,
 			Message:   &ssproto.SnapshotsRequest{},
 		}
 	}
 
-	hook()
-
-	state, commit, err := r.syncer.SyncAny(discoveryTime, hook)
+	state, commit, err := r.syncer.SyncAny(ctx, discoveryTime, requestSnapshotsHook)
 	if err != nil {
 		return sm.State{}, err
 	}
@@ -312,7 +309,7 @@ func (r *Reactor) backfill(
 	// time. Ideally we want the verification process to never have to be
 	// waiting on blocks. If it takes 4s to retrieve a block and 1s to verify
 	// it, then steady state involves four workers.
-	for i := 0; i < lightBlockFetchers; i++ {
+	for i := 0; i < int(r.cfg.Fetchers); i++ {
 		go func() {
 			for {
 				select {
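The backfill hunk replaces the hard-coded `lightBlockFetchers = 4` with the configurable `r.cfg.Fetchers`. A standalone sketch of that worker-pool shape, with hypothetical names — the real workers request and verify light blocks from peers:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const fetchers = 4 // stands in for r.cfg.Fetchers
	work := make(chan int)

	// Spawn a fixed pool of workers that drain a shared queue until it
	// closes, as backfill's goroutines do with their height channel.
	var wg sync.WaitGroup
	for i := 0; i < fetchers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for height := range work {
				fmt.Printf("worker %d fetched height %d\n", id, height)
			}
		}(i)
	}

	for h := 100; h < 110; h++ {
		work <- h
	}
	close(work)
	wg.Wait()
}
```

Sizing the pool from config matches the comment in the hunk: if fetching takes roughly four times as long as verifying, four concurrent fetchers keep the verifier busy.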
diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go
index 1eeb3ba3a..93f027f67 100644
--- a/internal/statesync/syncer.go
+++ b/internal/statesync/syncer.go
@@ -50,7 +50,6 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
-	cfg           config.StateSyncConfig
 	logger        log.Logger
 	stateProvider StateProvider
 	conn          proxy.AppConnSnapshot
@@ -59,6 +58,8 @@ type syncer struct {
 	snapshotCh    chan<- p2p.Envelope
 	chunkCh       chan<- p2p.Envelope
 	tempDir       string
+	fetchers      int32
+	retryTimeout  time.Duration
 
 	mtx    tmsync.RWMutex
 	chunks *chunkQueue
@@ -75,7 +76,6 @@ func newSyncer(
 	tempDir string,
 ) *syncer {
 	return &syncer{
-		cfg:           cfg,
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,
@@ -84,6 +84,8 @@ func newSyncer(
 		snapshotCh:    snapshotCh,
 		chunkCh:       chunkCh,
 		tempDir:       tempDir,
+		fetchers:      cfg.Fetchers,
+		retryTimeout:  cfg.ChunkRequestTimeout,
 	}
 }
 
@@ -142,12 +144,18 @@ func (s *syncer) RemovePeer(peerID p2p.NodeID) {
 // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
 // snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
 // which the caller must use to bootstrap the node.
-func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) {
+func (s *syncer) SyncAny(
+	ctx context.Context,
+	discoveryTime time.Duration,
+	requestSnapshots func(),
+) (sm.State, *types.Commit, error) {
+
 	if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
 		discoveryTime = minimumDiscoveryTime
 	}
 
 	if discoveryTime > 0 {
+		requestSnapshots()
 		s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
 		time.Sleep(discoveryTime)
 	}
@@ -169,7 +177,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.Stat
 			if discoveryTime == 0 {
 				return sm.State{}, nil, errNoSnapshots
 			}
-			retryHook()
+			requestSnapshots()
 			s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
 			time.Sleep(discoveryTime)
 			continue
@@ -182,7 +190,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.Stat
 			defer chunks.Close() // in case we forget to close it elsewhere
 		}
 
-		newState, commit, err := s.Sync(snapshot, chunks)
+		newState, commit, err := s.Sync(ctx, snapshot, chunks)
 		switch {
 		case err == nil:
 			return newState, commit, nil
@@ -234,7 +242,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.Stat
 
 // Sync executes a sync for a specific snapshot, returning the latest state and block commit which
 // the caller must use to bootstrap the node.
-func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
+func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
 	s.mtx.Lock()
 	if s.chunks != nil {
 		s.mtx.Unlock()
@@ -249,19 +257,19 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 	}()
 
 	// Offer snapshot to ABCI app.
-	err := s.offerSnapshot(snapshot)
+	err := s.offerSnapshot(ctx, snapshot)
 	if err != nil {
 		return sm.State{}, nil, err
 	}
 
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context canceled.
-	ctx, cancel := context.WithCancel(context.Background())
+	fetchCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
-		go s.fetchChunks(ctx, snapshot, chunks)
+	for i := int32(0); i < s.fetchers; i++ {
+		go s.fetchChunks(fetchCtx, snapshot, chunks)
 	}
 
-	pctx, pcancel := context.WithTimeout(context.Background(), 10*time.Second)
+	pctx, pcancel := context.WithTimeout(ctx, 10*time.Second)
 	defer pcancel()
 
 	// Optimistically build new state, so we don't discover any light client failures at the end.
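These hunks stop building contexts from `context.Background()` and instead derive both the fetcher context and the state-provider timeout from the caller's `ctx`, so canceling the caller tears down all downstream work. A minimal sketch of that propagation (names are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Stand-in for the ctx argument Sync now receives from its caller.
	parent, cancel := context.WithCancel(context.Background())

	// As above: the chunk-fetcher context is derived from the caller's ctx...
	fetchCtx, fetchCancel := context.WithCancel(parent)
	defer fetchCancel()

	// ...and so is the 10s state-provider timeout.
	pctx, pcancel := context.WithTimeout(parent, 10*time.Second)
	defer pcancel()

	cancel() // canceling the parent...
	<-fetchCtx.Done()
	<-pctx.Done()
	fmt.Println("...stops both children:", fetchCtx.Err(), pctx.Err())
}
```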
@@ -275,7 +283,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 	}
 
 	// Restore snapshot
-	err = s.applyChunks(chunks)
+	err = s.applyChunks(ctx, chunks)
 	if err != nil {
 		return sm.State{}, nil, err
 	}
@@ -296,10 +304,10 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 
 // offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
 // response, or nil if the snapshot was accepted.
-func (s *syncer) offerSnapshot(snapshot *snapshot) error {
+func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error {
 	s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
 		"format", snapshot.Format, "hash", snapshot.Hash)
 
-	resp, err := s.conn.OfferSnapshotSync(context.Background(), abci.RequestOfferSnapshot{
+	resp, err := s.conn.OfferSnapshotSync(ctx, abci.RequestOfferSnapshot{
 		Snapshot: &abci.Snapshot{
 			Height:   snapshot.Height,
 			Format:   snapshot.Format,
@@ -332,7 +340,7 @@ func (s *syncer) offerSnapshot(snapshot *snapshot) error {
 
 // applyChunks applies chunks to the app. It returns various errors depending on the app's
 // response, or nil once the snapshot is fully restored.
-func (s *syncer) applyChunks(chunks *chunkQueue) error {
+func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue) error {
 	for {
 		chunk, err := chunks.Next()
 		if err == errDone {
@@ -341,7 +349,7 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
 			return fmt.Errorf("failed to fetch chunk: %w", err)
 		}
 
-		resp, err := s.conn.ApplySnapshotChunkSync(context.Background(), abci.RequestApplySnapshotChunk{
+		resp, err := s.conn.ApplySnapshotChunkSync(ctx, abci.RequestApplySnapshotChunk{
 			Index:  chunk.Index,
 			Chunk:  chunk.Chunk,
 			Sender: string(chunk.Sender),
@@ -391,36 +399,44 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
 // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
 // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
 func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
+	var (
+		next  = true
+		index uint32
+		err   error
+	)
+
 	for {
-		index, err := chunks.Allocate()
-		if err == errDone {
-			// Keep checking until the context is canceled (restore is done), in case any
-			// chunks need to be refetched.
-			select {
-			case <-ctx.Done():
+		if next {
+			index, err = chunks.Allocate()
+			if errors.Is(err, errDone) {
+				// Keep checking until the context is canceled (restore is done), in case any
+				// chunks need to be refetched.
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(2 * time.Second):
+					continue
+				}
+			}
+			if err != nil {
+				s.logger.Error("Failed to allocate chunk from queue", "err", err)
 				return
-			default:
 			}
-			time.Sleep(2 * time.Second)
-			continue
-		}
-		if err != nil {
-			s.logger.Error("Failed to allocate chunk from queue", "err", err)
-			return
 		}
 
 		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
 			"format", snapshot.Format, "chunk", index, "total", chunks.Size())
 
-		ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
+		ticker := time.NewTicker(s.retryTimeout)
 		defer ticker.Stop()
 
 		s.requestChunk(snapshot, index)
 
 		select {
 		case <-chunks.WaitFor(index):
+			next = true
+
 		case <-ticker.C:
-			s.requestChunk(snapshot, index)
+			next = false
+
 		case <-ctx.Done():
 			return
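The reworked `fetchChunks` is the behavioral heart of this change: on timeout the loop sets `next = false` and keeps the same index, so the chunk is re-requested every `retryTimeout` until it actually arrives, rather than the old single extra request. A compilable sketch of that control flow, with hypothetical stand-ins for the chunk queue:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchLoop mirrors the new fetchChunks control flow. arrived stands in for
// chunks.WaitFor(index); index++ stands in for chunks.Allocate().
func fetchLoop(ctx context.Context, arrived <-chan struct{}, retryTimeout time.Duration) {
	var (
		next  = true
		index int
	)
	for {
		if next {
			index++
		}
		fmt.Println("requesting chunk", index) // stands in for s.requestChunk

		ticker := time.NewTicker(retryTimeout)
		select {
		case <-arrived: // chunk delivered: allocate a new index next time
			next = true
		case <-ticker.C: // timed out: keep the index and re-request it
			next = false
		case <-ctx.Done():
			ticker.Stop()
			return
		}
		ticker.Stop()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()

	arrived := make(chan struct{})
	go func() { // deliver chunk 1 late, forcing re-requests first
		time.Sleep(70 * time.Millisecond)
		arrived <- struct{}{}
	}()
	fetchLoop(ctx, arrived, 30*time.Millisecond)
}
```

One aside: the patched code keeps `defer ticker.Stop()` inside the loop, so those defers accumulate until the goroutine returns; the sketch stops each ticker per iteration instead.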
diff --git a/internal/statesync/syncer_test.go b/internal/statesync/syncer_test.go
index e8c026106..cb7d600f2 100644
--- a/internal/statesync/syncer_test.go
+++ b/internal/statesync/syncer_test.go
@@ -180,7 +180,7 @@ func TestSyncer_SyncAny(t *testing.T) {
 		LastBlockAppHash: []byte("app_hash"),
 	}, nil)
 
-	newState, lastCommit, err := rts.syncer.SyncAny(0, func() {})
+	newState, lastCommit, err := rts.syncer.SyncAny(ctx, 0, func() {})
 	require.NoError(t, err)
 
 	wg.Wait()
@@ -206,7 +206,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
 
 	rts := setup(t, nil, nil, stateProvider, 2)
 
-	_, _, err := rts.syncer.SyncAny(0, func() {})
+	_, _, err := rts.syncer.SyncAny(ctx, 0, func() {})
 	require.Equal(t, errNoSnapshots, err)
 }
 
@@ -226,7 +226,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
 		Snapshot: toABCI(s), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
 
-	_, _, err = rts.syncer.SyncAny(0, func() {})
+	_, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
 	require.Equal(t, errAbort, err)
 	rts.conn.AssertExpectations(t)
 }
@@ -265,7 +265,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
 		Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
 
-	_, _, err = rts.syncer.SyncAny(0, func() {})
+	_, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
 	require.Equal(t, errNoSnapshots, err)
 	rts.conn.AssertExpectations(t)
 }
@@ -300,7 +300,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
 		Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
 
-	_, _, err = rts.syncer.SyncAny(0, func() {})
+	_, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
 	require.Equal(t, errAbort, err)
 	rts.conn.AssertExpectations(t)
 }
@@ -346,7 +346,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
 		Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
 
-	_, _, err = rts.syncer.SyncAny(0, func() {})
+	_, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
 	require.Equal(t, errNoSnapshots, err)
 	rts.conn.AssertExpectations(t)
 }
@@ -369,7 +369,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
 		Snapshot: toABCI(s), AppHash: []byte("app_hash"),
 	}).Once().Return(nil, errBoom)
 
-	_, _, err = rts.syncer.SyncAny(0, func() {})
+	_, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
 	require.True(t, errors.Is(err, errBoom))
 	rts.conn.AssertExpectations(t)
 }
@@ -406,7 +406,7 @@ func TestSyncer_offerSnapshot(t *testing.T) {
 				AppHash: []byte("app_hash"),
 			}).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
 
-			err := rts.syncer.offerSnapshot(s)
+			err := rts.syncer.offerSnapshot(ctx, s)
 			if tc.expectErr == unknownErr {
 				require.Error(t, err)
 			} else {
@@ -462,7 +462,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
 					Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
 			}
 
-			err = rts.syncer.applyChunks(chunks)
+			err = rts.syncer.applyChunks(ctx, chunks)
 			if tc.expectErr == unknownErr {
 				require.Error(t, err)
 			} else {
@@ -527,7 +527,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
 	// check the queue contents, and finally close the queue to end the goroutine.
 	// We don't really care about the result of applyChunks, since it has separate test.
 	go func() {
-		rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
+		rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
 	}()
 
 	time.Sleep(50 * time.Millisecond)
@@ -626,7 +626,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
 	// However, it will block on e.g. retry result, so we spawn a goroutine that will
 	// be shut down when the chunk queue closes.
 	go func() {
-		rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
+		rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
 	}()
 
 	time.Sleep(50 * time.Millisecond)
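The test hunks pass a `ctx` that none of the visible context lines declare, so each test presumably sets one up in a preamble outside these hunks. A sketch of what that setup likely looks like (placement and test name are hypothetical):

```go
package statesync_test

import (
	"context"
	"testing"
)

func TestWithContext(t *testing.T) {
	// A cancellable context tied to test cleanup, as the updated call
	// sites (SyncAny, offerSnapshot, applyChunks) now require.
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	_ = ctx // handed to e.g. rts.syncer.SyncAny(ctx, 0, func() {})
}
```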
diff --git a/node/node.go b/node/node.go
index fe14c0044..9b9444a5e 100644
--- a/node/node.go
+++ b/node/node.go
@@ -1055,7 +1055,7 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto
 	}
 
 	go func() {
-		state, err := ssR.Sync(stateProvider, config.DiscoveryTime)
+		state, err := ssR.Sync(context.TODO(), stateProvider, config.DiscoveryTime)
 		if err != nil {
 			ssR.Logger.Error("state sync failed", "err", err)
 			return
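`context.TODO()` here marks plumbing deliberately left for a follow-up: the node does not yet own a context it can hand to the reactor. A rough, hypothetical sketch of what that wiring could look like once a node-level context is canceled on shutdown:

```go
package main

import (
	"context"
	"fmt"
)

// node is an illustrative stand-in; the real node would cancel its context
// from OnStop so state sync (and other goroutines) shut down cleanly.
type node struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newNode() *node {
	ctx, cancel := context.WithCancel(context.Background())
	return &node{ctx: ctx, cancel: cancel}
}

func (n *node) OnStop() { n.cancel() }

func main() {
	n := newNode()
	done := make(chan struct{})
	go func() {
		<-n.ctx.Done() // ssR.Sync(n.ctx, ...) would observe this
		fmt.Println("state sync stopped:", n.ctx.Err())
		close(done)
	}()
	n.OnStop()
	<-done
}
```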