
state sync: tune request timeout and chunkers (#6566)

commit 7d961b55b2 (branch: pull/6586/head)
Aleksandr Bezobchuk, committed via GitHub 3 years ago
7 changed files with 68 additions and 16 deletions
1. CHANGELOG_PENDING.md (+1 -0)
2. config/config.go (+28 -9)
3. config/toml.go (+7 -0)
4. internal/statesync/reactor.go (+14 -1)
5. internal/statesync/reactor_test.go (+5 -0)
6. internal/statesync/syncer.go (+12 -6)
7. node/node.go (+1 -0)

CHANGELOG_PENDING.md (+1 -0)

@@ -88,6 +88,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermint).
 ### IMPROVEMENTS

+- [statesync] \#6566 Allow state sync fetchers and request timeout to be configurable. (@alexanderbez)
 - [types] \#6478 Add `block_id` to `newblock` event (@jeebster)
 - [crypto/ed25519] \#5632 Adopt zip215 `ed25519` verification. (@marbar3778)
 - [privval] \#5603 Add `--key` to `init`, `gen_validator`, `testnet` & `unsafe_reset_priv_validator` for use in generating `secp256k1` keys.


config/config.go (+28 -9)

@@ -806,13 +806,15 @@ func (cfg *MempoolConfig) ValidateBasic() error {
 // StateSyncConfig defines the configuration for the Tendermint state sync service
 type StateSyncConfig struct {
-	Enable        bool          `mapstructure:"enable"`
-	TempDir       string        `mapstructure:"temp-dir"`
-	RPCServers    []string      `mapstructure:"rpc-servers"`
-	TrustPeriod   time.Duration `mapstructure:"trust-period"`
-	TrustHeight   int64         `mapstructure:"trust-height"`
-	TrustHash     string        `mapstructure:"trust-hash"`
-	DiscoveryTime time.Duration `mapstructure:"discovery-time"`
+	Enable              bool          `mapstructure:"enable"`
+	TempDir             string        `mapstructure:"temp-dir"`
+	RPCServers          []string      `mapstructure:"rpc-servers"`
+	TrustPeriod         time.Duration `mapstructure:"trust-period"`
+	TrustHeight         int64         `mapstructure:"trust-height"`
+	TrustHash           string        `mapstructure:"trust-hash"`
+	DiscoveryTime       time.Duration `mapstructure:"discovery-time"`
+	ChunkRequestTimeout time.Duration `mapstructure:"chunk-request-timeout"`
+	ChunkFetchers       int32         `mapstructure:"chunk-fetchers"`
 }

 func (cfg *StateSyncConfig) TrustHashBytes() []byte {

@@ -827,8 +829,10 @@ func (cfg *StateSyncConfig) TrustHashBytes() []byte {
 // DefaultStateSyncConfig returns a default configuration for the state sync service
 func DefaultStateSyncConfig() *StateSyncConfig {
 	return &StateSyncConfig{
-		TrustPeriod:   168 * time.Hour,
-		DiscoveryTime: 15 * time.Second,
+		TrustPeriod:         168 * time.Hour,
+		DiscoveryTime:       15 * time.Second,
+		ChunkRequestTimeout: 10 * time.Second,
+		ChunkFetchers:       4,
 	}
 }

@@ -843,14 +847,17 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 		if len(cfg.RPCServers) == 0 {
 			return errors.New("rpc-servers is required")
 		}
+
 		if len(cfg.RPCServers) < 2 {
 			return errors.New("at least two rpc-servers entries is required")
 		}
+
 		for _, server := range cfg.RPCServers {
 			if len(server) == 0 {
 				return errors.New("found empty rpc-servers entry")
 			}
 		}
+
 		if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second {
 			return errors.New("discovery time must be 0s or greater than five seconds")
 		}

@@ -858,17 +865,29 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 		if cfg.TrustPeriod <= 0 {
 			return errors.New("trusted-period is required")
 		}
+
 		if cfg.TrustHeight <= 0 {
 			return errors.New("trusted-height is required")
 		}
+
 		if len(cfg.TrustHash) == 0 {
 			return errors.New("trusted-hash is required")
 		}
+
 		_, err := hex.DecodeString(cfg.TrustHash)
 		if err != nil {
 			return fmt.Errorf("invalid trusted-hash: %w", err)
 		}
+
+		if cfg.ChunkRequestTimeout < time.Second {
+			return errors.New("chunk-request-timeout must be at least one second")
+		}
+
+		if cfg.ChunkFetchers <= 0 {
+			return errors.New("chunk-fetchers is required")
+		}
 	}

 	return nil
 }
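
For illustration, a minimal sketch of tuning and validating the two new fields. It assumes only the config package shown above; the RPC endpoints and trust parameters are placeholder values:

package main

import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/config"
)

func main() {
	// Defaults from DefaultStateSyncConfig: ChunkRequestTimeout 10s, ChunkFetchers 4.
	cfg := config.DefaultStateSyncConfig()

	// Tune the new knobs: re-request chunks sooner, fetch more in parallel.
	cfg.ChunkRequestTimeout = 5 * time.Second
	cfg.ChunkFetchers = 8

	// ValidateBasic only checks these fields when state sync is enabled.
	cfg.Enable = true
	cfg.RPCServers = []string{"http://node0:26657", "http://node1:26657"} // placeholders
	cfg.TrustHeight = 1000                                                // placeholder
	cfg.TrustHash = "deadbeef"                                            // placeholder; must be valid hex

	if err := cfg.ValidateBasic(); err != nil {
		fmt.Println("invalid state sync config:", err)
	}
}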


config/toml.go (+7 -0)

@@ -425,6 +425,13 @@ discovery-time = "{{ .StateSync.DiscoveryTime }}"
 # Will create a new, randomly named directory within, and remove it when done.
 temp-dir = "{{ .StateSync.TempDir }}"

+# The timeout duration before re-requesting a chunk, possibly from a different
+# peer (default: 10 seconds).
+chunk-request-timeout = "{{ .StateSync.ChunkRequestTimeout }}"
+
+# The number of concurrent chunk fetchers to run (default: 4).
+chunk-fetchers = "{{ .StateSync.ChunkFetchers }}"
+
 #######################################################
 ### Fast Sync Configuration Connections ###
 #######################################################
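
For reference, rendering the template above with the defaults from DefaultStateSyncConfig (10s, 4) would produce roughly the following in the generated config.toml; the exact surrounding layout depends on the rest of the template:

# The timeout duration before re-requesting a chunk, possibly from a different
# peer (default: 10 seconds).
chunk-request-timeout = "10s"

# The number of concurrent chunk fetchers to run (default: 4).
chunk-fetchers = "4"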


internal/statesync/reactor.go (+14 -1)

@@ -10,6 +10,7 @@ import (
 	"time"

 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/libs/log"

@@ -109,6 +110,7 @@ const (
 type Reactor struct {
 	service.BaseService

+	cfg config.StateSyncConfig

 	stateStore sm.Store
 	blockStore *store.BlockStore

@@ -134,6 +136,7 @@ type Reactor struct {
 // and querying, references to p2p Channels and a channel to listen for peer
 // updates on. Note, the reactor will close all p2p Channels when stopping.
 func NewReactor(
+	cfg config.StateSyncConfig,
 	logger log.Logger,
 	conn proxy.AppConnSnapshot,
 	connQuery proxy.AppConnQuery,

@@ -144,6 +147,7 @@ func NewReactor(
 	tempDir string,
 ) *Reactor {
 	r := &Reactor{
+		cfg:        cfg,
 		conn:       conn,
 		connQuery:  connQuery,
 		snapshotCh: snapshotCh,

@@ -217,7 +221,16 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 		return sm.State{}, errors.New("a state sync is already in progress")
 	}

-	r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir)
+	r.syncer = newSyncer(
+		r.cfg,
+		r.Logger,
+		r.conn,
+		r.connQuery,
+		stateProvider,
+		r.snapshotCh.Out,
+		r.chunkCh.Out,
+		r.tempDir,
+	)
 	r.mtx.Unlock()

 	hook := func() {


internal/statesync/reactor_test.go (+5 -0)

@@ -14,6 +14,7 @@ import (
 	dbm "github.com/tendermint/tm-db"

 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/internal/statesync/mocks"
 	"github.com/tendermint/tendermint/internal/test/factory"

@@ -121,7 +122,10 @@ func setup(
 	rts.stateStore = &smmocks.Store{}
 	rts.blockStore = store.NewBlockStore(dbm.NewMemDB())

+	cfg := config.DefaultStateSyncConfig()
+
 	rts.reactor = NewReactor(
+		*cfg,
 		log.TestingLogger(),
 		conn,
 		connQuery,

@@ -138,6 +142,7 @@ func setup(
 	rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second)

 	rts.syncer = newSyncer(
+		*cfg,
 		log.NewNopLogger(),
 		conn,
 		connQuery,


internal/statesync/syncer.go (+12 -6)

@@ -8,6 +8,7 @@ import (
 	"time"

 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/libs/log"

@@ -18,12 +19,9 @@ import (
 )

 const (
-	// chunkFetchers is the number of concurrent chunk fetchers to run.
-	chunkFetchers = 4
 	// chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
 	chunkTimeout = 2 * time.Minute
-	// requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
-	chunkRequestTimeout = 10 * time.Second

 	// minimumDiscoveryTime is the lowest allowable time for a
 	// SyncAny discovery time.
 	minimumDiscoveryTime = 5 * time.Second

@@ -52,6 +50,7 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
+	cfg           config.StateSyncConfig
 	logger        log.Logger
 	stateProvider StateProvider
 	conn          proxy.AppConnSnapshot

@@ -67,6 +66,7 @@ type syncer struct {
 // newSyncer creates a new syncer.
 func newSyncer(
+	cfg config.StateSyncConfig,
 	logger log.Logger,
 	conn proxy.AppConnSnapshot,
 	connQuery proxy.AppConnQuery,

@@ -75,6 +75,7 @@ func newSyncer(
 	tempDir string,
 ) *syncer {
 	return &syncer{
+		cfg:           cfg,
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,

@@ -256,7 +257,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context canceled.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	for i := int32(0); i < chunkFetchers; i++ {
+	for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
 		go s.fetchChunks(ctx, snapshot, chunks)
 	}

@@ -410,16 +411,21 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
 		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
 			"format", snapshot.Format, "chunk", index, "total", chunks.Size())

-		ticker := time.NewTicker(chunkRequestTimeout)
+		ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
 		defer ticker.Stop()

 		s.requestChunk(snapshot, index)

 		select {
 		case <-chunks.WaitFor(index):
 		case <-ticker.C:
 			s.requestChunk(snapshot, index)
 		case <-ctx.Done():
 			return
 		}

 		ticker.Stop()
 	}
 }
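
To make the re-request pattern concrete, here is a self-contained variant of the ticker-based retry idea from fetchChunks; unlike the loop above it keeps retrying a single chunk, and requestChunk and the chunk queue are stand-ins (a channel and a print), not the tendermint API:

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchChunk requests a chunk, then re-requests it on every timeout tick until
// the chunk arrives or the context is canceled.
func fetchChunk(ctx context.Context, index int, timeout time.Duration, arrived <-chan struct{}) {
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()

	request := func() { fmt.Println("requesting chunk", index) } // stand-in for s.requestChunk
	request()

	for {
		select {
		case <-arrived: // stand-in for chunks.WaitFor(index)
			return
		case <-ticker.C:
			request() // possibly served by a different peer this time
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	arrived := make(chan struct{}) // never closed here, so workers retry until ctx expires
	for i := 0; i < 4; i++ {       // four workers, matching the new ChunkFetchers default
		go fetchChunk(ctx, i, 10*time.Millisecond, arrived)
	}
	<-ctx.Done()
}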


node/node.go (+1 -0)

@@ -335,6 +335,7 @@ func makeNode(config *cfg.Config,
 	}

 	stateSyncReactor = statesync.NewReactor(
+		*config.StateSync,
 		stateSyncReactorShim.Logger,
 		proxyApp.Snapshot(),
 		proxyApp.Query(),

