From ed68222bdd06c18f0a38c4c86b63f612b81354e3 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Thu, 10 Mar 2022 14:35:09 -0500 Subject: [PATCH] avoid calling start --- internal/consensus/byzantine_test.go | 10 ++++------ internal/consensus/common_test.go | 12 ++++++------ internal/consensus/pbts_test.go | 2 ++ internal/consensus/reactor_test.go | 25 +++++++++++++++++-------- internal/consensus/replay_file.go | 1 + internal/consensus/replay_test.go | 2 +- internal/consensus/state_test.go | 6 ++++++ internal/consensus/wal_generator.go | 1 + 8 files changed, 38 insertions(+), 21 deletions(-) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 46bf09a2a..f08a63d52 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -97,10 +97,9 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { require.NoError(t, err) // set private validator pv := privVals[i] + cs.updateStateFromStore(ctx) cs.SetPrivValidator(ctx, pv) - cs.SetTimeoutTicker(tickerFunc()) - require.NoError(t, cs.Start(ctx)) states[i] = cs }() @@ -256,7 +255,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { if subctx.Err() != nil { return } - if err != nil { t.Errorf("waiting for subscription: %v", err) subcancel() @@ -273,14 +271,14 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { }(i, sub) i++ } - - wg.Wait() + sig := make(chan struct{}) + go func() { defer close(sig); wg.Wait() }() // don't run more assertions if we've encountered a timeout select { + case <-sig: case <-subctx.Done(): t.Fatal("encountered timeout") - default: } pubkey, err := bzNodeState.privValidator.GetPubKey(ctx) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index d6139bc77..68c54ca61 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -39,7 +39,7 @@ const ( testSubscriber = "test-client" // genesis, chain_id, priv_val - ensureTimeout = time.Millisecond * 200 + ensureTimeout = 500 * time.Millisecond ) // A cleanupFunc cleans up any config / test files created for a particular @@ -393,7 +393,7 @@ func subscribeToVoterBuffered(ctx context.Context, t *testing.T, cs *State, addr votesSub, err := cs.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: testSubscriber, Query: types.EventQueryVote, - Limit: 10}) + Limit: 1024}) if err != nil { t.Fatalf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote) } @@ -504,10 +504,8 @@ func newStateWithConfigAndBlockStore( if err != nil { t.Fatal(err) } + cs.updateStateFromStore(ctx) cs.SetPrivValidator(ctx, pv) - if err := cs.Start(ctx); err != nil { - t.Fatal(err) - } return cs } @@ -563,6 +561,8 @@ func makeState(ctx context.Context, t *testing.T, args makeStateArgs) (*State, [ // since cs1 starts at 1 incrementHeight(vss[1:]...) + cs.updateStateFromStore(ctx) + return cs, vss } @@ -827,8 +827,8 @@ func makeConsensusState( l := logger.With("validator", i, "module", "consensus") css[i] = newStateWithConfigAndBlockStore(ctx, t, l, thisConfig, state, privVals[i], app, blockStore) + css[i].updateStateFromStore(ctx) css[i].SetTimeoutTicker(tickerFunc()) - require.NoError(t, css[i].Start(ctx)) } return css, func() { diff --git a/internal/consensus/pbts_test.go b/internal/consensus/pbts_test.go index ba55f7fd9..75fa63aa5 100644 --- a/internal/consensus/pbts_test.go +++ b/internal/consensus/pbts_test.go @@ -118,6 +118,8 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat if err := cs.Start(ctx); err != nil { t.Fatal(err) } + cs.updateStateFromStore(ctx) + vss := make([]*validatorStub, validators) for i := 0; i < validators; i++ { vss[i] = newValidatorStub(privVals[i], int32(i)) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 05403e92f..e3abc67c0 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -118,14 +118,14 @@ func setup( blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: testSubscriber, Query: types.EventQueryNewBlock, - Limit: size, + Limit: 4096, }) require.NoError(t, err) fsSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: testSubscriber, Query: types.EventQueryBlockSyncStatus, - Limit: size, + Limit: 1024, }) require.NoError(t, err) @@ -393,17 +393,25 @@ func TestReactorBasic(t *testing.T) { } }(sub) } + sig := make(chan struct{}) + go func() { defer close(sig); wg.Wait() }() - wg.Wait() - if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) { - t.Fatal("encountered timeout") - } select { + case <-sig: + select { + case <-ctx.Done(): + t.Fatal("encountered timeout") + case err := <-errCh: + if err != nil { + t.Fatal(err) + } + } + case <-ctx.Done(): + t.Fatal("encountered timeout") case err := <-errCh: if err != nil { t.Fatal(err) } - default: } errCh = make(chan error, len(rts.blocksyncSubs)) @@ -515,7 +523,7 @@ func TestReactorWithEvidence(t *testing.T) { cs.SetPrivValidator(ctx, pv) cs.SetTimeoutTicker(tickerFunc()) - require.NoError(t, cs.Start(ctx)) + cs.updateStateFromStore(ctx) states[i] = cs } @@ -523,6 +531,7 @@ func TestReactorWithEvidence(t *testing.T) { rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock for _, reactor := range rts.reactors { + reactor.state.updateStateFromStore(ctx) reactor.SwitchToConsensus(ctx, reactor.state.state, false) } diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 492d1d1ee..737ffb66e 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -150,6 +150,7 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event if err != nil { return err } + newCS.updateStateFromStore(ctx) newCS.startForReplay() if err := pb.fp.Close(); err != nil { diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index bb3123af2..690ec3350 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -87,6 +87,7 @@ func startNewStateAndWaitForBlock(ctx context.Context, t *testing.T, consensusRe newBlockSub, err := cs.eventBus.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: testSubscriber, Query: types.EventQueryNewBlock, + Limit: 1024, }) require.NoError(t, err) ctxto, cancel := context.WithTimeout(ctx, 120*time.Second) @@ -179,7 +180,6 @@ LOOP: // clean up WAL file from the previous iteration walFile := cs.config.WalFile() - os.Remove(walFile) // set crashing WAL csWal, err := cs.OpenWAL(ctx, walFile) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index f008e75d3..6bae30fe2 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -178,6 +178,7 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) { defer cancel() cs, _ := makeState(ctx, t, makeStateArgs{config: config, validators: 1}) + cs.updateStateFromStore(ctx) height, round := cs.Height, cs.Round // Listen for propose timeout event @@ -706,6 +707,8 @@ func TestStateLock_POLUpdateLock(t *testing.T) { // Generate a new proposal block. cs2 := newState(ctx, t, logger, cs1.state, vs2, kvstore.NewApplication()) + cs2.updateStateFromStore(ctx) + require.NoError(t, err) propR1, propBlockR1 := decideProposal(ctx, t, cs2, vs2, vs2.Height, vs2.Round) propBlockR1Parts, err := propBlockR1.MakePartSet(partSize) @@ -998,6 +1001,7 @@ func TestStateLock_PrevoteNilWhenLockedAndDifferentProposal(t *testing.T) { incrementRound(vs2, vs3, vs4) round++ cs2 := newState(ctx, t, logger, cs1.state, vs2, kvstore.NewApplication()) + cs2.updateStateFromStore(ctx) propR1, propBlockR1 := decideProposal(ctx, t, cs2, vs2, vs2.Height, vs2.Round) propBlockR1Parts, err := propBlockR1.MakePartSet(types.BlockPartSizeBytes) require.NoError(t, err) @@ -1105,6 +1109,7 @@ func TestStateLock_POLDoesNotUnlock(t *testing.T) { round++ incrementRound(vs2, vs3, vs4) cs2 := newState(ctx, t, logger, cs1.state, vs2, kvstore.NewApplication()) + cs2.updateStateFromStore(ctx) prop, propBlock := decideProposal(ctx, t, cs2, vs2, vs2.Height, vs2.Round) propBlockParts, err := propBlock.MakePartSet(types.BlockPartSizeBytes) require.NoError(t, err) @@ -2721,6 +2726,7 @@ func subscribe( sub, err := eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ ClientID: testSubscriber, Query: q, + Limit: 1024, }) require.NoErrorf(t, err, "Failed to subscribe %q to %v: %v", testSubscriber, q, err) ch := make(chan tmpubsub.Message) diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index b11930f16..c510fcb71 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -85,6 +85,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr if err != nil { t.Fatal(err) } + consensusState.updateStateFromStore(ctx) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { consensusState.SetPrivValidator(ctx, privValidator)