
statesync: avoid compounding retry logic for fetching consensus parameters (backport #8032) (#8041)

(cherry picked from commit a965f03c15)
Branch: pull/8024/head
mergify[bot] authored 2 years ago; committed by GitHub
commit ca69da7533
2 changed files with 94 additions and 60 deletions
  1. internal/statesync/reactor_test.go (+1 -1)
  2. internal/statesync/stateprovider.go (+93 -59)
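
The change described by the commit title, in rough terms: the old code wrapped a sequential per-witness helper (tryGetConsensusParamsFromWitnesses, with its own per-witness timeout) in an outer retry loop, so the waits compounded; the new code keeps a single retry loop and fans the request out to every witness in parallel, taking the first response. Below is a minimal, self-contained Go sketch of that shape, not taken from the diff; queryAll, ask, providers, and the 2s base are hypothetical stand-ins for the Tendermint-specific pieces.

// Hypothetical sketch (not part of the commit): one retry loop that asks
// every provider concurrently, instead of retrying a sequential helper.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// queryAll sends the same question to all providers in parallel and returns
// the first answer; on silence it backs off (linearly, with jitter) and the
// caller's context bounds the whole operation.
func queryAll(ctx context.Context, providers []string,
	ask func(context.Context, string) (string, error)) (string, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	out := make(chan string, 1)
	for _, p := range providers {
		go func(p string) {
			if v, err := ask(ctx, p); err == nil {
				select {
				case out <- v: // first responder wins
				default: // someone else already answered
				}
			}
		}(p)
	}

	const base = 2 * time.Second // stand-in for a response-timeout constant
	for iter := int64(1); ; iter++ {
		// linear backoff plus up to 100*iter ms of jitter, as in the patch below
		wait := time.Duration(iter)*base +
			time.Duration(100*rand.Int63n(iter))*time.Millisecond
		select {
		case v := <-out:
			return v, nil
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(wait):
			// a fuller version would re-send the requests here
		}
	}
}

func main() {
	ask := func(_ context.Context, p string) (string, error) {
		return "params-from-" + p, nil // fake witness response
	}
	v, err := queryAll(context.Background(), []string{"w1", "w2"}, ask)
	fmt.Println(v, err)
}

The first provider to answer wins; the single loop only decides how long to back off before asking everyone again, which is the role the retryAll loop plays in the patch below.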

internal/statesync/reactor_test.go (+1 -1)

@@ -560,7 +560,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
 	require.NoError(t, err)
 	rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
-	actx, cancel := context.WithTimeout(bctx, 10*time.Second)
+	actx, cancel := context.WithTimeout(ctx, time.Second)
 	defer cancel()
 	appHash, err := rts.reactor.stateProvider.AppHash(actx, 5)


internal/statesync/stateprovider.go (+93 -59)

@@ -3,9 +3,10 @@ package statesync
 import (
 	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"strings"
 	"sync"
 	"time"
 
 	dbm "github.com/tendermint/tm-db"
@@ -328,7 +329,7 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
 	// We'll also need to fetch consensus params via P2P.
 	state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height)
 	if err != nil {
-		return sm.State{}, err
+		return sm.State{}, fmt.Errorf("fetching consensus params: %w", err)
 	}
 	// validate the consensus params
 	if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) {
@@ -352,73 +353,106 @@ func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
 // consensusParams sends out a request for consensus params blocking
 // until one is returned.
 //
-// If it fails to get a valid set of consensus params from any of the
-// providers it returns an error; however, it will retry indefinitely
-// (with backoff) until the context is canceled.
+// It attempts to send requests to all witnesses in parallel, but if
+// none responds it will retry them all sometime later until it
+// receives some response. This operation will block until it receives
+// a response or the context is canceled.
 func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) {
-	iterCount := 0
-	for {
-		params, err := s.tryGetConsensusParamsFromWitnesses(ctx, height)
-		if err != nil {
-			return types.ConsensusParams{}, err
-		}
-		if params != nil {
-			return *params, nil
-		}
-		iterCount++
-
-		select {
-		case <-ctx.Done():
-			return types.ConsensusParams{}, ctx.Err()
-		case <-time.After(time.Duration(iterCount) * consensusParamsResponseTimeout):
-		}
-	}
-}
-
-// tryGetConsensusParamsFromWitnesses attempts to get consensus
-// parameters from the light clients available witnesses. If both
-// return parameters are nil, then it can be retried.
-func (s *stateProviderP2P) tryGetConsensusParamsFromWitnesses(
-	ctx context.Context,
-	height int64,
-) (*types.ConsensusParams, error) {
-	for _, provider := range s.lc.Witnesses() {
-		p, ok := provider.(*BlockProvider)
-		if !ok {
-			panic("expected p2p state provider to use p2p block providers")
-		}
-
-		// extract the nodeID of the provider
-		peer, err := types.NewNodeID(p.String())
-		if err != nil {
-			return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err)
-		}
-
-		select {
-		case s.paramsSendCh <- p2p.Envelope{
-			To: peer,
-			Message: &ssproto.ParamsRequest{
-				Height: uint64(height),
-			},
-		}:
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		}
-
-		select {
-		// if we get no response from this provider we move on to the next one
-		case <-time.After(consensusParamsResponseTimeout):
-			continue
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		case params, ok := <-s.paramsRecvCh:
-			if !ok {
-				return nil, errors.New("params channel closed")
-			}
-			return &params, nil
-		}
-	}
-
-	// signal to caller to retry.
-	return nil, nil
-}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	out := make(chan types.ConsensusParams)
+
+	retryAll := func() (<-chan struct{}, error) {
+		wg := &sync.WaitGroup{}
+
+		for _, provider := range s.lc.Witnesses() {
+			p, ok := provider.(*BlockProvider)
+			if !ok {
+				return nil, fmt.Errorf("witness is not BlockProvider [%T]", provider)
+			}
+
+			peer, err := types.NewNodeID(p.String())
+			if err != nil {
+				return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err)
+			}
+
+			wg.Add(1)
+			go func(p *BlockProvider, peer types.NodeID, requestCh chan<- p2p.Envelope, responseCh <-chan types.ConsensusParams) {
+				defer wg.Done()
+
+				timer := time.NewTimer(0)
+				defer timer.Stop()
+				var iterCount int64
+
+				for {
+					iterCount++
+					select {
+					case s.paramsSendCh <- p2p.Envelope{
+						To: peer,
+						Message: &ssproto.ParamsRequest{
+							Height: uint64(height),
+						},
+					}:
+					case <-ctx.Done():
+						return
+					}
+
+					// jitter+backoff the retry loop
+					timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout +
+						time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec
+
+					select {
+					case <-timer.C:
+						continue
+					case <-ctx.Done():
+						return
+					case params, ok := <-responseCh:
+						if !ok {
+							return
+						}
+
+						select {
+						case <-ctx.Done():
+							return
+						case out <- params:
+							return
+						}
+					}
+				}
+			}(p, peer, s.paramsSendCh, s.paramsRecvCh)
+		}
+
+		sig := make(chan struct{})
+		go func() { wg.Wait(); close(sig) }()
+		return sig, nil
+	}
+
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
+	var iterCount int64
+	for {
+		iterCount++
+		sig, err := retryAll()
+		if err != nil {
+			return types.ConsensusParams{}, err
+		}
+
+		select {
+		case <-sig:
+			// jitter+backoff the retry loop
+			timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout +
+				time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec
+
+			select {
+			case param := <-out:
+				return param, nil
+			case <-ctx.Done():
+				return types.ConsensusParams{}, ctx.Err()
+			case <-timer.C:
+			}
+		case <-ctx.Done():
+			return types.ConsensusParams{}, ctx.Err()
+		case param := <-out:
+			return param, nil
+		}
+	}
+}
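
For reference, the wait used by both of the new retry loops above grows linearly with the attempt count plus up to 100*iterCount milliseconds of jitter. Here is a standalone sketch of that arithmetic (not from the commit); the 5s base is an assumption for illustration, the real base is the package's consensusParamsResponseTimeout constant, which this diff does not show.

// Standalone illustration of the backoff+jitter schedule used above;
// "base" is an assumed value, not the package constant.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	const base = 5 * time.Second // assumption; the real constant lives elsewhere in the package

	for iterCount := int64(1); iterCount <= 4; iterCount++ {
		wait := time.Duration(iterCount)*base +
			time.Duration(100*rand.Int63n(iterCount))*time.Millisecond
		fmt.Printf("attempt %d: wait %v before retrying\n", iterCount, wait)
	}
}

rand.Int63n(iterCount) returns a value in [0, iterCount), so the jitter is zero on the first attempt and can grow on later ones, spreading out repeated requests to the witnesses.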
