
statesync: improve e2e test outcomes (#6378)

In my testing, this seems to help the e2e state-sync tests complete more reliably: it fixes some potentially unsafe, range-related slice building, as well as the way the test app hashes snapshots.

Additionally (and I'm not sure we want to keep this), I added a hook to the reactor that re-sends the request for snapshots on each discovery retry. In tests this keeps nodes from getting stuck, but in practice it may create more traffic, and operators would likely just restart a state-syncing node to get a similar effect.
Sam Kleinman, 4 years ago, committed by GitHub
commit d36a5905a6
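For context, the "range-related slice building" mentioned above is the classic Go pitfall (under the loop semantics of Go 1.21 and earlier) of retaining a pointer tied to the loop variable; the e2e snapshot store's `List()` below previously worked around it with a per-iteration copy and now indexes the backing slice instead. A minimal, self-contained illustration with hypothetical data:

```go
package main

import "fmt"

type snapshot struct{ Height uint64 }

func main() {
	metadata := []snapshot{{Height: 1}, {Height: 2}, {Height: 3}}

	// Pitfall (before Go 1.22): &s aliases the single loop variable, so
	// every stored pointer refers to the copy from the final iteration.
	// Under Go 1.21 and earlier this prints "3 3 3".
	bad := []*snapshot{}
	for _, s := range metadata {
		bad = append(bad, &s)
	}

	// Fix used in the diff below: index the backing slice so each pointer
	// refers to a distinct element. Prints "1 2 3" on any Go version.
	good := make([]*snapshot, len(metadata))
	for idx := range metadata {
		good[idx] = &metadata[idx]
	}

	fmt.Println(bad[0].Height, bad[1].Height, bad[2].Height)
	fmt.Println(good[0].Height, good[1].Height, good[2].Height)
}
```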
7 changed files with 44 additions and 30 deletions:

1. CHANGELOG_PENDING.md (+8, -7)
2. config/config.go (+4, -0)
3. statesync/reactor.go (+10, -6)
4. statesync/snapshots.go (+2, -2)
5. statesync/syncer.go (+9, -1)
6. statesync/syncer_test.go (+7, -7)
7. test/e2e/app/snapshots.go (+4, -7)

CHANGELOG_PENDING.md (+8, -7)

@@ -34,12 +34,12 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 - [store] \#5848 Remove block store state in favor of using the db iterators directly (@cmwaters)
 - [state] \#5864 Use an iterator when pruning state (@cmwaters)
 - [types] \#6023 Remove `tm2pb.Header`, `tm2pb.BlockID`, `tm2pb.PartSetHeader` and `tm2pb.NewValidatorUpdate`.
-    - Each of the above types has a `ToProto` and `FromProto` method or function which replaced this logic.
+  - Each of the above types has a `ToProto` and `FromProto` method or function which replaced this logic.
 - [light] \#6054 Move `MaxRetryAttempt` option from client to provider.
-    - `NewWithOptions` now sets the max retry attempts and timeouts (@cmwaters)
+  - `NewWithOptions` now sets the max retry attempts and timeouts (@cmwaters)
 - [all] \#6077 Change spelling from British English to American (@cmwaters)
-    - Rename "Subscription.Cancelled()" to "Subscription.Canceled()" in libs/pubsub
-    - Rename "behaviour" pkg to "behavior" and internalized it in blockchain v2
+  - Rename "Subscription.Cancelled()" to "Subscription.Canceled()" in libs/pubsub
+  - Rename "behaviour" pkg to "behavior" and internalized it in blockchain v2
 - [rpc/client/http] \#6176 Remove `endpoint` arg from `New`, `NewWithTimeout` and `NewWithClient` (@melekes)
 - [rpc/client/http] \#6176 Unexpose `WSEvents` (@melekes)
 - [rpc/jsonrpc/client/ws_client] \#6176 `NewWS` no longer accepts options (use `NewWSWithOptions` and `OnReconnect` funcs to configure the client) (@melekes)
@@ -76,11 +76,12 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 - [node] \#6059 Validate and complete genesis doc before saving to state store (@silasdavis)
 - [state] \#6067 Batch save state data (@githubsands & @cmwaters)
 - [crypto] \#6120 Implement batch verification interface for ed25519 and sr25519. (@marbar3778)
-    - [types] \#6120 use batch verification for verifying commits signatures.
-        - If the key type supports the batch verification API it will try to batch verify. If the verification fails we will single verify each signature.
+- [types] \#6120 use batch verification for verifying commits signatures.
+  - If the key type supports the batch verification API it will try to batch verify. If the verification fails we will single verify each signature.
 - [privval/file] \#6185 Return error on `LoadFilePV`, `LoadFilePVEmptyState`. Allows for better programmatic control of Tendermint.
-    - [privval] \#6240 Add `context.Context` to privval interface.
+- [privval] \#6240 Add `context.Context` to privval interface.
 - [rpc] \#6265 set cache control in http-rpc response header (@JayT106)
+- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots.

 ### BUG FIXES


config/config.go (+4, -0)

@@ -834,6 +834,10 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 			return errors.New("found empty rpc-servers entry")
 		}
 	}
+	if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second {
+		return errors.New("discovery time must be 0s or greater than five seconds")
+	}
+
 	if cfg.TrustPeriod <= 0 {
 		return errors.New("trusted-period is required")
 	}
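The new check rejects any non-zero discovery time below five seconds, while zero (discovery disabled) and exactly five seconds both pass. A minimal sketch of the boundary behavior, with the config type pared down to the one field the check exercises:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// stateSyncConfig mirrors just the field exercised by the new validation;
// the real StateSyncConfig lives in config/config.go.
type stateSyncConfig struct {
	DiscoveryTime time.Duration
}

// validateDiscoveryTime reproduces the check added in ValidateBasic above.
func (cfg *stateSyncConfig) validateDiscoveryTime() error {
	if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second {
		return errors.New("discovery time must be 0s or greater than five seconds")
	}
	return nil
}

func main() {
	for _, d := range []time.Duration{
		0,               // valid: discovery disabled
		3 * time.Second, // invalid: non-zero but below the minimum
		5 * time.Second, // valid: exactly five seconds passes the < comparison
	} {
		cfg := &stateSyncConfig{DiscoveryTime: d}
		fmt.Printf("%v: %v\n", d, cfg.validateDiscoveryTime())
	}
}
```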


statesync/reactor.go (+10, -6)

@@ -477,14 +477,18 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 	r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir)
 	r.mtx.Unlock()

-	// request snapshots from all currently connected peers
-	r.Logger.Debug("requesting snapshots from known peers")
-	r.snapshotCh.Out <- p2p.Envelope{
-		Broadcast: true,
-		Message:   &ssproto.SnapshotsRequest{},
+	hook := func() {
+		// request snapshots from all currently connected peers
+		r.Logger.Debug("requesting snapshots from known peers")
+		r.snapshotCh.Out <- p2p.Envelope{
+			Broadcast: true,
+			Message:   &ssproto.SnapshotsRequest{},
+		}
 	}

-	state, commit, err := r.syncer.SyncAny(discoveryTime)
+	hook()
+
+	state, commit, err := r.syncer.SyncAny(discoveryTime, hook)

 	r.mtx.Lock()
 	r.syncer = nil


statesync/snapshots.go (+2, -2)

@@ -178,8 +178,8 @@ func (p *snapshotPool) Ranked() []*snapshot {
 	defer p.Unlock()

 	candidates := make([]*snapshot, 0, len(p.snapshots))
-	for _, snapshot := range p.snapshots {
-		candidates = append(candidates, snapshot)
+	for key := range p.snapshots {
+		candidates = append(candidates, p.snapshots[key])
 	}

 	sort.Slice(candidates, func(i, j int) bool {
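For a map of pointers, both loop forms yield the same pointers; iterating by key simply avoids binding each element to a shared loop variable. Determinism comes from the sort, since Go deliberately randomizes map iteration order. A self-contained sketch of the pattern, with the snapshot type simplified to one field:

```go
package main

import (
	"fmt"
	"sort"
)

type snapshot struct{ Height uint64 }

func main() {
	// A pool keyed like snapshotPool.snapshots: ranging over a Go map
	// yields keys in a randomized order on every run.
	pool := map[string]*snapshot{
		"a": {Height: 10},
		"b": {Height: 30},
		"c": {Height: 20},
	}

	// Build the candidate slice by key, as the patched Ranked() does.
	candidates := make([]*snapshot, 0, len(pool))
	for key := range pool {
		candidates = append(candidates, pool[key])
	}

	// The sort, not the loop form, is what makes the result deterministic.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].Height > candidates[j].Height
	})

	for _, c := range candidates {
		fmt.Println(c.Height) // 30, 20, 10 on every run
	}
}
```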


statesync/syncer.go (+9, -1)

@@ -24,6 +24,9 @@ const (
 	chunkTimeout = 2 * time.Minute
 	// requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
 	chunkRequestTimeout = 10 * time.Second
+	// minimumDiscoveryTime is the lowest allowable time for a
+	// SyncAny discovery time.
+	minimumDiscoveryTime = 5 * time.Second
 )

 var (
@@ -138,7 +141,11 @@ func (s *syncer) RemovePeer(peerID p2p.NodeID) {
 // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
 // snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
 // which the caller must use to bootstrap the node.
-func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) {
+func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) {
+	if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
+		discoveryTime = 5 * minimumDiscoveryTime
+	}
+
 	if discoveryTime > 0 {
 		s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
 		time.Sleep(discoveryTime)
@@ -161,6 +168,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit,
 		if discoveryTime == 0 {
 			return sm.State{}, nil, errNoSnapshots
 		}
+		retryHook()
 		s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
 		time.Sleep(discoveryTime)
 		continue
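Note that, as written, a sub-minimum discovery time is raised to `5 * minimumDiscoveryTime` (25s), not to the minimum itself. A compressed, self-contained sketch of the new control flow, with the constant scaled down and the retry loop bounded so the demo finishes quickly (the real SyncAny keeps retrying until a snapshot is applied):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Scaled down from 5 * time.Second in the real code so the demo runs fast.
const minimumDiscoveryTime = 5 * time.Millisecond

var errNoSnapshots = errors.New("no suitable snapshots found")

// syncAny mirrors the patched SyncAny's discovery loop: clamp the window,
// then fire the caller-supplied retryHook before each re-discovery pass.
func syncAny(discoveryTime time.Duration, retryHook func(), trySnapshot func() bool) error {
	if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
		discoveryTime = 5 * minimumDiscoveryTime // as in the diff: five times the minimum
	}
	for attempt := 0; attempt < 3; attempt++ { // bounded here; the real loop is unbounded
		if trySnapshot() {
			return nil // a snapshot was offered, fetched, and applied
		}
		if discoveryTime == 0 {
			return errNoSnapshots
		}
		retryHook() // e.g. re-broadcast a SnapshotsRequest to all peers
		fmt.Printf("discovering snapshots for %v\n", discoveryTime)
		time.Sleep(discoveryTime)
	}
	return errNoSnapshots
}

func main() {
	retries := 0
	err := syncAny(time.Millisecond, func() { retries++ }, func() bool { return false })
	fmt.Println(err, "after", retries, "retry broadcasts")
}
```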


statesync/syncer_test.go (+7, -7)

@@ -175,7 +175,7 @@ func TestSyncer_SyncAny(t *testing.T) {
 		LastBlockAppHash: []byte("app_hash"),
 	}, nil)

-	newState, lastCommit, err := rts.syncer.SyncAny(0)
+	newState, lastCommit, err := rts.syncer.SyncAny(0, func() {})
 	require.NoError(t, err)

 	wg.Wait()
@@ -201,7 +201,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
 	rts := setup(t, nil, nil, stateProvider, 2)

-	_, _, err := rts.syncer.SyncAny(0)
+	_, _, err := rts.syncer.SyncAny(0, func() {})
 	require.Equal(t, errNoSnapshots, err)
 }
@@ -221,7 +221,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
 		Snapshot: toABCI(s), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

-	_, _, err = rts.syncer.SyncAny(0)
+	_, _, err = rts.syncer.SyncAny(0, func() {})
 	require.Equal(t, errAbort, err)

 	rts.conn.AssertExpectations(t)
 }
@@ -260,7 +260,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
 		Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

-	_, _, err = rts.syncer.SyncAny(0)
+	_, _, err = rts.syncer.SyncAny(0, func() {})
 	require.Equal(t, errNoSnapshots, err)

 	rts.conn.AssertExpectations(t)
 }
@@ -295,7 +295,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
 		Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

-	_, _, err = rts.syncer.SyncAny(0)
+	_, _, err = rts.syncer.SyncAny(0, func() {})
 	require.Equal(t, errAbort, err)

 	rts.conn.AssertExpectations(t)
 }
@@ -341,7 +341,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
 		Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
 	}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

-	_, _, err = rts.syncer.SyncAny(0)
+	_, _, err = rts.syncer.SyncAny(0, func() {})
 	require.Equal(t, errNoSnapshots, err)

 	rts.conn.AssertExpectations(t)
 }
@@ -364,7 +364,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
 		Snapshot: toABCI(s), AppHash: []byte("app_hash"),
 	}).Once().Return(nil, errBoom)

-	_, _, err = rts.syncer.SyncAny(0)
+	_, _, err = rts.syncer.SyncAny(0, func() {})
 	require.True(t, errors.Is(err, errBoom))

 	rts.conn.AssertExpectations(t)
 }


test/e2e/app/snapshots.go (+4, -7)

@@ -2,7 +2,6 @@
 package main

 import (
-	"crypto/sha256"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -88,11 +87,10 @@ func (s *SnapshotStore) Create(state *State) (abci.Snapshot, error) {
 	if err != nil {
 		return abci.Snapshot{}, err
 	}
-	hash := sha256.Sum256(bz)
 	snapshot := abci.Snapshot{
 		Height: state.Height,
 		Format: 1,
-		Hash:   hash[:],
+		Hash:   hashItems(state.Values),
 		Chunks: byteChunks(bz),
 	}
 	err = ioutil.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", state.Height)), bz, 0644)
@@ -111,10 +109,9 @@ func (s *SnapshotStore) Create(state *State) (abci.Snapshot, error) {
 func (s *SnapshotStore) List() ([]*abci.Snapshot, error) {
 	s.RLock()
 	defer s.RUnlock()
-	snapshots := []*abci.Snapshot{}
-	for _, snapshot := range s.metadata {
-		s := snapshot // copy to avoid pointer to range variable
-		snapshots = append(snapshots, &s)
+	snapshots := make([]*abci.Snapshot, len(s.metadata))
+	for idx := range s.metadata {
+		snapshots[idx] = &s.metadata[idx]
 	}
 	return snapshots, nil
 }
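The new `Hash` comes from a `hashItems` helper defined elsewhere in the e2e app; hashing the application's key/value items directly, rather than the JSON snapshot bytes, lets the snapshot hash line up with the app's own state hash independent of encoding details. A sketch of what such a helper plausibly looks like (the sorted-key, NUL-separated scheme here is an assumption, not the verbatim implementation):

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// hashItems sketches a deterministic hash over a key/value map: sort the
// keys, then feed each key and value (NUL-separated) into SHA-256. Sorting
// is required because Go map iteration order is randomized.
func hashItems(items map[string]string) []byte {
	keys := make([]string, 0, len(items))
	for key := range items {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	hasher := sha256.New()
	for _, key := range keys {
		hasher.Write([]byte(key))
		hasher.Write([]byte{0})
		hasher.Write([]byte(items[key]))
		hasher.Write([]byte{0})
	}
	return hasher.Sum(nil)
}

func main() {
	// Same items always produce the same digest, regardless of insertion order.
	fmt.Printf("%x\n", hashItems(map[string]string{"key1": "a", "key2": "b"}))
}
```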

