diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index fe4488b38..ac762dff2 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -131,6 +131,7 @@ type Reactor struct { mtx sync.RWMutex peers map[types.NodeID]*PeerState waitSync bool + rs *cstypes.RoundState readySignal chan struct{} // closed when the node is ready to start consensus stateCh *p2p.Channel @@ -161,6 +162,7 @@ func NewReactor( r := &Reactor{ state: cs, waitSync: waitSync, + rs: cs.GetRoundState(), peers: make(map[types.NodeID]*PeerState), Metrics: NopMetrics(), stateCh: stateCh, @@ -198,6 +200,7 @@ func (r *Reactor) OnStart() error { go r.peerStatsRoutine() r.subscribeToBroadcastEvents() + go r.updateRoundStateRoutine() if !r.WaitSync() { if err := r.state.Start(); err != nil { @@ -443,7 +446,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { } func (r *Reactor) sendNewRoundStepMessage(peerID types.NodeID) { - rs := r.state.GetRoundState() + rs := r.getRoundState() msg := makeRoundStepMessage(rs) select { case <-r.closeCh: @@ -455,6 +458,26 @@ func (r *Reactor) sendNewRoundStepMessage(peerID types.NodeID) { } } +func (r *Reactor) updateRoundStateRoutine() { + t := time.NewTicker(100 * time.Microsecond) + defer t.Stop() + for range t.C { + if !r.IsRunning() { + return + } + rs := r.getRoundState() + r.mtx.Lock() + r.rs = rs + r.mtx.Unlock() + } +} + +func (conR *Reactor) getRoundState() *cstypes.RoundState { + conR.mtx.RLock() + defer conR.mtx.RUnlock() + return conR.rs +} + func (r *Reactor) gossipDataForCatchup(rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) { logger := r.Logger.With("height", prs.Height).With("peer", ps.peerID) @@ -539,7 +562,7 @@ OUTER_LOOP: default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // Send proposal Block parts? @@ -766,7 +789,7 @@ OUTER_LOOP: default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() switch logThrottle { @@ -848,7 +871,7 @@ func (r *Reactor) queryMaj23Routine(ps *PeerState) { return } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // TODO create more reliable coppies of these // structures so the following go routines don't race diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 2a8fa392b..7e487c62e 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -862,9 +862,6 @@ func (cs *State) receiveRoutine(maxSteps int) { // state transitions on complete-proposal, 2/3-any, 2/3-one func (cs *State) handleMsg(mi msgInfo) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - var ( added bool err error @@ -876,11 +873,32 @@ func (cs *State) handleMsg(mi msgInfo) { case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts + cs.mtx.Lock() err = cs.setProposal(msg.Proposal) + cs.mtx.Unlock() case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit + cs.mtx.Lock() added, err = cs.addProposalBlockPart(msg, peerID) + + // We unlock here to yield to any routines that need to read the the RoundState. + // Previously, this code held the lock from the point at which the final block + // part was recieved until the block executed against the application. + // This prevented the reactor from being able to retrieve the most updated + // version of the RoundState. The reactor needs the updated RoundState to + // gossip the now completed block. + // + // This code can be further improved by either always operating on a copy + // of RoundState and only locking when switching out State's copy of + // RoundState with the updated copy or by emitting RoundState events in + // more places for routines depending on it to listen for. + cs.mtx.Unlock() + + cs.mtx.Lock() + if added && cs.ProposalBlockParts.IsComplete() { + cs.handleCompleteProposal(msg.Height) + } if added { cs.statsMsgQueue <- mi } @@ -894,14 +912,17 @@ func (cs *State) handleMsg(mi msgInfo) { ) err = nil } + cs.mtx.Unlock() case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition + cs.mtx.Lock() added, err = cs.tryAddVote(msg.Vote, peerID) if added { cs.statsMsgQueue <- mi } + cs.mtx.Unlock() // if err == ErrAddingVote { // TODO: punish peer @@ -1941,44 +1962,43 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil { cs.Logger.Error("failed publishing event complete proposal", "err", err) } + } + return added, nil +} - // Update Valid* if we can. - prevotes := cs.Votes.Prevotes(cs.Round) - blockID, hasTwoThirds := prevotes.TwoThirdsMajority() - if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) { - if cs.ProposalBlock.HashesTo(blockID.Hash) { - cs.Logger.Debug( - "updating valid block to new proposal block", - "valid_round", cs.Round, - "valid_block_hash", cs.ProposalBlock.Hash(), - ) +func (cs *State) handleCompleteProposal(blockHeight int64) { + // Update Valid* if we can. + prevotes := cs.Votes.Prevotes(cs.Round) + blockID, hasTwoThirds := prevotes.TwoThirdsMajority() + if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) { + if cs.ProposalBlock.HashesTo(blockID.Hash) { + cs.Logger.Debug( + "updating valid block to new proposal block", + "valid_round", cs.Round, + "valid_block_hash", cs.ProposalBlock.Hash(), + ) - cs.ValidRound = cs.Round - cs.ValidBlock = cs.ProposalBlock - cs.ValidBlockParts = cs.ProposalBlockParts - } - // TODO: In case there is +2/3 majority in Prevotes set for some - // block and cs.ProposalBlock contains different block, either - // proposer is faulty or voting power of faulty processes is more - // than 1/3. We should trigger in the future accountability - // procedure at this point. + cs.ValidRound = cs.Round + cs.ValidBlock = cs.ProposalBlock + cs.ValidBlockParts = cs.ProposalBlockParts } - - if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { - // Move onto the next step - cs.enterPrevote(height, cs.Round) - if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added - cs.enterPrecommit(height, cs.Round) - } - } else if cs.Step == cstypes.RoundStepCommit { - // If we're waiting on the proposal block... - cs.tryFinalizeCommit(height) + // TODO: In case there is +2/3 majority in Prevotes set for some + // block and cs.ProposalBlock contains different block, either + // proposer is faulty or voting power of faulty processes is more + // than 1/3. We should trigger in the future accountability + // procedure at this point. + } + + if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { + // Move onto the next step + cs.enterPrevote(blockHeight, cs.Round) + if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added + cs.enterPrecommit(blockHeight, cs.Round) } - - return added, nil + } else if cs.Step == cstypes.RoundStepCommit { + // If we're waiting on the proposal block... + cs.tryFinalizeCommit(blockHeight) } - - return added, nil } // Attempt to add the vote. if its a duplicate signature, dupeout the validator