diff --git a/internal/consensus/peer_state.go b/internal/consensus/peer_state.go index f0130f0e1..77769937d 100644 --- a/internal/consensus/peer_state.go +++ b/internal/consensus/peer_state.go @@ -43,8 +43,6 @@ type PeerState struct { running bool PRS cstypes.PeerRoundState `json:"round_state"` Stats *peerStateStats `json:"stats"` - - broadcastWG sync.WaitGroup } // NewPeerState returns a new PeerState for the given node ID. diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 873326aa5..53b026113 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -116,9 +116,10 @@ type Reactor struct { eventBus *eventbus.EventBus Metrics *Metrics - mtx sync.RWMutex - peers map[types.NodeID]*PeerState - waitSync bool + mtx sync.RWMutex + peers map[types.NodeID]*PeerState + waitSync bool + readySignal chan struct{} // closed when the node is ready to start consensus stateCh *p2p.Channel dataCh *p2p.Channel @@ -171,9 +172,14 @@ func NewReactor( voteCh: voteCh, voteSetBitsCh: voteSetBitsCh, peerUpdates: peerUpdates, + readySignal: make(chan struct{}), } r.BaseService = *service.NewBaseService(logger, "Consensus", r) + if !r.waitSync { + close(r.readySignal) + } + return r, nil } @@ -222,15 +228,6 @@ func (r *Reactor) OnStop() { if !r.WaitSync() { r.state.Wait() } - - r.mtx.Lock() - // Close and wait for each of the peers to shutdown. - // This is safe to perform with the lock since none of the peers require the - // lock to complete any of the methods that the waitgroup is waiting on. - for _, state := range r.peers { - state.broadcastWG.Wait() - } - r.mtx.Unlock() } // SetEventBus sets the reactor's event bus. @@ -263,6 +260,7 @@ func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL r.mtx.Lock() r.waitSync = false + close(r.readySignal) r.mtx.Unlock() r.Metrics.BlockSyncing.Set(0) @@ -492,8 +490,6 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState) { logger := r.logger.With("peer", ps.peerID) - defer ps.broadcastWG.Done() - timer := time.NewTimer(0) defer timer.Stop() @@ -747,8 +743,6 @@ func (r *Reactor) gossipVotesForHeight( func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { logger := r.logger.With("peer", ps.peerID) - defer ps.broadcastWG.Done() - // XXX: simple hack to throttle logs upon sleep logThrottle := 0 @@ -838,8 +832,6 @@ OUTER_LOOP: // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { - defer ps.broadcastWG.Done() - timer := time.NewTimer(0) defer timer.Stop() @@ -1005,12 +997,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda return } - var ( - ps *PeerState - ok bool - ) - - ps, ok = r.peers[peerUpdate.NodeID] + ps, ok := r.peers[peerUpdate.NodeID] if !ok { ps = NewPeerState(r.logger, peerUpdate.NodeID) r.peers[peerUpdate.NodeID] = ps @@ -1021,19 +1008,31 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // when the peer is removed. We also set the running state to ensure we // do not spawn multiple instances of the same goroutines and finally we // set the waitgroup counter so we know when all goroutines have exited. - ps.broadcastWG.Add(3) ps.SetRunning(true) - // start goroutines for this peer - go r.gossipDataRoutine(ctx, ps) - go r.gossipVotesRoutine(ctx, ps) - go r.queryMaj23Routine(ctx, ps) + go func() { + select { + case <-ctx.Done(): + return + case <-r.readySignal: + } + // do nothing if the peer has + // stopped while we've been waiting. + if !ps.IsRunning() { + return + } + // start goroutines for this peer + go r.gossipDataRoutine(ctx, ps) + go r.gossipVotesRoutine(ctx, ps) + go r.queryMaj23Routine(ctx, ps) + + // Send our state to the peer. If we're block-syncing, broadcast a + // RoundStepMessage later upon SwitchToConsensus(). + if !r.WaitSync() { + go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }() + } - // Send our state to the peer. If we're block-syncing, broadcast a - // RoundStepMessage later upon SwitchToConsensus(). - if !r.waitSync { - go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }() - } + }() } case p2p.PeerStatusDown: @@ -1041,10 +1040,6 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda if ok && ps.IsRunning() { // signal to all spawned goroutines for the peer to gracefully exit go func() { - // Wait for all spawned broadcast goroutines to exit before marking the - // peer state as no longer running and removal from the peers map. - ps.broadcastWG.Wait() - r.mtx.Lock() delete(r.peers, peerUpdate.NodeID) r.mtx.Unlock()