From d36a5905a67db1ed7afb09f371b3ea3910afb6eb Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 21 Apr 2021 12:00:15 -0400 Subject: [PATCH] statesync: improve e2e test outcomes (#6378) In my testing, this change seems to help the e2e state-sync tests complete more reliably by fixing some potential range-related slice-building issues, as well as the way the test app hashes snapshots. Additionally (and I'm not sure if we want to do this), I added a hook to the reactor that re-sends the request for snapshots during the retry. This helps prevent systems from getting stuck in tests, but I think that in reality it might create more traffic, and operators would just restart a state-syncing node to get a similar effect. --- CHANGELOG_PENDING.md | 15 ++++++++------- config/config.go | 4 ++++ statesync/reactor.go | 16 ++++++++++------ statesync/snapshots.go | 4 ++-- statesync/syncer.go | 10 +++++++++- statesync/syncer_test.go | 14 +++++++------- test/e2e/app/snapshots.go | 11 ++++------- 7 files changed, 44 insertions(+), 30 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 020b87a23..9b1c4349e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -34,12 +34,12 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [store] \#5848 Remove block store state in favor of using the db iterators directly (@cmwaters) - [state] \#5864 Use an iterator when pruning state (@cmwaters) - [types] \#6023 Remove `tm2pb.Header`, `tm2pb.BlockID`, `tm2pb.PartSetHeader` and `tm2pb.NewValidatorUpdate`. - - Each of the above types has a `ToProto` and `FromProto` method or function which replaced this logic. + - Each of the above types has a `ToProto` and `FromProto` method or function which replaced this logic. - [light] \#6054 Move `MaxRetryAttempt` option from client to provider.
- - `NewWithOptions` now sets the max retry attempts and timeouts (@cmwaters) + - `NewWithOptions` now sets the max retry attempts and timeouts (@cmwaters) - [all] \#6077 Change spelling from British English to American (@cmwaters) - - Rename "Subscription.Cancelled()" to "Subscription.Canceled()" in libs/pubsub - - Rename "behaviour" pkg to "behavior" and internalized it in blockchain v2 + - Rename "Subscription.Cancelled()" to "Subscription.Canceled()" in libs/pubsub + - Rename "behaviour" pkg to "behavior" and internalized it in blockchain v2 - [rpc/client/http] \#6176 Remove `endpoint` arg from `New`, `NewWithTimeout` and `NewWithClient` (@melekes) - [rpc/client/http] \#6176 Unexpose `WSEvents` (@melekes) - [rpc/jsonrpc/client/ws_client] \#6176 `NewWS` no longer accepts options (use `NewWSWithOptions` and `OnReconnect` funcs to configure the client) (@melekes) @@ -76,11 +76,12 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [node] \#6059 Validate and complete genesis doc before saving to state store (@silasdavis) - [state] \#6067 Batch save state data (@githubsands & @cmwaters) - [crypto] \#6120 Implement batch verification interface for ed25519 and sr25519. (@marbar3778) -- [types] \#6120 use batch verification for verifying commits signatures. - - If the key type supports the batch verification API it will try to batch verify. If the verification fails we will single verify each signature. +- [types] \#6120 use batch verification for verifying commits signatures. + - If the key type supports the batch verification API it will try to batch verify. If the verification fails we will single verify each signature. - [privval/file] \#6185 Return error on `LoadFilePV`, `LoadFilePVEmptyState`. Allows for better programmatic control of Tendermint. -- [privval] \#6240 Add `context.Context` to privval interface. +- [privval] \#6240 Add `context.Context` to privval interface. 
- [rpc] \#6265 set cache control in http-rpc response header (@JayT106) +- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots. ### BUG FIXES diff --git a/config/config.go b/config/config.go index 4a3968c8c..5b26ad7f2 100644 --- a/config/config.go +++ b/config/config.go @@ -834,6 +834,10 @@ func (cfg *StateSyncConfig) ValidateBasic() error { return errors.New("found empty rpc-servers entry") } } + if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second { + return errors.New("discovery time must be 0s or greater than five seconds") + } + if cfg.TrustPeriod <= 0 { return errors.New("trusted-period is required") } diff --git a/statesync/reactor.go b/statesync/reactor.go index 7c48fe3d7..79da87c0f 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -477,14 +477,18 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir) r.mtx.Unlock() - // request snapshots from all currently connected peers - r.Logger.Debug("requesting snapshots from known peers") - r.snapshotCh.Out <- p2p.Envelope{ - Broadcast: true, - Message: &ssproto.SnapshotsRequest{}, + hook := func() { + // request snapshots from all currently connected peers + r.Logger.Debug("requesting snapshots from known peers") + r.snapshotCh.Out <- p2p.Envelope{ + Broadcast: true, + Message: &ssproto.SnapshotsRequest{}, + } } - state, commit, err := r.syncer.SyncAny(discoveryTime) + hook() + + state, commit, err := r.syncer.SyncAny(discoveryTime, hook) r.mtx.Lock() r.syncer = nil diff --git a/statesync/snapshots.go b/statesync/snapshots.go index 1dd3a6d34..3bed0fbf1 100644 --- a/statesync/snapshots.go +++ b/statesync/snapshots.go @@ -178,8 +178,8 @@ func (p *snapshotPool) Ranked() []*snapshot { defer p.Unlock() candidates := make([]*snapshot, 0, len(p.snapshots)) - for _, snapshot := range p.snapshots { - candidates 
= append(candidates, snapshot) + for key := range p.snapshots { + candidates = append(candidates, p.snapshots[key]) } sort.Slice(candidates, func(i, j int) bool { diff --git a/statesync/syncer.go b/statesync/syncer.go index 46799cde7..c5ab44f89 100644 --- a/statesync/syncer.go +++ b/statesync/syncer.go @@ -24,6 +24,9 @@ const ( chunkTimeout = 2 * time.Minute // requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer. chunkRequestTimeout = 10 * time.Second + // minimumDiscoveryTime is the lowest allowable time for a + // SyncAny discovery time. + minimumDiscoveryTime = 5 * time.Second ) var ( @@ -138,7 +141,11 @@ func (s *syncer) RemovePeer(peerID p2p.NodeID) { // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further // snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit // which the caller must use to bootstrap the node. -func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) { +func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) { + if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime { + discoveryTime = 5 * minimumDiscoveryTime + } + if discoveryTime > 0 { s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime)) time.Sleep(discoveryTime) @@ -161,6 +168,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, if discoveryTime == 0 { return sm.State{}, nil, errNoSnapshots } + retryHook() s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime)) time.Sleep(discoveryTime) continue diff --git a/statesync/syncer_test.go b/statesync/syncer_test.go index eb809c5cc..7d150c5d5 100644 --- a/statesync/syncer_test.go +++ b/statesync/syncer_test.go @@ -175,7 +175,7 @@ func TestSyncer_SyncAny(t *testing.T) { LastBlockAppHash: []byte("app_hash"), }, nil) - newState, lastCommit, err := rts.syncer.SyncAny(0) + 
newState, lastCommit, err := rts.syncer.SyncAny(0, func() {}) require.NoError(t, err) wg.Wait() @@ -201,7 +201,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) { rts := setup(t, nil, nil, stateProvider, 2) - _, _, err := rts.syncer.SyncAny(0) + _, _, err := rts.syncer.SyncAny(0, func() {}) require.Equal(t, errNoSnapshots, err) } @@ -221,7 +221,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) { Snapshot: toABCI(s), AppHash: []byte("app_hash"), }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil) - _, _, err = rts.syncer.SyncAny(0) + _, _, err = rts.syncer.SyncAny(0, func() {}) require.Equal(t, errAbort, err) rts.conn.AssertExpectations(t) } @@ -260,7 +260,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) { Snapshot: toABCI(s11), AppHash: []byte("app_hash"), }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) - _, _, err = rts.syncer.SyncAny(0) + _, _, err = rts.syncer.SyncAny(0, func() {}) require.Equal(t, errNoSnapshots, err) rts.conn.AssertExpectations(t) } @@ -295,7 +295,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) { Snapshot: toABCI(s11), AppHash: []byte("app_hash"), }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil) - _, _, err = rts.syncer.SyncAny(0) + _, _, err = rts.syncer.SyncAny(0, func() {}) require.Equal(t, errAbort, err) rts.conn.AssertExpectations(t) } @@ -341,7 +341,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) { Snapshot: toABCI(sa), AppHash: []byte("app_hash"), }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) - _, _, err = rts.syncer.SyncAny(0) + _, _, err = rts.syncer.SyncAny(0, func() {}) require.Equal(t, errNoSnapshots, err) rts.conn.AssertExpectations(t) } @@ -364,7 +364,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) { Snapshot: toABCI(s), AppHash: []byte("app_hash"), }).Once().Return(nil, errBoom) - _, _, err = rts.syncer.SyncAny(0) + _, _, err 
= rts.syncer.SyncAny(0, func() {}) require.True(t, errors.Is(err, errBoom)) rts.conn.AssertExpectations(t) } diff --git a/test/e2e/app/snapshots.go b/test/e2e/app/snapshots.go index 590b13cee..4ddb7ecdc 100644 --- a/test/e2e/app/snapshots.go +++ b/test/e2e/app/snapshots.go @@ -2,7 +2,6 @@ package main import ( - "crypto/sha256" "encoding/json" "errors" "fmt" @@ -88,11 +87,10 @@ func (s *SnapshotStore) Create(state *State) (abci.Snapshot, error) { if err != nil { return abci.Snapshot{}, err } - hash := sha256.Sum256(bz) snapshot := abci.Snapshot{ Height: state.Height, Format: 1, - Hash: hash[:], + Hash: hashItems(state.Values), Chunks: byteChunks(bz), } err = ioutil.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", state.Height)), bz, 0644) @@ -111,10 +109,9 @@ func (s *SnapshotStore) Create(state *State) (abci.Snapshot, error) { func (s *SnapshotStore) List() ([]*abci.Snapshot, error) { s.RLock() defer s.RUnlock() - snapshots := []*abci.Snapshot{} - for _, snapshot := range s.metadata { - s := snapshot // copy to avoid pointer to range variable - snapshots = append(snapshots, &s) + snapshots := make([]*abci.Snapshot, len(s.metadata)) + for idx := range s.metadata { + snapshots[idx] = &s.metadata[idx] } return snapshots, nil }