Browse Source

statesync: avoid compounding retry logic for fetching consensus parameters (#8032)

We're waiting between trying witnesses (which shouldn't be neccessary
because the witnesses shouldn't depend on each other,) and also
between *attempts*, and really the outer sleep should be enough.
jasmina/4457_block_sync_verification
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
a965f03c15
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 63 deletions
  1. +1
    -1
      internal/statesync/reactor_test.go
  2. +91
    -62
      internal/statesync/stateprovider.go

+ 1
- 1
internal/statesync/reactor_test.go View File

@ -600,7 +600,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
rts.reactor.syncer.stateProvider = rts.reactor.stateProvider rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
actx, cancel := context.WithTimeout(ctx, 10*time.Second)
actx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel() defer cancel()
appHash, err := rts.reactor.stateProvider.AppHash(actx, 5) appHash, err := rts.reactor.stateProvider.AppHash(actx, 5)


+ 91
- 62
internal/statesync/stateprovider.go View File

@ -3,7 +3,6 @@ package statesync
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"strings" "strings"
@ -331,7 +330,7 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
// We'll also need to fetch consensus params via P2P. // We'll also need to fetch consensus params via P2P.
state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height) state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height)
if err != nil { if err != nil {
return sm.State{}, err
return sm.State{}, fmt.Errorf("fetching consensus params: %w", err)
} }
// validate the consensus params // validate the consensus params
if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) { if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) {
@ -355,80 +354,110 @@ func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
// consensusParams sends out a request for consensus params blocking // consensusParams sends out a request for consensus params blocking
// until one is returned. // until one is returned.
// //
// If it fails to get a valid set of consensus params from any of the
// providers it returns an error; however, it will retry indefinitely
// (with backoff) until the context is canceled.
// It attempts to send requests to all witnesses in parallel, but if
// none responds it will retry them all sometime later until it
// receives some response. This operation will block until it receives
// a response or the context is canceled.
func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) { func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) {
var iterCount int64
ctx, cancel := context.WithCancel(ctx)
defer cancel()
timer := time.NewTimer(0)
defer timer.Stop()
for {
params, err := s.tryGetConsensusParamsFromWitnesses(ctx, height)
if err != nil {
return types.ConsensusParams{}, err
}
if params != nil {
return *params, nil
}
iterCount++
out := make(chan types.ConsensusParams)
// jitter+backoff the retry loop
timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout +
time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec
select {
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
case <-timer.C:
retryAll := func() (<-chan struct{}, error) {
wg := &sync.WaitGroup{}
for _, provider := range s.lc.Witnesses() {
p, ok := provider.(*BlockProvider)
if !ok {
return nil, fmt.Errorf("witness is not BlockProvider [%T]", provider)
}
peer, err := types.NewNodeID(p.String())
if err != nil {
return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err)
}
wg.Add(1)
go func(p *BlockProvider, peer types.NodeID) {
defer wg.Done()
timer := time.NewTimer(0)
defer timer.Stop()
var iterCount int64
for {
iterCount++
if err := s.paramsSendCh.Send(ctx, p2p.Envelope{
To: peer,
Message: &ssproto.ParamsRequest{
Height: uint64(height),
},
}); err != nil {
// this only errors if
// the context is
// canceled which we
// don't need to
// propagate here
return
}
// jitter+backoff the retry loop
timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout +
time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec
select {
case <-timer.C:
continue
case <-ctx.Done():
return
case params, ok := <-s.paramsRecvCh:
if !ok {
return
}
select {
case <-ctx.Done():
return
case out <- params:
return
}
}
}
}(p, peer)
} }
sig := make(chan struct{})
go func() { wg.Wait(); close(sig) }()
return sig, nil
} }
}
// tryGetConsensusParamsFromWitnesses attempts to get consensus
// parameters from the light clients available witnesses. If both
// return parameters are nil, then it can be retried.
func (s *stateProviderP2P) tryGetConsensusParamsFromWitnesses(
ctx context.Context,
height int64,
) (*types.ConsensusParams, error) {
timer := time.NewTimer(0) timer := time.NewTimer(0)
defer timer.Stop() defer timer.Stop()
for _, provider := range s.lc.Witnesses() {
p, ok := provider.(*BlockProvider)
if !ok {
panic("expected p2p state provider to use p2p block providers")
}
// extract the nodeID of the provider
peer, err := types.NewNodeID(p.String())
var iterCount int64
for {
iterCount++
sig, err := retryAll()
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err)
}
if err := s.paramsSendCh.Send(ctx, p2p.Envelope{
To: peer,
Message: &ssproto.ParamsRequest{
Height: uint64(height),
},
}); err != nil {
return nil, err
return types.ConsensusParams{}, err
} }
timer.Reset(consensusParamsResponseTimeout)
select { select {
// if we get no response from this provider we move on to the next one
case <-timer.C:
continue
case <-ctx.Done():
return nil, ctx.Err()
case params, ok := <-s.paramsRecvCh:
if !ok {
return nil, errors.New("params channel closed")
case <-sig:
// jitter+backoff the retry loop
timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout +
time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec
select {
case param := <-out:
return param, nil
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
case <-timer.C:
} }
return &params, nil
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
case param := <-out:
return param, nil
} }
} }
// signal to caller to retry.
return nil, nil
} }

Loading…
Cancel
Save