diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index 8d753f004..7e28f4586 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -88,6 +88,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 
 ### IMPROVEMENTS
 
+- [statesync] \#6566 Allow state sync fetchers and request timeout to be configurable. (@alexanderbez)
 - [types] \#6478 Add `block_id` to `newblock` event (@jeebster)
 - [crypto/ed25519] \#5632 Adopt zip215 `ed25519` verification. (@marbar3778)
 - [privval] \#5603 Add `--key` to `init`, `gen_validator`, `testnet` & `unsafe_reset_priv_validator` for use in generating `secp256k1` keys.
diff --git a/config/config.go b/config/config.go
index 843b8ec18..8d2072b7b 100644
--- a/config/config.go
+++ b/config/config.go
@@ -806,13 +806,15 @@ func (cfg *MempoolConfig) ValidateBasic() error {
 
 // StateSyncConfig defines the configuration for the Tendermint state sync service
 type StateSyncConfig struct {
-	Enable        bool          `mapstructure:"enable"`
-	TempDir       string        `mapstructure:"temp-dir"`
-	RPCServers    []string      `mapstructure:"rpc-servers"`
-	TrustPeriod   time.Duration `mapstructure:"trust-period"`
-	TrustHeight   int64         `mapstructure:"trust-height"`
-	TrustHash     string        `mapstructure:"trust-hash"`
-	DiscoveryTime time.Duration `mapstructure:"discovery-time"`
+	Enable              bool          `mapstructure:"enable"`
+	TempDir             string        `mapstructure:"temp-dir"`
+	RPCServers          []string      `mapstructure:"rpc-servers"`
+	TrustPeriod         time.Duration `mapstructure:"trust-period"`
+	TrustHeight         int64         `mapstructure:"trust-height"`
+	TrustHash           string        `mapstructure:"trust-hash"`
+	DiscoveryTime       time.Duration `mapstructure:"discovery-time"`
+	ChunkRequestTimeout time.Duration `mapstructure:"chunk-request-timeout"`
+	ChunkFetchers       int32         `mapstructure:"chunk-fetchers"`
 }
 
 func (cfg *StateSyncConfig) TrustHashBytes() []byte {
@@ -827,8 +829,10 @@ func (cfg *StateSyncConfig) TrustHashBytes() []byte {
 
 // DefaultStateSyncConfig returns a default configuration for the state sync service
 func DefaultStateSyncConfig() *StateSyncConfig {
 	return &StateSyncConfig{
-		TrustPeriod:   168 * time.Hour,
-		DiscoveryTime: 15 * time.Second,
+		TrustPeriod:         168 * time.Hour,
+		DiscoveryTime:       15 * time.Second,
+		ChunkRequestTimeout: 10 * time.Second,
+		ChunkFetchers:       4,
 	}
 }
@@ -843,14 +847,17 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 		if len(cfg.RPCServers) == 0 {
 			return errors.New("rpc-servers is required")
 		}
+
 		if len(cfg.RPCServers) < 2 {
 			return errors.New("at least two rpc-servers entries is required")
 		}
+
 		for _, server := range cfg.RPCServers {
 			if len(server) == 0 {
 				return errors.New("found empty rpc-servers entry")
 			}
 		}
+
 		if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second {
 			return errors.New("discovery time must be 0s or greater than five seconds")
 		}
@@ -858,17 +865,29 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 
 		if cfg.TrustPeriod <= 0 {
 			return errors.New("trusted-period is required")
 		}
+
 		if cfg.TrustHeight <= 0 {
 			return errors.New("trusted-height is required")
 		}
+
 		if len(cfg.TrustHash) == 0 {
 			return errors.New("trusted-hash is required")
 		}
+
 		_, err := hex.DecodeString(cfg.TrustHash)
 		if err != nil {
 			return fmt.Errorf("invalid trusted-hash: %w", err)
 		}
+
+		if cfg.ChunkRequestTimeout < time.Second {
+			return errors.New("chunk-request-timeout must be at least 1 second")
+		}
+
+		if cfg.ChunkFetchers <= 0 {
+			return errors.New("chunk-fetchers is required")
+		}
 	}
+
 	return nil
 }
diff --git a/config/toml.go b/config/toml.go
index 9fabda6ac..638f83f67 100644
--- a/config/toml.go
+++ b/config/toml.go
@@ -425,6 +425,13 @@ discovery-time = "{{ .StateSync.DiscoveryTime }}"
 # Will create a new, randomly named directory within, and remove it when done.
 temp-dir = "{{ .StateSync.TempDir }}"
 
+# The timeout duration before re-requesting a chunk, possibly from a different
+# peer (default: 10 seconds).
+chunk-request-timeout = "{{ .StateSync.ChunkRequestTimeout }}"
+
+# The number of concurrent chunk fetchers to run (default: 4).
+chunk-fetchers = "{{ .StateSync.ChunkFetchers }}"
+
 #######################################################
 ### Fast Sync Configuration Connections ###
 #######################################################
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index b5436d1ab..8ce906eef 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/libs/log"
@@ -109,6 +110,7 @@ const (
 type Reactor struct {
 	service.BaseService
 
+	cfg        config.StateSyncConfig
 	stateStore sm.Store
 	blockStore *store.BlockStore
 
@@ -134,6 +136,7 @@ type Reactor struct {
 // and querying, references to p2p Channels and a channel to listen for peer
 // updates on. Note, the reactor will close all p2p Channels when stopping.
 func NewReactor(
+	cfg config.StateSyncConfig,
 	logger log.Logger,
 	conn proxy.AppConnSnapshot,
 	connQuery proxy.AppConnQuery,
@@ -144,6 +147,7 @@ func NewReactor(
 	tempDir string,
 ) *Reactor {
 	r := &Reactor{
+		cfg:       cfg,
 		conn:      conn,
 		connQuery: connQuery,
 		snapshotCh: snapshotCh,
@@ -217,7 +221,16 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 		return sm.State{}, errors.New("a state sync is already in progress")
 	}
 
-	r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir)
+	r.syncer = newSyncer(
+		r.cfg,
+		r.Logger,
+		r.conn,
+		r.connQuery,
+		stateProvider,
+		r.snapshotCh.Out,
+		r.chunkCh.Out,
+		r.tempDir,
+	)
 	r.mtx.Unlock()
 
 	hook := func() {
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index fab6e30c7..6e3456322 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -14,6 +14,7 @@ import (
 	dbm "github.com/tendermint/tm-db"
 
 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/internal/statesync/mocks"
 	"github.com/tendermint/tendermint/internal/test/factory"
@@ -121,7 +122,10 @@ func setup(
 	rts.stateStore = &smmocks.Store{}
 	rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
 
+	cfg := config.DefaultStateSyncConfig()
+
 	rts.reactor = NewReactor(
+		*cfg,
 		log.TestingLogger(),
 		conn,
 		connQuery,
@@ -138,6 +142,7 @@ func setup(
 	rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second)
 
 	rts.syncer = newSyncer(
+		*cfg,
 		log.NewNopLogger(),
 		conn,
 		connQuery,
diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go
index d58c27d61..1eeb3ba3a 100644
--- a/internal/statesync/syncer.go
+++ b/internal/statesync/syncer.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/libs/log"
@@ -18,12 +19,9 @@ import (
 )
 
 const (
-	// chunkFetchers is the number of concurrent chunk fetchers to run.
-	chunkFetchers = 4
 	// chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
 	chunkTimeout = 2 * time.Minute
-	// requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
-	chunkRequestTimeout = 10 * time.Second
+
 	// minimumDiscoveryTime is the lowest allowable time for a
 	// SyncAny discovery time.
 	minimumDiscoveryTime = 5 * time.Second
@@ -52,6 +50,7 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
+	cfg           config.StateSyncConfig
 	logger        log.Logger
 	stateProvider StateProvider
 	conn          proxy.AppConnSnapshot
@@ -67,6 +66,7 @@ type syncer struct {
 
 // newSyncer creates a new syncer.
 func newSyncer(
+	cfg config.StateSyncConfig,
 	logger log.Logger,
 	conn proxy.AppConnSnapshot,
 	connQuery proxy.AppConnQuery,
@@ -75,6 +75,7 @@ func newSyncer(
 	tempDir string,
 ) *syncer {
 	return &syncer{
+		cfg:           cfg,
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,
@@ -256,7 +257,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context canceled.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	for i := int32(0); i < chunkFetchers; i++ {
+	for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
 		go s.fetchChunks(ctx, snapshot, chunks)
 	}
 
@@ -410,16 +411,21 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
 		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
 			"format", snapshot.Format, "chunk", index, "total", chunks.Size())
 
-		ticker := time.NewTicker(chunkRequestTimeout)
+		ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
 		defer ticker.Stop()
+
 		s.requestChunk(snapshot, index)
+
 		select {
 		case <-chunks.WaitFor(index):
+
 		case <-ticker.C:
 			s.requestChunk(snapshot, index)
+
 		case <-ctx.Done():
 			return
 		}
+
 		ticker.Stop()
 	}
 }
diff --git a/node/node.go b/node/node.go
index 45f72fd74..5c3cb0118 100644
--- a/node/node.go
+++ b/node/node.go
@@ -335,6 +335,7 @@ func makeNode(config *cfg.Config,
 	}
 
 	stateSyncReactor = statesync.NewReactor(
+		*config.StateSync,
 		stateSyncReactorShim.Logger,
 		proxyApp.Snapshot(),
 		proxyApp.Query(),