@@ -50,13 +50,14 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
 	cfg           config.StateSyncConfig
 	logger        log.Logger
 	stateProvider StateProvider
 	conn          proxy.AppConnSnapshot
 	connQuery     proxy.AppConnQuery
 	snapshots     *snapshotPool
 	tempDir       string
 	chunkFetchers int32
+	retryTimeout  time.Duration
 
 	mtx    tmsync.RWMutex
 	chunks *chunkQueue
@@ -73,13 +74,14 @@ func newSyncer(
 ) *syncer {
 
 	return &syncer{
 		cfg:           cfg,
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,
 		connQuery:     connQuery,
 		snapshots:     newSnapshotPool(stateProvider),
 		tempDir:       tempDir,
 		chunkFetchers: cfg.ChunkFetchers,
+		retryTimeout:  cfg.ChunkRequestTimeout,
 	}
 }
@@ -250,7 +252,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
+	for i := int32(0); i < s.chunkFetchers; i++ {
 		go s.fetchChunks(ctx, snapshot, chunks)
 	}
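
As an aside, the loop in this hunk fans a configurable number of fetcher goroutines out over a shared context, and each one exits when that context is canceled. A minimal, self-contained sketch of the pattern follows; the names are hypothetical and it is not the Tendermint code itself:

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchLoop stands in for syncer.fetchChunks: it does periodic work until
// the shared context is canceled.
func fetchLoop(ctx context.Context, id int32) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("fetcher %d: stopping\n", id)
			return
		case <-time.After(50 * time.Millisecond):
			fmt.Printf("fetcher %d: fetching\n", id)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Spawn a fixed number of fetchers, as Sync() does with s.chunkFetchers.
	const chunkFetchers = int32(4)
	for i := int32(0); i < chunkFetchers; i++ {
		go fetchLoop(ctx, i)
	}

	time.Sleep(120 * time.Millisecond)
	cancel()                          // every fetcher observes ctx.Done() and returns
	time.Sleep(50 * time.Millisecond) // give them a moment to print "stopping"
}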
@@ -383,36 +385,45 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
 // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
 // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
 func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
+	var (
+		next  = true
+		index uint32
+		err   error
+	)
 	for {
-		index, err := chunks.Allocate()
-		if err == errDone {
-			// Keep checking until the context is cancelled (restore is done), in case any
-			// chunks need to be refetched.
-			select {
-			case <-ctx.Done():
+		if next {
+			index, err = chunks.Allocate()
+			if errors.Is(err, errDone) {
+				// Keep checking until the context is canceled (restore is done), in case any
+				// chunks need to be refetched.
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				time.Sleep(2 * time.Second)
+				continue
+			}
+			if err != nil {
+				s.logger.Error("Failed to allocate chunk from queue", "err", err)
 				return
-			default:
 			}
-			time.Sleep(2 * time.Second)
-			continue
-		}
-		if err != nil {
-			s.logger.Error("Failed to allocate chunk from queue", "err", err)
-			return
 		}
 		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
 			"format", snapshot.Format, "chunk", index, "total", chunks.Size())
 
-		ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
+		ticker := time.NewTicker(s.retryTimeout)
 		defer ticker.Stop()
 
 		s.requestChunk(snapshot, index)
 
 		select {
 		case <-chunks.WaitFor(index):
+			next = true
 
 		case <-ticker.C:
 			s.requestChunk(snapshot, index)
+			next = false
 
 		case <-ctx.Done():
 			return
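
The behavioral fix in this last hunk is the next flag: when the retry ticker fires, the loop now re-requests the same index instead of falling through and allocating a new one, so a chunk that never arrives keeps being refetched until restore completes. Below is a minimal sketch of that control flow with the chunk queue and peer request simulated; chunks.Allocate, s.requestChunk, and chunks.WaitFor are stubbed out, and only the loop shape mirrors the diff:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// An overall deadline plays the role of the restore context.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Simulate the only chunk arriving after two retry intervals.
	arrived := make(chan struct{})
	go func() {
		time.Sleep(700 * time.Millisecond)
		close(arrived)
	}()

	var (
		next  = true
		index uint32
	)
	retryTimeout := 250 * time.Millisecond // stands in for s.retryTimeout

	for {
		if next {
			// The real code calls chunks.Allocate() here; this sketch only
			// ever works on chunk 0.
			fmt.Println("allocated chunk", index)
		}
		fmt.Println("requesting chunk", index) // stands in for s.requestChunk

		ticker := time.NewTicker(retryTimeout)
		select {
		case <-arrived: // stands in for <-chunks.WaitFor(index)
			fmt.Println("chunk", index, "arrived")
			next = true
		case <-ticker.C:
			next = false // keep the same index and re-request it
		case <-ctx.Done():
			ticker.Stop()
			return
		}
		ticker.Stop()

		if next {
			return // sketch ends once its single chunk has arrived
		}
	}
}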