diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index dfdb2ecf6..033b096ba 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -5,6 +5,7 @@ import ( "errors" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,7 +21,7 @@ import ( ) func TestReactorInvalidPrecommit(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() config := configSetup(t) @@ -49,14 +50,14 @@ func TestReactorInvalidPrecommit(t *testing.T) { byzState := rts.states[node.NodeID] byzReactor := rts.reactors[node.NodeID] - calledDoPrevote := false + signal := make(chan struct{}) // Update the doPrevote function to just send a valid precommit for a random // block and otherwise disable the priv validator. byzState.mtx.Lock() privVal := byzState.privValidator byzState.doPrevote = func(ctx context.Context, height int64, round int32) { + defer close(signal) invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, privVal) - calledDoPrevote = true } byzState.mtx.Unlock() @@ -72,16 +73,30 @@ func TestReactorInvalidPrecommit(t *testing.T) { go func(s eventbus.Subscription) { defer wg.Done() _, err := s.Next(ctx) + if ctx.Err() != nil { + return + } if !assert.NoError(t, err) { cancel() // cancel other subscribers on failure } }(sub) } } + wait := make(chan struct{}) + go func() { defer close(wait); wg.Wait() }() - wg.Wait() - if !calledDoPrevote { - t.Fatal("test failed to run core logic") + select { + case <-wait: + if _, ok := <-signal; !ok { + t.Fatal("test condition did not fire") + } + case <-ctx.Done(): + if _, ok := <-signal; !ok { + t.Fatal("test condition did not fire after timeout") + return + } + case <-signal: + // test passed } } @@ -130,19 +145,27 @@ func invalidDoPrevoteFunc( cs.privValidator = nil // disable priv val so we don't do normal votes cs.mtx.Unlock() - count := 0 + r.mtx.Lock() + ids := make([]types.NodeID, 0, len(r.peers)) for _, ps := range r.peers { + ids = append(ids, ps.peerID) + } + r.mtx.Unlock() + + count := 0 + for _, peerID := range ids { count++ err := r.voteCh.Send(ctx, p2p.Envelope{ - To: ps.peerID, + To: peerID, Message: &tmcons.Vote{ Vote: precommit.ToProto(), }, }) // we want to have sent some of these votes, // but if the test completes without erroring - // and we get here, we shouldn't error - if errors.Is(err, context.Canceled) && count > 1 { + // or not sending any messages, then we should + // error. + if errors.Is(err, context.Canceled) && count > 0 { break } require.NoError(t, err) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 220cc0741..ff6ffef3d 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -894,14 +894,11 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { } } - rs := cs.RoundState - var mi msgInfo - select { case <-cs.txNotifier.TxsAvailable(): cs.handleTxsAvailable(ctx) - case mi = <-cs.peerMsgQueue: + case mi := <-cs.peerMsgQueue: if err := cs.wal.Write(mi); err != nil { cs.logger.Error("failed writing to WAL", "err", err) } @@ -910,7 +907,7 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(ctx, mi) - case mi = <-cs.internalMsgQueue: + case mi := <-cs.internalMsgQueue: err := cs.wal.WriteSync(mi) // NOTE: fsync if err != nil { panic(fmt.Sprintf( @@ -929,7 +926,7 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { // if the timeout is relevant to the rs // go to the next step - cs.handleTimeout(ctx, ti, rs) + cs.handleTimeout(ctx, ti, cs.RoundState) case <-ctx.Done(): onExit(cs)