diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 46bacf5cc..42900a7d4 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -241,8 +241,12 @@ func (cs *State) GetLastHeight() int64 { // GetRoundState returns a shallow copy of the internal consensus state. func (cs *State) GetRoundState() *cstypes.RoundState { cs.mtx.RLock() + defer cs.mtx.RUnlock() + + // NOTE: this might be dodgy, as RoundState itself isn't thread + // safe as it contains a number of pointers and is explicitly + // not thread safe. rs := cs.RoundState // copy - cs.mtx.RUnlock() return &rs } diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index b3b7c81a3..9b8f68a5e 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -331,7 +331,7 @@ func TestStateFullRound1(t *testing.T) { t.Error(err) } - voteCh := subscribeUnBuffered(cs.eventBus, types.EventQueryVote) + voteCh := subscribe(cs.eventBus, types.EventQueryVote) propCh := subscribe(cs.eventBus, types.EventQueryCompleteProposal) newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound) @@ -361,7 +361,7 @@ func TestStateFullRoundNil(t *testing.T) { cs, vss := randState(config, 1) height, round := cs.Height, cs.Round - voteCh := subscribeUnBuffered(cs.eventBus, types.EventQueryVote) + voteCh := subscribe(cs.eventBus, types.EventQueryVote) cs.enterPrevote(height, round) cs.startRoutines(4) @@ -382,7 +382,7 @@ func TestStateFullRound2(t *testing.T) { vs2 := vss[1] height, round := cs1.Height, cs1.Round - voteCh := subscribeUnBuffered(cs1.eventBus, types.EventQueryVote) + voteCh := subscribe(cs1.eventBus, types.EventQueryVote) newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock) // start round and wait for propose and prevote @@ -428,7 +428,7 @@ func TestStateLockNoPOL(t *testing.T) { timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) - voteCh := subscribeUnBuffered(cs1.eventBus, types.EventQueryVote) + voteCh := subscribe(cs1.eventBus, types.EventQueryVote) proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) @@ -1971,12 +1971,3 @@ func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Messa } return sub.Out() } - -// subscribe subscribes test client to the given query and returns a channel with cap = 0. -func subscribeUnBuffered(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Message { - sub, err := eventBus.SubscribeUnbuffered(context.Background(), testSubscriber, q) - if err != nil { - panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q)) - } - return sub.Out() -}