
statesync: make fetching chunks more robust (#6587)

Callum Waters, 3 years ago · committed by GitHub · parent commit 6e238b5b9d
7 changed files with 81 additions and 67 deletions
  1. CHANGELOG_PENDING.md (+1, -0)
  2. config/config.go (+7, -7)
  3. config/toml.go (+3, -3)
  4. internal/statesync/reactor.go (+11, -14)
  5. internal/statesync/syncer.go (+47, -31)
  6. internal/statesync/syncer_test.go (+11, -11)
  7. node/node.go (+1, -1)

CHANGELOG_PENDING.md (+1, -0)

@ -122,6 +122,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [crypto/merkle] \#6513 Optimize HashAlternatives (@marbar3778)
- [p2p/pex] \#6509 Improve addrBook.hash performance (@cuonglm)
- [consensus/metrics] \#6549 Change block_size gauge to a histogram for better observability over time (@marbar3778)
- [statesync] \#6587 Increase chunk priority and re-request chunks that don't arrive (@cmwaters)
### BUG FIXES


config/config.go (+7, -7)

@ -817,7 +817,7 @@ type StateSyncConfig struct {
TrustHash string `mapstructure:"trust-hash"`
DiscoveryTime time.Duration `mapstructure:"discovery-time"`
ChunkRequestTimeout time.Duration `mapstructure:"chunk-request-timeout"`
- ChunkFetchers int32 `mapstructure:"chunk-fetchers"`
+ Fetchers int32 `mapstructure:"fetchers"`
}
func (cfg *StateSyncConfig) TrustHashBytes() []byte {
@ -834,8 +834,8 @@ func DefaultStateSyncConfig() *StateSyncConfig {
return &StateSyncConfig{
TrustPeriod: 168 * time.Hour,
DiscoveryTime: 15 * time.Second,
- ChunkRequestTimeout: 10 * time.Second,
- ChunkFetchers: 4,
+ ChunkRequestTimeout: 15 * time.Second,
+ Fetchers: 4,
}
}
@ -882,12 +882,12 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
return fmt.Errorf("invalid trusted-hash: %w", err)
}
- if cfg.ChunkRequestTimeout < time.Second {
- return errors.New("chunk-request-timeout must be least a one second")
+ if cfg.ChunkRequestTimeout < 5*time.Second {
+ return errors.New("chunk-request-timeout must be at least 5 seconds")
}
- if cfg.ChunkFetchers <= 0 {
- return errors.New("chunk-fetchers is required")
+ if cfg.Fetchers <= 0 {
+ return errors.New("fetchers is required")
}
}
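For reference, a minimal sketch of how the tightened validation behaves after this change, assuming the exported helpers shown above (`DefaultStateSyncConfig`, `ValidateBasic`), the usual `Enable` gate on state-sync validation, and the standard `github.com/tendermint/tendermint/config` import path:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/config"
)

func main() {
	// Defaults after this change: Fetchers = 4, ChunkRequestTimeout = 15s.
	cfg := config.DefaultStateSyncConfig()
	cfg.Enable = true // validation checks only apply when state sync is enabled

	// Anything below the new 5-second floor is rejected by ValidateBasic.
	cfg.ChunkRequestTimeout = 2 * time.Second
	if err := cfg.ValidateBasic(); err != nil {
		fmt.Println(err) // chunk-request-timeout must be at least 5 seconds
	}
}
```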


config/toml.go (+3, -3)

@ -426,11 +426,11 @@ discovery-time = "{{ .StateSync.DiscoveryTime }}"
temp-dir = "{{ .StateSync.TempDir }}"
# The timeout duration before re-requesting a chunk, possibly from a different
- # peer (default: 1 minute).
+ # peer (default: 15 seconds).
chunk-request-timeout = "{{ .StateSync.ChunkRequestTimeout }}"
- # The number of concurrent chunk fetchers to run (default: 1).
- chunk-fetchers = "{{ .StateSync.ChunkFetchers }}"
+ # The number of concurrent chunk and block fetchers to run (default: 4).
+ fetchers = "{{ .StateSync.Fetchers }}"
#######################################################
### Fast Sync Configuration Connections ###


internal/statesync/reactor.go (+11, -14)

@ -38,7 +38,7 @@ var (
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(SnapshotChannel),
- Priority: 5,
+ Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
@ -49,7 +49,7 @@ var (
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(ChunkChannel),
- Priority: 1,
+ Priority: 3,
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
@ -60,7 +60,7 @@ var (
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(LightBlockChannel),
- Priority: 1,
+ Priority: 2,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
@ -99,10 +99,6 @@ const (
// maxLightBlockRequestRetries is the amount of retries acceptable before
// the backfill process aborts
maxLightBlockRequestRetries = 20
- // the amount of processes fetching light blocks - this should be roughly calculated
- // as the time to fetch a block / time to verify a block
- lightBlockFetchers = 4
)
// Reactor handles state sync, both restoring snapshots for the local node and
@ -214,7 +210,11 @@ func (r *Reactor) OnStop() {
// application. It also saves tendermint state and runs a backfill process to
// retrieve the necessary amount of headers, commits and validators sets to be
// able to process evidence and participate in consensus.
- func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, error) {
+ func (r *Reactor) Sync(
+ ctx context.Context,
+ stateProvider StateProvider,
+ discoveryTime time.Duration,
+ ) (sm.State, error) {
r.mtx.Lock()
if r.syncer != nil {
r.mtx.Unlock()
@ -233,18 +233,15 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
)
r.mtx.Unlock()
- hook := func() {
+ requestSnapshotsHook := func() {
// request snapshots from all currently connected peers
r.Logger.Debug("requesting snapshots from known peers")
r.snapshotCh.Out <- p2p.Envelope{
Broadcast: true,
Message: &ssproto.SnapshotsRequest{},
}
}
- hook()
- state, commit, err := r.syncer.SyncAny(discoveryTime, hook)
+ state, commit, err := r.syncer.SyncAny(ctx, discoveryTime, requestSnapshotsHook)
if err != nil {
return sm.State{}, err
}
@ -312,7 +309,7 @@ func (r *Reactor) backfill(
// time. Ideally we want the verification process to never have to be
// waiting on blocks. If it takes 4s to retrieve a block and 1s to verify
// it, then steady state involves four workers.
- for i := 0; i < lightBlockFetchers; i++ {
+ for i := 0; i < int(r.cfg.Fetchers); i++ {
go func() {
for {
select {
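The 4s-to-fetch / 1s-to-verify arithmetic in the comment above is what sizes the shared fetchers pool at four. A rough, self-contained sketch of that pipeline shape (times scaled down to milliseconds; `heights`, the sleeps and the worker layout are illustrative, not the reactor's actual backfill code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// With fetch ≈ 4 units and verify ≈ 1 unit, roughly fetch/verify = 4 workers
// keep the single verifier from ever waiting on a block.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Millisecond)
	defer cancel()

	const fetchers = 4
	heights := make(chan int64, 16)
	fetched := make(chan int64, fetchers)

	for h := int64(1); h <= 8; h++ {
		heights <- h
	}

	// Spawn fetchers; they terminate when the context is canceled.
	for i := 0; i < fetchers; i++ {
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case h := <-heights:
					time.Sleep(400 * time.Millisecond) // stand-in for fetching a light block
					fetched <- h
				}
			}
		}()
	}

	// Single verifier consuming the fetched blocks.
	for {
		select {
		case <-ctx.Done():
			return
		case h := <-fetched:
			time.Sleep(100 * time.Millisecond) // stand-in for verification
			fmt.Println("verified", h)
		}
	}
}
```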


internal/statesync/syncer.go (+47, -31)

@ -50,7 +50,6 @@ var (
// sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
// snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
type syncer struct {
- cfg config.StateSyncConfig
logger log.Logger
stateProvider StateProvider
conn proxy.AppConnSnapshot
@ -59,6 +58,8 @@ type syncer struct {
snapshotCh chan<- p2p.Envelope
chunkCh chan<- p2p.Envelope
tempDir string
+ fetchers int32
+ retryTimeout time.Duration
mtx tmsync.RWMutex
chunks *chunkQueue
@ -75,7 +76,6 @@ func newSyncer(
tempDir string,
) *syncer {
return &syncer{
- cfg: cfg,
logger: logger,
stateProvider: stateProvider,
conn: conn,
@ -84,6 +84,8 @@ func newSyncer(
snapshotCh: snapshotCh,
chunkCh: chunkCh,
tempDir: tempDir,
+ fetchers: cfg.Fetchers,
+ retryTimeout: cfg.ChunkRequestTimeout,
}
}
@ -142,12 +144,18 @@ func (s *syncer) RemovePeer(peerID p2p.NodeID) {
// SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
// snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
// which the caller must use to bootstrap the node.
- func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) {
+ func (s *syncer) SyncAny(
+ ctx context.Context,
+ discoveryTime time.Duration,
+ requestSnapshots func(),
+ ) (sm.State, *types.Commit, error) {
if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
discoveryTime = minimumDiscoveryTime
}
if discoveryTime > 0 {
+ requestSnapshots()
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
}
@ -169,7 +177,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.Stat
if discoveryTime == 0 {
return sm.State{}, nil, errNoSnapshots
}
- retryHook()
+ requestSnapshots()
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
continue
@ -182,7 +190,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.Stat
defer chunks.Close() // in case we forget to close it elsewhere
}
- newState, commit, err := s.Sync(snapshot, chunks)
+ newState, commit, err := s.Sync(ctx, snapshot, chunks)
switch {
case err == nil:
return newState, commit, nil
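With this change the snapshot request hook runs before the initial discovery wait as well as on every empty retry. A small sketch of that loop shape under stated assumptions (the pool, hook, and `errNoSnapshots` stand-ins below are illustrative, not the real syncer types):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNoSnapshots = errors.New("no suitable snapshots found")

// syncAnySketch mirrors the discovery loop shape: ask peers for snapshots up
// front, then again on every retry while the pool stays empty.
func syncAnySketch(discoveryTime time.Duration, requestSnapshots func(), pick func() error) error {
	if discoveryTime > 0 {
		requestSnapshots()
		time.Sleep(discoveryTime)
	}
	for {
		err := pick()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errNoSnapshots) || discoveryTime == 0 {
			return err
		}
		requestSnapshots()
		time.Sleep(discoveryTime)
	}
}

func main() {
	tries := 0
	err := syncAnySketch(10*time.Millisecond,
		func() { fmt.Println("requesting snapshots from peers") },
		func() error {
			tries++
			if tries < 3 {
				return errNoSnapshots // pool still empty, keep discovering
			}
			return nil
		})
	fmt.Println("done:", err)
}
```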
@ -234,7 +242,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.Stat
// Sync executes a sync for a specific snapshot, returning the latest state and block commit which
// the caller must use to bootstrap the node.
- func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
+ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
s.mtx.Lock()
if s.chunks != nil {
s.mtx.Unlock()
@ -249,19 +257,19 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
}()
// Offer snapshot to ABCI app.
- err := s.offerSnapshot(snapshot)
+ err := s.offerSnapshot(ctx, snapshot)
if err != nil {
return sm.State{}, nil, err
}
// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context canceled.
- ctx, cancel := context.WithCancel(context.Background())
+ fetchCtx, cancel := context.WithCancel(ctx)
defer cancel()
- for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
- go s.fetchChunks(ctx, snapshot, chunks)
+ for i := int32(0); i < s.fetchers; i++ {
+ go s.fetchChunks(fetchCtx, snapshot, chunks)
}
- pctx, pcancel := context.WithTimeout(context.Background(), 10*time.Second)
+ pctx, pcancel := context.WithTimeout(ctx, 10*time.Second)
defer pcancel()
// Optimistically build new state, so we don't discover any light client failures at the end.
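Because the fetcher context is now derived from the caller's context rather than `context.Background()`, canceling the caller tears down the chunk fetchers and the bounded state-fetch context together. A tiny illustration of that parent/child wiring (not tendermint code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Caller-supplied context, as now passed into Sync.
	parent, cancel := context.WithCancel(context.Background())

	// Child context for the fetcher goroutines: done when the work finishes
	// or when the parent is canceled, whichever comes first.
	fetchCtx, stopFetchers := context.WithCancel(parent)
	defer stopFetchers()

	// Separate bounded context for fetching state, mirroring the 10s pctx.
	pctx, pcancel := context.WithTimeout(parent, 10*time.Second)
	defer pcancel()

	go func() {
		<-fetchCtx.Done()
		fmt.Println("fetcher stopped:", fetchCtx.Err())
	}()

	cancel() // canceling the caller's context tears everything down
	<-pctx.Done()
	time.Sleep(100 * time.Millisecond) // let the goroutine print before exit
}
```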
@ -275,7 +283,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
}
// Restore snapshot
- err = s.applyChunks(chunks)
+ err = s.applyChunks(ctx, chunks)
if err != nil {
return sm.State{}, nil, err
}
@ -296,10 +304,10 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
// offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
// response, or nil if the snapshot was accepted.
- func (s *syncer) offerSnapshot(snapshot *snapshot) error {
+ func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error {
s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
"format", snapshot.Format, "hash", snapshot.Hash)
- resp, err := s.conn.OfferSnapshotSync(context.Background(), abci.RequestOfferSnapshot{
+ resp, err := s.conn.OfferSnapshotSync(ctx, abci.RequestOfferSnapshot{
Snapshot: &abci.Snapshot{
Height: snapshot.Height,
Format: snapshot.Format,
@ -332,7 +340,7 @@ func (s *syncer) offerSnapshot(snapshot *snapshot) error {
// applyChunks applies chunks to the app. It returns various errors depending on the app's
// response, or nil once the snapshot is fully restored.
- func (s *syncer) applyChunks(chunks *chunkQueue) error {
+ func (s *syncer) applyChunks(ctx context.Context, chunks *chunkQueue) error {
for {
chunk, err := chunks.Next()
if err == errDone {
@ -341,7 +349,7 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
return fmt.Errorf("failed to fetch chunk: %w", err)
}
- resp, err := s.conn.ApplySnapshotChunkSync(context.Background(), abci.RequestApplySnapshotChunk{
+ resp, err := s.conn.ApplySnapshotChunkSync(ctx, abci.RequestApplySnapshotChunk{
Index: chunk.Index,
Chunk: chunk.Chunk,
Sender: string(chunk.Sender),
@ -391,36 +399,44 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
// fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
// will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
+ var (
+ next = true
+ index uint32
+ err error
+ )
for {
- index, err := chunks.Allocate()
- if err == errDone {
- // Keep checking until the context is canceled (restore is done), in case any
- // chunks need to be refetched.
- select {
- case <-ctx.Done():
+ if next {
+ index, err = chunks.Allocate()
+ if errors.Is(err, errDone) {
+ // Keep checking until the context is canceled (restore is done), in case any
+ // chunks need to be refetched.
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(2 * time.Second):
+ continue
+ }
+ }
+ if err != nil {
+ s.logger.Error("Failed to allocate chunk from queue", "err", err)
return
- default:
}
- time.Sleep(2 * time.Second)
- continue
- }
- if err != nil {
- s.logger.Error("Failed to allocate chunk from queue", "err", err)
- return
- }
s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
"format", snapshot.Format, "chunk", index, "total", chunks.Size())
- ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
+ ticker := time.NewTicker(s.retryTimeout)
defer ticker.Stop()
s.requestChunk(snapshot, index)
select {
case <-chunks.WaitFor(index):
+ next = true
case <-ticker.C:
- s.requestChunk(snapshot, index)
+ next = false
case <-ctx.Done():
return
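The loop above is the heart of the fix: a chunk that has not arrived within `retryTimeout` is re-requested, possibly from a different peer, without being re-allocated (the `next` flag). A stripped-down sketch of the same request/wait/re-request shape, with a simulated chunk arrival standing in for the real chunkQueue:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchOne mimics one chunk's lifecycle: request it, then either observe it
// arrive or re-request the same index whenever the retry timer fires.
func fetchOne(ctx context.Context, index uint32, retryTimeout time.Duration,
	request func(uint32), arrived <-chan struct{}) bool {

	ticker := time.NewTicker(retryTimeout)
	defer ticker.Stop()

	request(index)
	for {
		select {
		case <-arrived:
			return true // chunk received, move on (next = true)
		case <-ticker.C:
			request(index) // re-request the same chunk (next = false)
		case <-ctx.Done():
			return false
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	arrived := make(chan struct{})
	requests := 0
	go func() {
		time.Sleep(80 * time.Millisecond) // the chunk arrives only after one re-request
		close(arrived)
	}()

	ok := fetchOne(ctx, 0, 50*time.Millisecond, func(i uint32) {
		requests++
		fmt.Printf("requesting chunk %d (attempt %d)\n", i, requests)
	}, arrived)
	fmt.Println("chunk received:", ok)
}
```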


internal/statesync/syncer_test.go (+11, -11)

@ -180,7 +180,7 @@ func TestSyncer_SyncAny(t *testing.T) {
LastBlockAppHash: []byte("app_hash"),
}, nil)
- newState, lastCommit, err := rts.syncer.SyncAny(0, func() {})
+ newState, lastCommit, err := rts.syncer.SyncAny(ctx, 0, func() {})
require.NoError(t, err)
wg.Wait()
@ -206,7 +206,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
rts := setup(t, nil, nil, stateProvider, 2)
- _, _, err := rts.syncer.SyncAny(0, func() {})
+ _, _, err := rts.syncer.SyncAny(ctx, 0, func() {})
require.Equal(t, errNoSnapshots, err)
}
@ -226,7 +226,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
- _, _, err = rts.syncer.SyncAny(0, func() {})
+ _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
require.Equal(t, errAbort, err)
rts.conn.AssertExpectations(t)
}
@ -265,7 +265,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
- _, _, err = rts.syncer.SyncAny(0, func() {})
+ _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
require.Equal(t, errNoSnapshots, err)
rts.conn.AssertExpectations(t)
}
@ -300,7 +300,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
- _, _, err = rts.syncer.SyncAny(0, func() {})
+ _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
require.Equal(t, errAbort, err)
rts.conn.AssertExpectations(t)
}
@ -346,7 +346,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
- _, _, err = rts.syncer.SyncAny(0, func() {})
+ _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
require.Equal(t, errNoSnapshots, err)
rts.conn.AssertExpectations(t)
}
@ -369,7 +369,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
}).Once().Return(nil, errBoom)
- _, _, err = rts.syncer.SyncAny(0, func() {})
+ _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
require.True(t, errors.Is(err, errBoom))
rts.conn.AssertExpectations(t)
}
@ -406,7 +406,7 @@ func TestSyncer_offerSnapshot(t *testing.T) {
AppHash: []byte("app_hash"),
}).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
- err := rts.syncer.offerSnapshot(s)
+ err := rts.syncer.offerSnapshot(ctx, s)
if tc.expectErr == unknownErr {
require.Error(t, err)
} else {
@ -462,7 +462,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
}
- err = rts.syncer.applyChunks(chunks)
+ err = rts.syncer.applyChunks(ctx, chunks)
if tc.expectErr == unknownErr {
require.Error(t, err)
} else {
@ -527,7 +527,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
// check the queue contents, and finally close the queue to end the goroutine.
// We don't really care about the result of applyChunks, since it has separate test.
go func() {
- rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
+ rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
}()
time.Sleep(50 * time.Millisecond)
@ -626,7 +626,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
// However, it will block on e.g. retry result, so we spawn a goroutine that will
// be shut down when the chunk queue closes.
go func() {
- rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
+ rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
}()
time.Sleep(50 * time.Millisecond)


node/node.go (+1, -1)

@ -1055,7 +1055,7 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto
}
go func() {
- state, err := ssR.Sync(stateProvider, config.DiscoveryTime)
+ state, err := ssR.Sync(context.TODO(), stateProvider, config.DiscoveryTime)
if err != nil {
ssR.Logger.Error("state sync failed", "err", err)
return

