From 485c96b0d386d5461d397fef76ae75df0d56c885 Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Fri, 18 Mar 2022 14:59:44 -0400 Subject: [PATCH] consensus: change lock handling in reactor and handleMsg for RoundState (forward-port #7994 #7992) (#8139) Related to #8157 --- internal/consensus/reactor.go | 31 +++++++++++-- internal/consensus/state.go | 82 +++++++++++++++++++++-------------- 2 files changed, 76 insertions(+), 37 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 52e547bf6..c8d296ff9 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -119,6 +119,7 @@ type Reactor struct { mtx sync.RWMutex peers map[types.NodeID]*PeerState waitSync bool + rs *cstypes.RoundState readySignal chan struct{} // closed when the node is ready to start consensus stateCh *p2p.Channel @@ -166,6 +167,7 @@ func NewReactor( logger: logger, state: cs, waitSync: waitSync, + rs: cs.GetRoundState(), peers: make(map[types.NodeID]*PeerState), eventBus: eventBus, Metrics: metrics, @@ -199,6 +201,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { go r.peerStatsRoutine(ctx) r.subscribeToBroadcastEvents() + go r.updateRoundStateRoutine() if !r.WaitSync() { if err := r.state.Start(ctx); err != nil { @@ -407,10 +410,30 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error { return r.stateCh.Send(ctx, p2p.Envelope{ To: peerID, - Message: makeRoundStepMessage(r.state.GetRoundState()), + Message: makeRoundStepMessage(r.getRoundState()), }) } +func (r *Reactor) updateRoundStateRoutine() { + t := time.NewTicker(100 * time.Microsecond) + defer t.Stop() + for range t.C { + if !r.IsRunning() { + return + } + rs := r.state.GetRoundState() + r.mtx.Lock() + r.rs = rs + r.mtx.Unlock() + } +} + +func (r *Reactor) getRoundState() *cstypes.RoundState { + 
r.mtx.RLock() + defer r.mtx.RUnlock() + return r.rs +} + func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) @@ -493,7 +516,7 @@ OUTER_LOOP: default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // Send proposal Block parts? @@ -751,7 +774,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() switch logThrottle { @@ -842,7 +865,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { return } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // TODO create more reliable coppies of these // structures so the following go routines don't race diff --git a/internal/consensus/state.go b/internal/consensus/state.go index d82877ec1..bd79f4f83 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -934,7 +934,6 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { cs.mtx.Lock() defer cs.mtx.Unlock() - var ( added bool err error @@ -951,6 +950,24 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit added, err = cs.addProposalBlockPart(ctx, msg, peerID) + + // We unlock here to yield to any routines that need to read the RoundState. + // Previously, this code held the lock from the point at which the final block + // part was received until the block executed against the application. + // This prevented the reactor from being able to retrieve the most updated + // version of the RoundState. The reactor needs the updated RoundState to + // gossip the now completed block. 
+ // + // This code can be further improved by either always operating on a copy + // of RoundState and only locking when switching out State's copy of + // RoundState with the updated copy or by emitting RoundState events in + // more places for routines depending on it to listen for. + cs.mtx.Unlock() + + cs.mtx.Lock() + if added && cs.ProposalBlockParts.IsComplete() { + cs.handleCompleteProposal(ctx, msg.Height) + } if added { select { case cs.statsMsgQueue <- mi: @@ -2153,44 +2170,43 @@ func (cs *State) addProposalBlockPart( if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil { cs.logger.Error("failed publishing event complete proposal", "err", err) } + } - // Update Valid* if we can. - prevotes := cs.Votes.Prevotes(cs.Round) - blockID, hasTwoThirds := prevotes.TwoThirdsMajority() - if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) { - if cs.ProposalBlock.HashesTo(blockID.Hash) { - cs.logger.Debug( - "updating valid block to new proposal block", - "valid_round", cs.Round, - "valid_block_hash", cs.ProposalBlock.Hash(), - ) + return added, nil +} +func (cs *State) handleCompleteProposal(ctx context.Context, height int64) { + // Update Valid* if we can. + prevotes := cs.Votes.Prevotes(cs.Round) + blockID, hasTwoThirds := prevotes.TwoThirdsMajority() + if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) { + if cs.ProposalBlock.HashesTo(blockID.Hash) { + cs.logger.Debug( + "updating valid block to new proposal block", + "valid_round", cs.Round, + "valid_block_hash", cs.ProposalBlock.Hash(), + ) - cs.ValidRound = cs.Round - cs.ValidBlock = cs.ProposalBlock - cs.ValidBlockParts = cs.ProposalBlockParts - } - // TODO: In case there is +2/3 majority in Prevotes set for some - // block and cs.ProposalBlock contains different block, either - // proposer is faulty or voting power of faulty processes is more - // than 1/3. We should trigger in the future accountability - // procedure at this point. 
+ cs.ValidRound = cs.Round + cs.ValidBlock = cs.ProposalBlock + cs.ValidBlockParts = cs.ProposalBlockParts } + // TODO: In case there is +2/3 majority in Prevotes set for some + // block and cs.ProposalBlock contains different block, either + // proposer is faulty or voting power of faulty processes is more + // than 1/3. We should trigger in the future accountability + // procedure at this point. + } - if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { - // Move onto the next step - cs.enterPrevote(ctx, height, cs.Round) - if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added - cs.enterPrecommit(ctx, height, cs.Round) - } - } else if cs.Step == cstypes.RoundStepCommit { - // If we're waiting on the proposal block... - cs.tryFinalizeCommit(ctx, height) + if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { + // Move onto the next step + cs.enterPrevote(ctx, height, cs.Round) + if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added + cs.enterPrecommit(ctx, height, cs.Round) } - - return added, nil + } else if cs.Step == cstypes.RoundStepCommit { + // If we're waiting on the proposal block... + cs.tryFinalizeCommit(ctx, height) } - - return added, nil } // Attempt to add the vote. if its a duplicate signature, dupeout the validator