diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index dd1a224b9..f259cfa58 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -600,7 +600,7 @@ func TestReactor_StateProviderP2P(t *testing.T) { require.NoError(t, err) rts.reactor.syncer.stateProvider = rts.reactor.stateProvider - actx, cancel := context.WithTimeout(ctx, 10*time.Second) + actx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() appHash, err := rts.reactor.stateProvider.AppHash(actx, 5) diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go index ebc772020..3e58ca5f4 100644 --- a/internal/statesync/stateprovider.go +++ b/internal/statesync/stateprovider.go @@ -3,7 +3,6 @@ package statesync import ( "bytes" "context" - "errors" "fmt" "math/rand" "strings" @@ -331,7 +330,7 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State, // We'll also need to fetch consensus params via P2P. state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height) if err != nil { - return sm.State{}, err + return sm.State{}, fmt.Errorf("fetching consensus params: %w", err) } // validate the consensus params if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) { @@ -355,80 +354,110 @@ func (s *stateProviderP2P) addProvider(p lightprovider.Provider) { // consensusParams sends out a request for consensus params blocking // until one is returned. // -// If it fails to get a valid set of consensus params from any of the -// providers it returns an error; however, it will retry indefinitely -// (with backoff) until the context is canceled. +// It attempts to send requests to all witnesses in parallel, but if +// none responds it will retry them all sometime later until it +// receives some response. This operation will block until it receives +// a response or the context is canceled. func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) { - var iterCount int64 + ctx, cancel := context.WithCancel(ctx) + defer cancel() - timer := time.NewTimer(0) - defer timer.Stop() - for { - params, err := s.tryGetConsensusParamsFromWitnesses(ctx, height) - if err != nil { - return types.ConsensusParams{}, err - } - if params != nil { - return *params, nil - } - iterCount++ + out := make(chan types.ConsensusParams) - // jitter+backoff the retry loop - timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout + - time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec - select { - case <-ctx.Done(): - return types.ConsensusParams{}, ctx.Err() - case <-timer.C: + retryAll := func() (<-chan struct{}, error) { + wg := &sync.WaitGroup{} + + for _, provider := range s.lc.Witnesses() { + p, ok := provider.(*BlockProvider) + if !ok { + return nil, fmt.Errorf("witness is not BlockProvider [%T]", provider) + } + + peer, err := types.NewNodeID(p.String()) + if err != nil { + return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err) + } + + wg.Add(1) + go func(p *BlockProvider, peer types.NodeID) { + defer wg.Done() + + timer := time.NewTimer(0) + defer timer.Stop() + var iterCount int64 + + for { + iterCount++ + if err := s.paramsSendCh.Send(ctx, p2p.Envelope{ + To: peer, + Message: &ssproto.ParamsRequest{ + Height: uint64(height), + }, + }); err != nil { + // this only errors if + // the context is + // canceled which we + // don't need to + // propagate here + return + } + + // jitter+backoff the retry loop + timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout + + time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec + + select { + case <-timer.C: + continue + case <-ctx.Done(): + return + case params, ok := <-s.paramsRecvCh: + if !ok { + return + } + select { + case <-ctx.Done(): + return + case out <- params: + return + } + } + } + + }(p, peer) } + sig := make(chan struct{}) + go func() { wg.Wait(); close(sig) }() + return sig, nil } -} -// tryGetConsensusParamsFromWitnesses attempts to get consensus -// parameters from the light clients available witnesses. If both -// return parameters are nil, then it can be retried. -func (s *stateProviderP2P) tryGetConsensusParamsFromWitnesses( - ctx context.Context, - height int64, -) (*types.ConsensusParams, error) { timer := time.NewTimer(0) defer timer.Stop() - for _, provider := range s.lc.Witnesses() { - p, ok := provider.(*BlockProvider) - if !ok { - panic("expected p2p state provider to use p2p block providers") - } - // extract the nodeID of the provider - peer, err := types.NewNodeID(p.String()) + var iterCount int64 + for { + iterCount++ + sig, err := retryAll() if err != nil { - return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err) - } - - if err := s.paramsSendCh.Send(ctx, p2p.Envelope{ - To: peer, - Message: &ssproto.ParamsRequest{ - Height: uint64(height), - }, - }); err != nil { - return nil, err + return types.ConsensusParams{}, err } - - timer.Reset(consensusParamsResponseTimeout) select { - // if we get no response from this provider we move on to the next one - case <-timer.C: - continue - case <-ctx.Done(): - return nil, ctx.Err() - case params, ok := <-s.paramsRecvCh: - if !ok { - return nil, errors.New("params channel closed") + case <-sig: + // jitter+backoff the retry loop + timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout + + time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec + select { + case param := <-out: + return param, nil + case <-ctx.Done(): + return types.ConsensusParams{}, ctx.Err() + case <-timer.C: } - return ¶ms, nil + case <-ctx.Done(): + return types.ConsensusParams{}, ctx.Err() + case param := <-out: + return param, nil } } - // signal to caller to retry. - return nil, nil }