Browse Source

consensus: delay start of peer routines (#7753)

pull/7769/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
cb98d515dd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 40 deletions
  1. +0
    -2
      internal/consensus/peer_state.go
  2. +33
    -38
      internal/consensus/reactor.go

+ 0
- 2
internal/consensus/peer_state.go View File

@ -43,8 +43,6 @@ type PeerState struct {
running bool running bool
PRS cstypes.PeerRoundState `json:"round_state"` PRS cstypes.PeerRoundState `json:"round_state"`
Stats *peerStateStats `json:"stats"` Stats *peerStateStats `json:"stats"`
broadcastWG sync.WaitGroup
} }
// NewPeerState returns a new PeerState for the given node ID. // NewPeerState returns a new PeerState for the given node ID.


+ 33
- 38
internal/consensus/reactor.go View File

@ -116,9 +116,10 @@ type Reactor struct {
eventBus *eventbus.EventBus eventBus *eventbus.EventBus
Metrics *Metrics Metrics *Metrics
mtx sync.RWMutex
peers map[types.NodeID]*PeerState
waitSync bool
mtx sync.RWMutex
peers map[types.NodeID]*PeerState
waitSync bool
readySignal chan struct{} // closed when the node is ready to start consensus
stateCh *p2p.Channel stateCh *p2p.Channel
dataCh *p2p.Channel dataCh *p2p.Channel
@ -171,9 +172,14 @@ func NewReactor(
voteCh: voteCh, voteCh: voteCh,
voteSetBitsCh: voteSetBitsCh, voteSetBitsCh: voteSetBitsCh,
peerUpdates: peerUpdates, peerUpdates: peerUpdates,
readySignal: make(chan struct{}),
} }
r.BaseService = *service.NewBaseService(logger, "Consensus", r) r.BaseService = *service.NewBaseService(logger, "Consensus", r)
if !r.waitSync {
close(r.readySignal)
}
return r, nil return r, nil
} }
@ -222,15 +228,6 @@ func (r *Reactor) OnStop() {
if !r.WaitSync() { if !r.WaitSync() {
r.state.Wait() r.state.Wait()
} }
r.mtx.Lock()
// Close and wait for each of the peers to shutdown.
// This is safe to perform with the lock since none of the peers require the
// lock to complete any of the methods that the waitgroup is waiting on.
for _, state := range r.peers {
state.broadcastWG.Wait()
}
r.mtx.Unlock()
} }
// SetEventBus sets the reactor's event bus. // SetEventBus sets the reactor's event bus.
@ -263,6 +260,7 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL
r.mtx.Lock() r.mtx.Lock()
r.waitSync = false r.waitSync = false
close(r.readySignal)
r.mtx.Unlock() r.mtx.Unlock()
r.Metrics.BlockSyncing.Set(0) r.Metrics.BlockSyncing.Set(0)
@ -492,8 +490,6 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState) { func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState) {
logger := r.logger.With("peer", ps.peerID) logger := r.logger.With("peer", ps.peerID)
defer ps.broadcastWG.Done()
timer := time.NewTimer(0) timer := time.NewTimer(0)
defer timer.Stop() defer timer.Stop()
@ -747,8 +743,6 @@ func (r *Reactor) gossipVotesForHeight(
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
logger := r.logger.With("peer", ps.peerID) logger := r.logger.With("peer", ps.peerID)
defer ps.broadcastWG.Done()
// XXX: simple hack to throttle logs upon sleep // XXX: simple hack to throttle logs upon sleep
logThrottle := 0 logThrottle := 0
@ -838,8 +832,6 @@ OUTER_LOOP:
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes // NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening. // into play for liveness when there's a signature DDoS attack happening.
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
defer ps.broadcastWG.Done()
timer := time.NewTimer(0) timer := time.NewTimer(0)
defer timer.Stop() defer timer.Stop()
@ -1005,12 +997,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
return return
} }
var (
ps *PeerState
ok bool
)
ps, ok = r.peers[peerUpdate.NodeID]
ps, ok := r.peers[peerUpdate.NodeID]
if !ok { if !ok {
ps = NewPeerState(r.logger, peerUpdate.NodeID) ps = NewPeerState(r.logger, peerUpdate.NodeID)
r.peers[peerUpdate.NodeID] = ps r.peers[peerUpdate.NodeID] = ps
@ -1021,19 +1008,31 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// when the peer is removed. We also set the running state to ensure we // when the peer is removed. We also set the running state to ensure we
// do not spawn multiple instances of the same goroutines and finally we // do not spawn multiple instances of the same goroutines and finally we
// set the waitgroup counter so we know when all goroutines have exited. // set the waitgroup counter so we know when all goroutines have exited.
ps.broadcastWG.Add(3)
ps.SetRunning(true) ps.SetRunning(true)
// start goroutines for this peer
go r.gossipDataRoutine(ctx, ps)
go r.gossipVotesRoutine(ctx, ps)
go r.queryMaj23Routine(ctx, ps)
go func() {
select {
case <-ctx.Done():
return
case <-r.readySignal:
}
// do nothing if the peer has
// stopped while we've been waiting.
if !ps.IsRunning() {
return
}
// start goroutines for this peer
go r.gossipDataRoutine(ctx, ps)
go r.gossipVotesRoutine(ctx, ps)
go r.queryMaj23Routine(ctx, ps)
// Send our state to the peer. If we're block-syncing, broadcast a
// RoundStepMessage later upon SwitchToConsensus().
if !r.WaitSync() {
go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }()
}
// Send our state to the peer. If we're block-syncing, broadcast a
// RoundStepMessage later upon SwitchToConsensus().
if !r.waitSync {
go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }()
}
}()
} }
case p2p.PeerStatusDown: case p2p.PeerStatusDown:
@ -1041,10 +1040,6 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
if ok && ps.IsRunning() { if ok && ps.IsRunning() {
// signal to all spawned goroutines for the peer to gracefully exit // signal to all spawned goroutines for the peer to gracefully exit
go func() { go func() {
// Wait for all spawned broadcast goroutines to exit before marking the
// peer state as no longer running and removal from the peers map.
ps.broadcastWG.Wait()
r.mtx.Lock() r.mtx.Lock()
delete(r.peers, peerUpdate.NodeID) delete(r.peers, peerUpdate.NodeID)
r.mtx.Unlock() r.mtx.Unlock()


Loading…
Cancel
Save