Browse Source

consensus: change lock handling in reactor and handleMsg for RoundState (forward-port #7994 #7992) (#8138)

This ports the changes from #7992 and #7994 from the v0.34.x branch to v0.35.x.
pull/8168/head
William Banfield 3 years ago
committed by GitHub
parent
commit
114a41f6cc
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 38 deletions
  1. +27
    -4
      internal/consensus/reactor.go
  2. +50
    -34
      internal/consensus/state.go

+ 27
- 4
internal/consensus/reactor.go View File

@ -131,6 +131,7 @@ type Reactor struct {
mtx sync.RWMutex mtx sync.RWMutex
peers map[types.NodeID]*PeerState peers map[types.NodeID]*PeerState
waitSync bool waitSync bool
rs *cstypes.RoundState
readySignal chan struct{} // closed when the node is ready to start consensus readySignal chan struct{} // closed when the node is ready to start consensus
stateCh *p2p.Channel stateCh *p2p.Channel
@ -161,6 +162,7 @@ func NewReactor(
r := &Reactor{ r := &Reactor{
state: cs, state: cs,
waitSync: waitSync, waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState), peers: make(map[types.NodeID]*PeerState),
Metrics: NopMetrics(), Metrics: NopMetrics(),
stateCh: stateCh, stateCh: stateCh,
@ -198,6 +200,7 @@ func (r *Reactor) OnStart() error {
go r.peerStatsRoutine() go r.peerStatsRoutine()
r.subscribeToBroadcastEvents() r.subscribeToBroadcastEvents()
go r.updateRoundStateRoutine()
if !r.WaitSync() { if !r.WaitSync() {
if err := r.state.Start(); err != nil { if err := r.state.Start(); err != nil {
@ -443,7 +446,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
} }
func (r *Reactor) sendNewRoundStepMessage(peerID types.NodeID) { func (r *Reactor) sendNewRoundStepMessage(peerID types.NodeID) {
rs := r.state.GetRoundState()
rs := r.getRoundState()
msg := makeRoundStepMessage(rs) msg := makeRoundStepMessage(rs)
select { select {
case <-r.closeCh: case <-r.closeCh:
@ -455,6 +458,26 @@ func (r *Reactor) sendNewRoundStepMessage(peerID types.NodeID) {
} }
} }
func (r *Reactor) updateRoundStateRoutine() {
t := time.NewTicker(100 * time.Microsecond)
defer t.Stop()
for range t.C {
if !r.IsRunning() {
return
}
rs := r.state.GetRoundState()
r.mtx.Lock()
r.rs = rs
r.mtx.Unlock()
}
}
func (r *Reactor) getRoundState() *cstypes.RoundState {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.rs
}
func (r *Reactor) gossipDataForCatchup(rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) { func (r *Reactor) gossipDataForCatchup(rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) {
logger := r.Logger.With("height", prs.Height).With("peer", ps.peerID) logger := r.Logger.With("height", prs.Height).With("peer", ps.peerID)
@ -539,7 +562,7 @@ OUTER_LOOP:
default: default:
} }
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState() prs := ps.GetRoundState()
// Send proposal Block parts? // Send proposal Block parts?
@ -766,7 +789,7 @@ OUTER_LOOP:
default: default:
} }
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState() prs := ps.GetRoundState()
switch logThrottle { switch logThrottle {
@ -848,7 +871,7 @@ func (r *Reactor) queryMaj23Routine(ps *PeerState) {
return return
} }
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState() prs := ps.GetRoundState()
// TODO create more reliable coppies of these // TODO create more reliable coppies of these
// structures so the following go routines don't race // structures so the following go routines don't race


+ 50
- 34
internal/consensus/state.go View File

@ -864,7 +864,6 @@ func (cs *State) receiveRoutine(maxSteps int) {
func (cs *State) handleMsg(mi msgInfo) { func (cs *State) handleMsg(mi msgInfo) {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
var ( var (
added bool added bool
err error err error
@ -881,6 +880,24 @@ func (cs *State) handleMsg(mi msgInfo) {
case *BlockPartMessage: case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(msg, peerID) added, err = cs.addProposalBlockPart(msg, peerID)
// We unlock here to yield to any routines that need to read the the RoundState.
// Previously, this code held the lock from the point at which the final block
// part was received until the block executed against the application.
// This prevented the reactor from being able to retrieve the most updated
// version of the RoundState. The reactor needs the updated RoundState to
// gossip the now completed block.
//
// This code can be further improved by either always operating on a copy
// of RoundState and only locking when switching out State's copy of
// RoundState with the updated copy or by emitting RoundState events in
// more places for routines depending on it to listen for.
cs.mtx.Unlock()
cs.mtx.Lock()
if added && cs.ProposalBlockParts.IsComplete() {
cs.handleCompleteProposal(msg.Height)
}
if added { if added {
cs.statsMsgQueue <- mi cs.statsMsgQueue <- mi
} }
@ -1941,44 +1958,43 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil { if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
cs.Logger.Error("failed publishing event complete proposal", "err", err) cs.Logger.Error("failed publishing event complete proposal", "err", err)
} }
}
return added, nil
}
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
func (cs *State) handleCompleteProposal(blockHeight int64) {
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
} }
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(height)
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
}
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(blockHeight, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(blockHeight, cs.Round)
} }
return added, nil
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(blockHeight)
} }
return added, nil
} }
// Attempt to add the vote. if its a duplicate signature, dupeout the validator // Attempt to add the vote. if its a duplicate signature, dupeout the validator


Loading…
Cancel
Save