From be8c9833ca16c979bc3b0861bd9526e8503be336 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Tue, 15 Jun 2021 15:10:16 -0400
Subject: [PATCH] state sync: tune request timeout and chunkers (backport
 #6566) (#6581)

* state sync: tune request timeout and chunkers (#6566)

(cherry picked from commit 7d961b55b2132d53ccf7ee8d6c86b84fc7fc9ddc)

# Conflicts:
#    CHANGELOG_PENDING.md
#    config/config.go
#    internal/statesync/reactor.go
#    internal/statesync/reactor_test.go
#    node/node.go
#    statesync/syncer.go

* fix build

* fix config

* fix config

Co-authored-by: Aleksandr Bezobchuk
Co-authored-by: Aleksandr Bezobchuk
---
 CHANGELOG_PENDING.md       |  1 +
 config/config.go           | 37 ++++++++++++++++++++++++++++---------
 config/toml.go             |  7 +++++++
 node/node.go               |  8 ++++++--
 statesync/reactor.go       | 14 ++++++++++++--
 statesync/reactor_test.go  |  7 +++++--
 statesync/syncer.go        | 28 ++++++++++++++++++++--------
 statesync/syncer_test.go   | 24 ++++++++++++++++++------
 test/maverick/node/node.go |  8 ++++++--
 9 files changed, 103 insertions(+), 31 deletions(-)

diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index 1206ff5cf..d20458a43 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -24,6 +24,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
 
 ### IMPROVEMENTS
 
+- [statesync] \#6566 Allow state sync fetchers and request timeout to be configurable. (@alexanderbez)
 - [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots.
 
 ### BUG FIXES
diff --git a/config/config.go b/config/config.go
index 1c261cf9b..421957907 100644
--- a/config/config.go
+++ b/config/config.go
@@ -716,13 +716,15 @@ func (cfg *MempoolConfig) ValidateBasic() error {
 
 // StateSyncConfig defines the configuration for the Tendermint state sync service
 type StateSyncConfig struct {
-    Enable        bool          `mapstructure:"enable"`
-    TempDir       string        `mapstructure:"temp_dir"`
-    RPCServers    []string      `mapstructure:"rpc_servers"`
-    TrustPeriod   time.Duration `mapstructure:"trust_period"`
-    TrustHeight   int64         `mapstructure:"trust_height"`
-    TrustHash     string        `mapstructure:"trust_hash"`
-    DiscoveryTime time.Duration `mapstructure:"discovery_time"`
+    Enable              bool          `mapstructure:"enable"`
+    TempDir             string        `mapstructure:"temp_dir"`
+    RPCServers          []string      `mapstructure:"rpc_servers"`
+    TrustPeriod         time.Duration `mapstructure:"trust_period"`
+    TrustHeight         int64         `mapstructure:"trust_height"`
+    TrustHash           string        `mapstructure:"trust_hash"`
+    DiscoveryTime       time.Duration `mapstructure:"discovery_time"`
+    ChunkRequestTimeout time.Duration `mapstructure:"chunk_request_timeout"`
+    ChunkFetchers       int32         `mapstructure:"chunk_fetchers"`
 }
 
 func (cfg *StateSyncConfig) TrustHashBytes() []byte {
@@ -737,8 +739,10 @@ func (cfg *StateSyncConfig) TrustHashBytes() []byte {
 
 // DefaultStateSyncConfig returns a default configuration for the state sync service
 func DefaultStateSyncConfig() *StateSyncConfig {
     return &StateSyncConfig{
-        TrustPeriod:   168 * time.Hour,
-        DiscoveryTime: 15 * time.Second,
+        TrustPeriod:         168 * time.Hour,
+        DiscoveryTime:       15 * time.Second,
+        ChunkRequestTimeout: 10 * time.Second,
+        ChunkFetchers:       4,
     }
 }
@@ -753,14 +757,17 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
         if len(cfg.RPCServers) == 0 {
             return errors.New("rpc_servers is required")
         }
+
         if len(cfg.RPCServers) < 2 {
             return errors.New("at least two rpc_servers entries is required")
         }
+
         for _, server := range cfg.RPCServers {
             if len(server) == 0 {
                 return errors.New("found empty rpc_servers entry")
             }
         }
+
         if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second {
             return errors.New("discovery time must be 0s or greater than five seconds")
         }
@@ -768,17 +775,29 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
         if cfg.TrustPeriod <= 0 {
             return errors.New("trusted_period is required")
         }
+
         if cfg.TrustHeight <= 0 {
             return errors.New("trusted_height is required")
         }
+
         if len(cfg.TrustHash) == 0 {
             return errors.New("trusted_hash is required")
         }
+
         _, err := hex.DecodeString(cfg.TrustHash)
         if err != nil {
             return fmt.Errorf("invalid trusted_hash: %w", err)
         }
+
+        if cfg.ChunkRequestTimeout < time.Second {
+            return errors.New("chunk_request_timeout must be at least one second")
+        }
+
+        if cfg.ChunkFetchers <= 0 {
+            return errors.New("chunk_fetchers is required")
+        }
     }
+
     return nil
 }
 
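The config.go changes above make the two new knobs ordinary StateSyncConfig fields, so embedders can tune them programmatically as well as through TOML. A minimal sketch of doing so; the RPC endpoints, trust height, and trust hash below are placeholders for illustration, not values taken from this patch:

    package main

    import (
        "fmt"
        "time"

        "github.com/tendermint/tendermint/config"
    )

    func main() {
        // Defaults introduced by this patch: 10s request timeout, 4 fetchers.
        cfg := config.DefaultStateSyncConfig()

        // Hypothetical tuning for a slow network: wait longer before
        // re-requesting a chunk, and fetch more chunks concurrently.
        cfg.ChunkRequestTimeout = 30 * time.Second
        cfg.ChunkFetchers = 8

        // The new ValidateBasic checks only run when state sync is enabled,
        // alongside the existing rpc_servers/trust_* requirements.
        cfg.Enable = true
        cfg.RPCServers = []string{"http://rpc-1:26657", "http://rpc-2:26657"} // placeholders
        cfg.TrustHeight = 1000
        cfg.TrustHash = "a2497c2b" // placeholder hex-encoded header hash

        if err := cfg.ValidateBasic(); err != nil {
            fmt.Println("invalid state sync config:", err)
        }
    }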
rpc_servers entry") } } + if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second { return errors.New("discovery time must be 0s or greater than five seconds") } @@ -768,17 +775,29 @@ func (cfg *StateSyncConfig) ValidateBasic() error { if cfg.TrustPeriod <= 0 { return errors.New("trusted_period is required") } + if cfg.TrustHeight <= 0 { return errors.New("trusted_height is required") } + if len(cfg.TrustHash) == 0 { return errors.New("trusted_hash is required") } + _, err := hex.DecodeString(cfg.TrustHash) if err != nil { return fmt.Errorf("invalid trusted_hash: %w", err) } + + if cfg.ChunkRequestTimeout < time.Second { + return errors.New("chunk_request_timeout must be least a one second") + } + + if cfg.ChunkFetchers <= 0 { + return errors.New("chunk_fetchers is required") + } } + return nil } diff --git a/config/toml.go b/config/toml.go index 82ecdda20..d89c7e44c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -372,6 +372,13 @@ discovery_time = "{{ .StateSync.DiscoveryTime }}" # Will create a new, randomly named directory within, and remove it when done. temp_dir = "{{ .StateSync.TempDir }}" +# The timeout duration before re-requesting a chunk, possibly from a different +# peer (default: 1 minute). +chunk_request_timeout = "{{ .StateSync.ChunkRequestTimeout }}" + +# The number of concurrent chunk fetchers to run (default: 1). +chunk_fetchers = "{{ .StateSync.ChunkFetchers }}" + ####################################################### ### Fast Sync Configuration Connections ### ####################################################### diff --git a/node/node.go b/node/node.go index d395a7d66..205c1f747 100644 --- a/node/node.go +++ b/node/node.go @@ -774,8 +774,12 @@ func NewNode(config *cfg.Config, // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 - stateSyncReactor := statesync.NewReactor(proxyApp.Snapshot(), proxyApp.Query(), - config.StateSync.TempDir) + stateSyncReactor := statesync.NewReactor( + *config.StateSync, + proxyApp.Snapshot(), + proxyApp.Query(), + config.StateSync.TempDir, + ) stateSyncReactor.SetLogger(logger.With("module", "statesync")) nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) diff --git a/statesync/reactor.go b/statesync/reactor.go index 1a3cdbff5..37ccebed1 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -6,6 +6,7 @@ import ( "time" abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/config" tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/p2p" ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" @@ -28,6 +29,7 @@ const ( type Reactor struct { p2p.BaseReactor + cfg config.StateSyncConfig conn proxy.AppConnSnapshot connQuery proxy.AppConnQuery tempDir string @@ -39,12 +41,20 @@ type Reactor struct { } // NewReactor creates a new state sync reactor. 
diff --git a/node/node.go b/node/node.go
index d395a7d66..205c1f747 100644
--- a/node/node.go
+++ b/node/node.go
@@ -774,8 +774,12 @@ func NewNode(config *cfg.Config,
     // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
     // we should clean this whole thing up. See:
     // https://github.com/tendermint/tendermint/issues/4644
-    stateSyncReactor := statesync.NewReactor(proxyApp.Snapshot(), proxyApp.Query(),
-        config.StateSync.TempDir)
+    stateSyncReactor := statesync.NewReactor(
+        *config.StateSync,
+        proxyApp.Snapshot(),
+        proxyApp.Query(),
+        config.StateSync.TempDir,
+    )
     stateSyncReactor.SetLogger(logger.With("module", "statesync"))
 
     nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
diff --git a/statesync/reactor.go b/statesync/reactor.go
index 1a3cdbff5..37ccebed1 100644
--- a/statesync/reactor.go
+++ b/statesync/reactor.go
@@ -6,6 +6,7 @@ import (
     "time"
 
     abci "github.com/tendermint/tendermint/abci/types"
+    "github.com/tendermint/tendermint/config"
     tmsync "github.com/tendermint/tendermint/libs/sync"
     "github.com/tendermint/tendermint/p2p"
     ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -28,6 +29,7 @@ const (
 type Reactor struct {
     p2p.BaseReactor
 
+    cfg       config.StateSyncConfig
     conn      proxy.AppConnSnapshot
     connQuery proxy.AppConnQuery
     tempDir   string
@@ -39,12 +41,20 @@ type Reactor struct {
 }
 
 // NewReactor creates a new state sync reactor.
-func NewReactor(conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDir string) *Reactor {
+func NewReactor(
+    cfg config.StateSyncConfig,
+    conn proxy.AppConnSnapshot,
+    connQuery proxy.AppConnQuery,
+    tempDir string,
+) *Reactor {
+
     r := &Reactor{
+        cfg:       cfg,
         conn:      conn,
         connQuery: connQuery,
     }
     r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
+
     return r
 }
 
@@ -252,7 +262,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
         r.mtx.Unlock()
         return sm.State{}, nil, errors.New("a state sync is already in progress")
     }
-    r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
+    r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
     r.mtx.Unlock()
 
     hook := func() {
diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go
index 49d8376b8..053b47ef5 100644
--- a/statesync/reactor_test.go
+++ b/statesync/reactor_test.go
@@ -9,6 +9,7 @@ import (
     "github.com/stretchr/testify/require"
 
     abci "github.com/tendermint/tendermint/abci/types"
+    "github.com/tendermint/tendermint/config"
     "github.com/tendermint/tendermint/p2p"
     p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
     ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -60,7 +61,8 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
             }
 
             // Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
-            r := NewReactor(conn, nil, "")
+            cfg := config.DefaultStateSyncConfig()
+            r := NewReactor(*cfg, conn, nil, "")
             err := r.Start()
             require.NoError(t, err)
             t.Cleanup(func() {
@@ -137,7 +139,8 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
             }
 
             // Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
-            r := NewReactor(conn, nil, "")
+            cfg := config.DefaultStateSyncConfig()
+            r := NewReactor(*cfg, conn, nil, "")
             err := r.Start()
             require.NoError(t, err)
             t.Cleanup(func() {
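The tests above pin config.DefaultStateSyncConfig(), but nothing requires the defaults: the reactor simply carries whatever configuration it is given down to the syncer. A hypothetical companion test in the same style (not part of this patch; it would live in reactor_test.go and reuse that file's imports):

    // Hypothetical: exercises the reactor with non-default tuning.
    func TestReactor_CustomConfig(t *testing.T) {
        conn := &proxymocks.AppConnSnapshot{}

        cfg := config.DefaultStateSyncConfig()
        cfg.ChunkRequestTimeout = 2 * time.Second // re-request sooner than the 10s default
        cfg.ChunkFetchers = 1                     // a single fetcher keeps ordering deterministic

        r := NewReactor(*cfg, conn, nil, "")
        require.NoError(t, r.Start())
        t.Cleanup(func() {
            if err := r.Stop(); err != nil {
                t.Error(err)
            }
        })
    }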
diff --git a/statesync/syncer.go b/statesync/syncer.go
index f38133e3d..4e792bcd9 100644
--- a/statesync/syncer.go
+++ b/statesync/syncer.go
@@ -8,6 +8,7 @@ import (
     "time"
 
     abci "github.com/tendermint/tendermint/abci/types"
+    "github.com/tendermint/tendermint/config"
     "github.com/tendermint/tendermint/libs/log"
     tmsync "github.com/tendermint/tendermint/libs/sync"
     "github.com/tendermint/tendermint/p2p"
@@ -18,12 +19,9 @@ import (
 )
 
 const (
-    // chunkFetchers is the number of concurrent chunk fetchers to run.
-    chunkFetchers = 4
     // chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
     chunkTimeout = 2 * time.Minute
-    // requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
-    chunkRequestTimeout = 10 * time.Second
+
     // minimumDiscoveryTime is the lowest allowable time for a
     // SyncAny discovery time.
     minimumDiscoveryTime = 5 * time.Second
@@ -52,6 +50,7 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
+    cfg           config.StateSyncConfig
     logger        log.Logger
     stateProvider StateProvider
     conn          proxy.AppConnSnapshot
@@ -64,9 +63,17 @@ type syncer struct {
 }
 
 // newSyncer creates a new syncer.
-func newSyncer(logger log.Logger, conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery,
-    stateProvider StateProvider, tempDir string) *syncer {
+func newSyncer(
+    cfg config.StateSyncConfig,
+    logger log.Logger,
+    conn proxy.AppConnSnapshot,
+    connQuery proxy.AppConnQuery,
+    stateProvider StateProvider,
+    tempDir string,
+) *syncer {
+
     return &syncer{
+        cfg:           cfg,
         logger:        logger,
         stateProvider: stateProvider,
         conn:          conn,
@@ -243,7 +250,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
     // Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    for i := int32(0); i < chunkFetchers; i++ {
+    for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
         go s.fetchChunks(ctx, snapshot, chunks)
     }
 
@@ -396,16 +403,21 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
         s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
             "format", snapshot.Format, "chunk", index, "total", chunks.Size())
 
-        ticker := time.NewTicker(chunkRequestTimeout)
+        ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
         defer ticker.Stop()
+
         s.requestChunk(snapshot, index)
+
         select {
         case <-chunks.WaitFor(index):
+
         case <-ticker.C:
             s.requestChunk(snapshot, index)
+
         case <-ctx.Done():
             return
         }
+
         ticker.Stop()
     }
 }
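The core behavioral change is in fetchChunks: the re-request timer now comes from configuration rather than a package constant. The mechanism is "request, then re-request on a ticker until delivery or cancellation"; in the code above the select runs once per allocated chunk, while this standalone sketch (all names hypothetical) loops, purely to isolate the timing behavior:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // requestWithRetry issues request() immediately, then re-issues it each
    // time `timeout` elapses, until done is closed or ctx is cancelled.
    func requestWithRetry(
        ctx context.Context,
        timeout time.Duration,
        request func(),
        done <-chan struct{},
    ) {
        ticker := time.NewTicker(timeout)
        defer ticker.Stop()

        request()

        for {
            select {
            case <-done:
                return // chunk delivered
            case <-ticker.C:
                request() // possibly served by a different peer this time
            case <-ctx.Done():
                return
            }
        }
    }

    func main() {
        done := make(chan struct{})
        time.AfterFunc(250*time.Millisecond, func() { close(done) })

        // With a 100ms timeout, the request fires at ~0ms, ~100ms, and ~200ms
        // before the simulated delivery at 250ms stops the loop.
        requestWithRetry(context.Background(), 100*time.Millisecond,
            func() { fmt.Println("requesting chunk") }, done)
    }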
diff --git a/statesync/syncer_test.go b/statesync/syncer_test.go
index a534f0452..4fd74a794 100644
--- a/statesync/syncer_test.go
+++ b/statesync/syncer_test.go
@@ -10,6 +10,7 @@ import (
     "github.com/stretchr/testify/require"
 
     abci "github.com/tendermint/tendermint/abci/types"
+    "github.com/tendermint/tendermint/config"
     "github.com/tendermint/tendermint/libs/log"
     tmsync "github.com/tendermint/tendermint/libs/sync"
     "github.com/tendermint/tendermint/p2p"
@@ -31,7 +32,9 @@ func setupOfferSyncer(t *testing.T) (*syncer, *proxymocks.AppConnSnapshot) {
     connSnapshot := &proxymocks.AppConnSnapshot{}
     stateProvider := &mocks.StateProvider{}
     stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-    syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+    cfg := config.DefaultStateSyncConfig()
+    syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
     return syncer, connSnapshot
 }
 
@@ -83,7 +86,8 @@ func TestSyncer_SyncAny(t *testing.T) {
 
     connSnapshot := &proxymocks.AppConnSnapshot{}
     connQuery := &proxymocks.AppConnQuery{}
-    syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+    cfg := config.DefaultStateSyncConfig()
+    syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
 
     // Adding a chunk should error when no sync is in progress
     _, err := syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
@@ -406,7 +410,9 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
             connSnapshot := &proxymocks.AppConnSnapshot{}
             stateProvider := &mocks.StateProvider{}
             stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-            syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
+            cfg := config.DefaultStateSyncConfig()
+            syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
 
             body := []byte{1, 2, 3}
             chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
@@ -457,7 +463,9 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
             connSnapshot := &proxymocks.AppConnSnapshot{}
             stateProvider := &mocks.StateProvider{}
             stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-            syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
+            cfg := config.DefaultStateSyncConfig()
+            syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
 
             chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
             require.NoError(t, err)
@@ -520,7 +528,9 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
             connSnapshot := &proxymocks.AppConnSnapshot{}
             stateProvider := &mocks.StateProvider{}
             stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
-            syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
+            cfg := config.DefaultStateSyncConfig()
+            syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
 
             // Set up three peers across two snapshots, and ask for one of them to be banned.
             // It should be banned from all snapshots.
@@ -633,7 +643,9 @@ func TestSyncer_verifyApp(t *testing.T) {
             connQuery := &proxymocks.AppConnQuery{}
             connSnapshot := &proxymocks.AppConnSnapshot{}
             stateProvider := &mocks.StateProvider{}
-            syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
+
+            cfg := config.DefaultStateSyncConfig()
+            syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
 
             connQuery.On("InfoSync", proxy.RequestInfo).Return(tc.response, tc.err)
             version, err := syncer.verifyApp(s)
diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go
index f2aa79e02..7383169dc 100644
--- a/test/maverick/node/node.go
+++ b/test/maverick/node/node.go
@@ -809,8 +809,12 @@ func NewNode(config *cfg.Config,
     // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
     // we should clean this whole thing up. See:
     // https://github.com/tendermint/tendermint/issues/4644
-    stateSyncReactor := statesync.NewReactor(proxyApp.Snapshot(), proxyApp.Query(),
-        config.StateSync.TempDir)
+    stateSyncReactor := statesync.NewReactor(
+        *config.StateSync,
+        proxyApp.Snapshot(),
+        proxyApp.Query(),
+        config.StateSync.TempDir,
+    )
     stateSyncReactor.SetLogger(logger.With("module", "statesync"))
 
     nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
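One operational consequence of the new ValidateBasic checks: values below the floors are rejected by validation instead of being silently accepted. A minimal sketch of that failure mode, with the same kind of placeholder trust/RPC values as the earlier sketch:

    package main

    import (
        "fmt"
        "time"

        "github.com/tendermint/tendermint/config"
    )

    func main() {
        cfg := config.DefaultStateSyncConfig()
        cfg.Enable = true
        cfg.RPCServers = []string{"http://rpc-1:26657", "http://rpc-2:26657"} // placeholders
        cfg.TrustHeight = 1000
        cfg.TrustHash = "a2497c2b" // placeholder hex hash

        // Below the one-second floor enforced by ValidateBasic above.
        cfg.ChunkRequestTimeout = 500 * time.Millisecond

        // Prints: chunk_request_timeout must be at least one second
        if err := cfg.ValidateBasic(); err != nil {
            fmt.Println(err)
        }
    }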