Browse Source

consensus: change lock handling in and handleMsg reactor for RoundState

pull/8139/head
William Banfield 3 years ago
parent
commit
f553f4d747
No known key found for this signature in database GPG Key ID: EFAD3442BF29E3AC
2 changed files with 84 additions and 39 deletions
  1. +27
    -4
      internal/consensus/reactor.go
  2. +57
    -35
      internal/consensus/state.go

+ 27
- 4
internal/consensus/reactor.go View File

@ -119,6 +119,7 @@ type Reactor struct {
mtx sync.RWMutex mtx sync.RWMutex
peers map[types.NodeID]*PeerState peers map[types.NodeID]*PeerState
waitSync bool waitSync bool
rs *cstypes.RoundState
readySignal chan struct{} // closed when the node is ready to start consensus readySignal chan struct{} // closed when the node is ready to start consensus
stateCh *p2p.Channel stateCh *p2p.Channel
@ -166,6 +167,7 @@ func NewReactor(
logger: logger, logger: logger,
state: cs, state: cs,
waitSync: waitSync, waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState), peers: make(map[types.NodeID]*PeerState),
eventBus: eventBus, eventBus: eventBus,
Metrics: metrics, Metrics: metrics,
@ -199,6 +201,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
go r.peerStatsRoutine(ctx) go r.peerStatsRoutine(ctx)
r.subscribeToBroadcastEvents() r.subscribeToBroadcastEvents()
go r.updateRoundStateRoutine()
if !r.WaitSync() { if !r.WaitSync() {
if err := r.state.Start(ctx); err != nil { if err := r.state.Start(ctx); err != nil {
@ -411,10 +414,30 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error { func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error {
return r.stateCh.Send(ctx, p2p.Envelope{ return r.stateCh.Send(ctx, p2p.Envelope{
To: peerID, To: peerID,
Message: makeRoundStepMessage(r.state.GetRoundState()),
Message: makeRoundStepMessage(r.getRoundState()),
}) })
} }
func (r *Reactor) updateRoundStateRoutine() {
t := time.NewTicker(100 * time.Microsecond)
defer t.Stop()
for range t.C {
if !r.IsRunning() {
return
}
rs := r.getRoundState()
r.mtx.Lock()
r.rs = rs
r.mtx.Unlock()
}
}
func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
return conR.rs
}
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) { func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
@ -497,7 +520,7 @@ OUTER_LOOP:
default: default:
} }
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState() prs := ps.GetRoundState()
// Send proposal Block parts? // Send proposal Block parts?
@ -755,7 +778,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
default: default:
} }
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState() prs := ps.GetRoundState()
switch logThrottle { switch logThrottle {
@ -846,7 +869,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
return return
} }
rs := r.state.GetRoundState()
rs := r.getRoundState()
prs := ps.GetRoundState() prs := ps.GetRoundState()
// TODO create more reliable coppies of these // TODO create more reliable coppies of these
// structures so the following go routines don't race // structures so the following go routines don't race


+ 57
- 35
internal/consensus/state.go View File

@ -948,9 +948,6 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) {
// state transitions on complete-proposal, 2/3-any, 2/3-one // state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
var ( var (
added bool added bool
err error err error
@ -962,15 +959,37 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
case *ProposalMessage: case *ProposalMessage:
// will not cause transition. // will not cause transition.
// once proposal is set, we can receive block parts // once proposal is set, we can receive block parts
cs.mtx.Lock()
err = cs.setProposal(msg.Proposal, mi.ReceiveTime) err = cs.setProposal(msg.Proposal, mi.ReceiveTime)
cs.mtx.Unlock()
case *BlockPartMessage: case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
cs.mtx.Lock()
added, err = cs.addProposalBlockPart(ctx, msg, peerID) added, err = cs.addProposalBlockPart(ctx, msg, peerID)
// We unlock here to yield to any routines that need to read the the RoundState.
// Previously, this code held the lock from the point at which the final block
// part was recieved until the block executed against the application.
// This prevented the reactor from being able to retrieve the most updated
// version of the RoundState. The reactor needs the updated RoundState to
// gossip the now completed block.
//
// This code can be further improved by either always operating on a copy
// of RoundState and only locking when switching out State's copy of
// RoundState with the updated copy or by emitting RoundState events in
// more places for routines depending on it to listen for.
cs.mtx.Unlock()
cs.mtx.Lock()
if added && cs.ProposalBlockParts.IsComplete() {
cs.handleCompleteProposal(ctx, msg.Height)
}
if added { if added {
select { select {
case cs.statsMsgQueue <- mi: case cs.statsMsgQueue <- mi:
case <-ctx.Done(): case <-ctx.Done():
cs.mtx.Unlock()
return return
} }
} }
@ -984,18 +1003,22 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
) )
err = nil err = nil
} }
cs.mtx.Unlock()
case *VoteMessage: case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature // attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition // if the vote gives us a 2/3-any or 2/3-one, we transition
cs.mtx.Lock()
added, err = cs.tryAddVote(ctx, msg.Vote, peerID) added, err = cs.tryAddVote(ctx, msg.Vote, peerID)
if added { if added {
select { select {
case cs.statsMsgQueue <- mi: case cs.statsMsgQueue <- mi:
case <-ctx.Done(): case <-ctx.Done():
cs.mtx.Unlock()
return return
} }
} }
cs.mtx.Unlock()
// TODO: punish peer // TODO: punish peer
// We probably don't want to stop the peer here. The vote does not // We probably don't want to stop the peer here. The vote does not
@ -2169,44 +2192,43 @@ func (cs *State) addProposalBlockPart(
if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil { if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil {
cs.logger.Error("failed publishing event complete proposal", "err", err) cs.logger.Error("failed publishing event complete proposal", "err", err)
} }
}
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
return added, nil
}
func (cs *State) handleCompleteProposal(ctx context.Context, height int64) {
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
} }
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
}
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(ctx, height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(ctx, height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(ctx, height)
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(ctx, height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(ctx, height, cs.Round)
} }
return added, nil
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(ctx, height)
} }
return added, nil
} }
// Attempt to add the vote. if its a duplicate signature, dupeout the validator // Attempt to add the vote. if its a duplicate signature, dupeout the validator


Loading…
Cancel
Save