Browse Source

consensus: change lock handling in reactor and handleMsg for RoundState

pull/8138/head
William Banfield 2 years ago
parent
commit
3ba143bf8c
No known key found for this signature in database GPG Key ID: EFAD3442BF29E3AC
2 changed files with 83 additions and 40 deletions
  1. +27
    -4
      internal/consensus/reactor.go
  2. +56
    -36
      internal/consensus/state.go

+ 27
- 4
internal/consensus/reactor.go View File

@ -131,6 +131,7 @@ type Reactor struct {
mtx sync.RWMutex
peers map[types.NodeID]*PeerState
waitSync bool
rs *cstypes.RoundState
readySignal chan struct{} // closed when the node is ready to start consensus
stateCh *p2p.Channel
@ -161,6 +162,7 @@ func NewReactor(
r := &Reactor{
state: cs,
waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState),
Metrics: NopMetrics(),
stateCh: stateCh,
@ -198,6 +200,7 @@ func (r *Reactor) OnStart() error {
go r.peerStatsRoutine()
r.subscribeToBroadcastEvents()
go r.updateRoundStateRoutine()
if !r.WaitSync() {
if err := r.state.Start(); err != nil {
@ -443,7 +446,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
}
func (r *Reactor) sendNewRoundStepMessage(peerID types.NodeID) {
rs := r.state.GetRoundState()
rs := r.getRoundState()
msg := makeRoundStepMessage(rs)
select {
case <-r.closeCh:
@ -455,6 +458,26 @@ func (r *Reactor) sendNewRoundStepMessage(peerID types.NodeID) {
}
}
func (r *Reactor) updateRoundStateRoutine() {
t := time.NewTicker(100 * time.Microsecond)
defer t.Stop()
for range t.C {
if !r.IsRunning() {
return
}
rs := r.getRoundState()
r.mtx.Lock()
r.rs = rs
r.mtx.Unlock()
}
}
func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
return conR.rs
}
func (r *Reactor) gossipDataForCatchup(rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) {
logger := r.Logger.With("height", prs.Height).With("peer", ps.peerID)
@ -539,7 +562,7 @@ OUTER_LOOP:
default:
}
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState()
// Send proposal Block parts?
@ -766,7 +789,7 @@ OUTER_LOOP:
default:
}
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState()
switch logThrottle {
@ -848,7 +871,7 @@ func (r *Reactor) queryMaj23Routine(ps *PeerState) {
return
}
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState()
// TODO create more reliable coppies of these
// structures so the following go routines don't race


+ 56
- 36
internal/consensus/state.go View File

@ -862,9 +862,6 @@ func (cs *State) receiveRoutine(maxSteps int) {
// state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *State) handleMsg(mi msgInfo) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
var (
added bool
err error
@ -876,11 +873,32 @@ func (cs *State) handleMsg(mi msgInfo) {
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
cs.mtx.Lock()
err = cs.setProposal(msg.Proposal)
cs.mtx.Unlock()
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
cs.mtx.Lock()
added, err = cs.addProposalBlockPart(msg, peerID)
// We unlock here to yield to any routines that need to read the the RoundState.
// Previously, this code held the lock from the point at which the final block
// part was recieved until the block executed against the application.
// This prevented the reactor from being able to retrieve the most updated
// version of the RoundState. The reactor needs the updated RoundState to
// gossip the now completed block.
//
// This code can be further improved by either always operating on a copy
// of RoundState and only locking when switching out State's copy of
// RoundState with the updated copy or by emitting RoundState events in
// more places for routines depending on it to listen for.
cs.mtx.Unlock()
cs.mtx.Lock()
if added && cs.ProposalBlockParts.IsComplete() {
cs.handleCompleteProposal(msg.Height)
}
if added {
cs.statsMsgQueue <- mi
}
@ -894,14 +912,17 @@ func (cs *State) handleMsg(mi msgInfo) {
)
err = nil
}
cs.mtx.Unlock()
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
cs.mtx.Lock()
added, err = cs.tryAddVote(msg.Vote, peerID)
if added {
cs.statsMsgQueue <- mi
}
cs.mtx.Unlock()
// if err == ErrAddingVote {
// TODO: punish peer
@ -1941,44 +1962,43 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID types.NodeID
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
cs.Logger.Error("failed publishing event complete proposal", "err", err)
}
}
return added, nil
}
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
func (cs *State) handleCompleteProposal(blockHeight int64) {
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsZero() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(height)
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
}
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(blockHeight, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(blockHeight, cs.Round)
}
return added, nil
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(blockHeight)
}
return added, nil
}
// Attempt to add the vote. if its a duplicate signature, dupeout the validator


Loading…
Cancel
Save