diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go
index 2ff93a023..844cb5e32 100644
--- a/internal/statesync/dispatcher.go
+++ b/internal/statesync/dispatcher.go
@@ -297,3 +297,16 @@ func (l *peerList) All() []types.NodeID {
 	defer l.mtx.Unlock()
 	return l.peers
 }
+
+func (l *peerList) Contains(id types.NodeID) bool {
+	l.mtx.Lock()
+	defer l.mtx.Unlock()
+
+	for _, p := range l.peers {
+		if id == p {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index 0d8ab7f4c..a20c8ca41 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -254,11 +254,11 @@ func (r *Reactor) OnStop() {
 	// Wait for all p2p Channels to be closed before returning. This ensures we
 	// can easily reason about synchronization of all p2p Channels and ensure no
 	// panics will occur.
+	<-r.peerUpdates.Done()
 	<-r.snapshotCh.Done()
 	<-r.chunkCh.Done()
 	<-r.blockCh.Done()
 	<-r.paramsCh.Done()
-	<-r.peerUpdates.Done()
 }
 
 // Sync runs a state sync, fetching snapshots and providing chunks to the
@@ -1013,9 +1013,11 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error {
 		iter++
 		select {
 		case <-ctx.Done():
-			return fmt.Errorf("operation canceled while waiting for peers after %s", time.Since(startAt))
+			return fmt.Errorf("operation canceled while waiting for peers after %.2fs [%d/%d]",
+				time.Since(startAt).Seconds(), r.peers.Len(), numPeers)
 		case <-r.closeCh:
-			return fmt.Errorf("shutdown while waiting for peers after %s", time.Since(startAt))
+			return fmt.Errorf("shutdown while waiting for peers after %.2fs [%d/%d]",
+				time.Since(startAt).Seconds(), r.peers.Len(), numPeers)
 		case <-t.C:
 			continue
 		case <-logT.C:
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index 7bcfe39af..b90e5fd78 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -525,25 +525,39 @@ func TestReactor_StateProviderP2P(t *testing.T) {
 	rts.reactor.cfg.UseP2P = true
 	rts.reactor.cfg.TrustHeight = 1
 	rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+
+	for _, p := range []types.NodeID{peerA, peerB} {
+		if !rts.reactor.peers.Contains(p) {
+			rts.reactor.peers.Append(p)
+		}
+	}
+	require.True(t, rts.reactor.peers.Len() >= 2, "peer network not configured")
+
+	bctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ictx, cancel := context.WithTimeout(bctx, time.Second)
 	defer cancel()
 
 	rts.reactor.mtx.Lock()
-	err := rts.reactor.initStateProvider(ctx, factory.DefaultTestChainID, 1)
+	err := rts.reactor.initStateProvider(ictx, factory.DefaultTestChainID, 1)
 	rts.reactor.mtx.Unlock()
 	require.NoError(t, err)
 	rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
 
-	appHash, err := rts.reactor.stateProvider.AppHash(ctx, 5)
+	actx, cancel := context.WithTimeout(bctx, 10*time.Second)
+	defer cancel()
+
+	appHash, err := rts.reactor.stateProvider.AppHash(actx, 5)
 	require.NoError(t, err)
 	require.Len(t, appHash, 32)
 
-	state, err := rts.reactor.stateProvider.State(ctx, 5)
+	state, err := rts.reactor.stateProvider.State(actx, 5)
 	require.NoError(t, err)
 	require.Equal(t, appHash, state.AppHash)
 	require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)
 
-	commit, err := rts.reactor.stateProvider.Commit(ctx, 5)
+	commit, err := rts.reactor.stateProvider.Commit(actx, 5)
 	require.NoError(t, err)
 	require.Equal(t, commit.BlockID, state.LastBlockID)
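
Note: the reactor_test.go hunk replaces the single shared 10-second context with a cancelable base context (bctx) plus two per-phase timeouts derived from it (ictx, actx). A minimal standalone sketch of that pattern, where phase() is a hypothetical stand-in for initStateProvider and the stateProvider.AppHash/State/Commit calls:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// phase does work until it finishes or its context expires; it stands
	// in for any context-bound call such as initialization or a query.
	func phase(ctx context.Context, work time.Duration) error {
		select {
		case <-time.After(work):
			return nil
		case <-ctx.Done():
			return fmt.Errorf("phase aborted: %w", ctx.Err())
		}
	}

	func main() {
		// Base context: cancelable but with no deadline of its own.
		// Canceling it tears down every context derived from it below.
		bctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Short budget for initialization, so a misconfigured setup
		// fails fast instead of consuming the whole test budget.
		ictx, icancel := context.WithTimeout(bctx, time.Second)
		defer icancel()
		if err := phase(ictx, 100*time.Millisecond); err != nil {
			fmt.Println("init:", err)
			return
		}

		// Fresh, longer budget for the subsequent queries; it does not
		// inherit whatever time the init phase already consumed.
		actx, acancel := context.WithTimeout(bctx, 10*time.Second)
		defer acancel()
		fmt.Println("query:", phase(actx, 200*time.Millisecond))
	}

Because both timeouts are children of bctx, a single cancel() aborts everything, while each phase gets its own deadline rather than racing the other phases for a shared one.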