From f553f4d747d010f66234453e5ecd0f02efc8a94b Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 9 Mar 2022 17:52:29 -0500 Subject: [PATCH] consensus: change lock handling in and handleMsg reactor for RoundState --- internal/consensus/reactor.go | 31 ++++++++++-- internal/consensus/state.go | 92 ++++++++++++++++++++++------------- 2 files changed, 84 insertions(+), 39 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index efb3f2d04..8af9e9143 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -119,6 +119,7 @@ type Reactor struct { mtx sync.RWMutex peers map[types.NodeID]*PeerState waitSync bool + rs *cstypes.RoundState readySignal chan struct{} // closed when the node is ready to start consensus stateCh *p2p.Channel @@ -166,6 +167,7 @@ func NewReactor( logger: logger, state: cs, waitSync: waitSync, + rs: cs.GetRoundState(), peers: make(map[types.NodeID]*PeerState), eventBus: eventBus, Metrics: metrics, @@ -199,6 +201,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { go r.peerStatsRoutine(ctx) r.subscribeToBroadcastEvents() + go r.updateRoundStateRoutine() if !r.WaitSync() { if err := r.state.Start(ctx); err != nil { @@ -411,10 +414,30 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error { return r.stateCh.Send(ctx, p2p.Envelope{ To: peerID, - Message: makeRoundStepMessage(r.state.GetRoundState()), + Message: makeRoundStepMessage(r.getRoundState()), }) } +func (r *Reactor) updateRoundStateRoutine() { + t := time.NewTicker(100 * time.Microsecond) + defer t.Stop() + for range t.C { + if !r.IsRunning() { + return + } + rs := r.getRoundState() + r.mtx.Lock() + r.rs = rs + r.mtx.Unlock() + } +} + +func (conR *Reactor) getRoundState() *cstypes.RoundState { + conR.mtx.RLock() + defer conR.mtx.RUnlock() + return conR.rs +} + func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) @@ -497,7 +520,7 @@ OUTER_LOOP: default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // Send proposal Block parts? @@ -755,7 +778,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() switch logThrottle { @@ -846,7 +869,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { return } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // TODO create more reliable coppies of these // structures so the following go routines don't race diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 92b5b84d6..d5001b0a2 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -948,9 +948,6 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { // state transitions on complete-proposal, 2/3-any, 2/3-one func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - var ( added bool err error @@ -962,15 +959,37 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts + cs.mtx.Lock() err = cs.setProposal(msg.Proposal, mi.ReceiveTime) + cs.mtx.Unlock() case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit + cs.mtx.Lock() added, err = cs.addProposalBlockPart(ctx, msg, peerID) + + // We unlock here to yield to any routines that need to read the the RoundState. + // Previously, this code held the lock from the point at which the final block + // part was recieved until the block executed against the application. + // This prevented the reactor from being able to retrieve the most updated + // version of the RoundState. The reactor needs the updated RoundState to + // gossip the now completed block. + // + // This code can be further improved by either always operating on a copy + // of RoundState and only locking when switching out State's copy of + // RoundState with the updated copy or by emitting RoundState events in + // more places for routines depending on it to listen for. + cs.mtx.Unlock() + + cs.mtx.Lock() + if added && cs.ProposalBlockParts.IsComplete() { + cs.handleCompleteProposal(ctx, msg.Height) + } if added { select { case cs.statsMsgQueue <- mi: case <-ctx.Done(): + cs.mtx.Unlock() return } } @@ -984,18 +1003,22 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { ) err = nil } + cs.mtx.Unlock() case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition + cs.mtx.Lock() added, err = cs.tryAddVote(ctx, msg.Vote, peerID) if added { select { case cs.statsMsgQueue <- mi: case <-ctx.Done(): + cs.mtx.Unlock() return } } + cs.mtx.Unlock() // TODO: punish peer // We probably don't want to stop the peer here. The vote does not @@ -2169,44 +2192,43 @@ func (cs *State) addProposalBlockPart( if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil { cs.logger.Error("failed publishing event complete proposal", "err", err) } + } - // Update Valid* if we can. - prevotes := cs.Votes.Prevotes(cs.Round) - blockID, hasTwoThirds := prevotes.TwoThirdsMajority() - if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) { - if cs.ProposalBlock.HashesTo(blockID.Hash) { - cs.logger.Debug( - "updating valid block to new proposal block", - "valid_round", cs.Round, - "valid_block_hash", cs.ProposalBlock.Hash(), - ) + return added, nil +} +func (cs *State) handleCompleteProposal(ctx context.Context, height int64) { + // Update Valid* if we can. + prevotes := cs.Votes.Prevotes(cs.Round) + blockID, hasTwoThirds := prevotes.TwoThirdsMajority() + if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) { + if cs.ProposalBlock.HashesTo(blockID.Hash) { + cs.logger.Debug( + "updating valid block to new proposal block", + "valid_round", cs.Round, + "valid_block_hash", cs.ProposalBlock.Hash(), + ) - cs.ValidRound = cs.Round - cs.ValidBlock = cs.ProposalBlock - cs.ValidBlockParts = cs.ProposalBlockParts - } - // TODO: In case there is +2/3 majority in Prevotes set for some - // block and cs.ProposalBlock contains different block, either - // proposer is faulty or voting power of faulty processes is more - // than 1/3. We should trigger in the future accountability - // procedure at this point. + cs.ValidRound = cs.Round + cs.ValidBlock = cs.ProposalBlock + cs.ValidBlockParts = cs.ProposalBlockParts } + // TODO: In case there is +2/3 majority in Prevotes set for some + // block and cs.ProposalBlock contains different block, either + // proposer is faulty or voting power of faulty processes is more + // than 1/3. We should trigger in the future accountability + // procedure at this point. + } - if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { - // Move onto the next step - cs.enterPrevote(ctx, height, cs.Round) - if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added - cs.enterPrecommit(ctx, height, cs.Round) - } - } else if cs.Step == cstypes.RoundStepCommit { - // If we're waiting on the proposal block... - cs.tryFinalizeCommit(ctx, height) + if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { + // Move onto the next step + cs.enterPrevote(ctx, height, cs.Round) + if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added + cs.enterPrecommit(ctx, height, cs.Round) } - - return added, nil + } else if cs.Step == cstypes.RoundStepCommit { + // If we're waiting on the proposal block... + cs.tryFinalizeCommit(ctx, height) } - - return added, nil } // Attempt to add the vote. if its a duplicate signature, dupeout the validator