From eed617c2d9da5e6ba1d742b5f59940dec6682f99 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 9 Feb 2022 14:47:50 -0500 Subject: [PATCH] consensus: refactor operations in consensus queryMaj23Routine (#7791) --- internal/consensus/reactor.go | 173 +++++++++++++++------------------- 1 file changed, 78 insertions(+), 95 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 26c91b2d8..f52c7b00d 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -749,7 +749,6 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { timer := time.NewTimer(0) defer timer.Stop() -OUTER_LOOP: for { if !r.IsRunning() { return @@ -776,7 +775,7 @@ OUTER_LOOP: if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps); err != nil { return } else if ok { - continue OUTER_LOOP + continue } } @@ -786,7 +785,7 @@ OUTER_LOOP: return } else if ok { logger.Debug("picked rs.LastCommit to send", "height", prs.Height) - continue OUTER_LOOP + continue } } @@ -800,7 +799,7 @@ OUTER_LOOP: return } else if ok { logger.Debug("picked Catchup commit to send", "height", prs.Height) - continue OUTER_LOOP + continue } } } @@ -825,7 +824,6 @@ OUTER_LOOP: return case <-timer.C: } - continue OUTER_LOOP } } @@ -835,24 +833,37 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { timer := time.NewTimer(0) defer timer.Stop() -OUTER_LOOP: + ctx, cancel := context.WithCancel(ctx) + defer cancel() + for { - if !r.IsRunning() { + if !ps.IsRunning() { return } select { case <-ctx.Done(): return - default: + case <-timer.C: + } + + if !ps.IsRunning() { + return } - // maybe send Height/Round/Prevotes - { - rs := r.state.GetRoundState() - prs := ps.GetRoundState() + rs := r.state.GetRoundState() + prs := ps.GetRoundState() + // TODO create more reliable coppies of these + // structures so the following go routines don't race + + wg := &sync.WaitGroup{} + + if rs.Height == prs.Height { + wg.Add(1) + go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) { + defer wg.Done() - if rs.Height == prs.Height { + // maybe send Height/Round/Prevotes if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { if err := r.stateCh.Send(ctx, p2p.Envelope{ To: ps.peerID, @@ -863,25 +874,38 @@ OUTER_LOOP: BlockID: maj23.ToProto(), }, }); err != nil { - return - } - - timer.Reset(r.state.config.PeerQueryMaj23SleepDuration) - select { - case <-timer.C: - case <-ctx.Done(): - return + cancel() } } + }(rs, prs) + + if prs.ProposalPOLRound >= 0 { + wg.Add(1) + go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) { + defer wg.Done() + + // maybe send Height/Round/ProposalPOL + if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { + if err := r.stateCh.Send(ctx, p2p.Envelope{ + To: ps.peerID, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: prs.ProposalPOLRound, + Type: tmproto.PrevoteType, + BlockID: maj23.ToProto(), + }, + }); err != nil { + cancel() + } + } + }(rs, prs) } - } - // maybe send Height/Round/Precommits - { - rs := r.state.GetRoundState() - prs := ps.GetRoundState() + wg.Add(1) + go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) { + defer wg.Done() - if rs.Height == prs.Height { + // maybe send Height/Round/Precommits if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { if err := r.stateCh.Send(ctx, p2p.Envelope{ To: ps.peerID, @@ -892,88 +916,47 @@ OUTER_LOOP: BlockID: maj23.ToProto(), }, }); err != nil { - return - } - - select { - case <-timer.C: - timer.Reset(r.state.config.PeerQueryMaj23SleepDuration) - case <-ctx.Done(): - return - } - } - } - } - - // maybe send Height/Round/ProposalPOL - { - rs := r.state.GetRoundState() - prs := ps.GetRoundState() - - if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { - if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - if err := r.stateCh.Send(ctx, p2p.Envelope{ - To: ps.peerID, - Message: &tmcons.VoteSetMaj23{ - Height: prs.Height, - Round: prs.ProposalPOLRound, - Type: tmproto.PrevoteType, - BlockID: maj23.ToProto(), - }, - }); err != nil { - return - } - - timer.Reset(r.state.config.PeerQueryMaj23SleepDuration) - select { - case <-timer.C: - case <-ctx.Done(): - return + cancel() } } - } + }(rs, prs) } // Little point sending LastCommitRound/LastCommit, these are fleeting and // non-blocking. - - // maybe send Height/CatchupCommitRound/CatchupCommit - { - prs := ps.GetRoundState() - - if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= r.state.blockStore.Height() && - prs.Height >= r.state.blockStore.Base() { - if commit := r.state.LoadCommit(prs.Height); commit != nil { - if err := r.stateCh.Send(ctx, p2p.Envelope{ - To: ps.peerID, - Message: &tmcons.VoteSetMaj23{ - Height: prs.Height, - Round: commit.Round, - Type: tmproto.PrecommitType, - BlockID: commit.BlockID.ToProto(), - }, - }); err != nil { - return - } - - timer.Reset(r.state.config.PeerQueryMaj23SleepDuration) - select { - case <-timer.C: - case <-ctx.Done(): - return + if prs.CatchupCommitRound != -1 && prs.Height > 0 { + wg.Add(1) + go func(rs *cstypes.RoundState, prs *cstypes.PeerRoundState) { + defer wg.Done() + + if prs.Height <= r.state.blockStore.Height() && prs.Height >= r.state.blockStore.Base() { + // maybe send Height/CatchupCommitRound/CatchupCommit + if commit := r.state.LoadCommit(prs.Height); commit != nil { + if err := r.stateCh.Send(ctx, p2p.Envelope{ + To: ps.peerID, + Message: &tmcons.VoteSetMaj23{ + Height: prs.Height, + Round: commit.Round, + Type: tmproto.PrecommitType, + BlockID: commit.BlockID.ToProto(), + }, + }); err != nil { + cancel() + } } } - } + }(rs, prs) } - timer.Reset(r.state.config.PeerQueryMaj23SleepDuration) + waitSignal := make(chan struct{}) + go func() { defer close(waitSignal); wg.Wait() }() + select { - case <-timer.C: + case <-waitSignal: + timer.Reset(r.state.config.PeerQueryMaj23SleepDuration) case <-ctx.Done(): return } - - continue OUTER_LOOP } }