From 3269134b789bdb2b3b150bbfbc42c90a7234cdf2 Mon Sep 17 00:00:00 2001
From: tycho garen
Date: Tue, 22 Mar 2022 14:56:14 -0400
Subject: [PATCH] evidence: use fewer tracking threads

---
 internal/evidence/reactor.go | 119 ++++++++++++++++++++---------------
 1 file changed, 70 insertions(+), 49 deletions(-)

diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go
index 1011f732f..29df8ece2 100644
--- a/internal/evidence/reactor.go
+++ b/internal/evidence/reactor.go
@@ -52,7 +52,7 @@ type Reactor struct {
 
 	mtx sync.Mutex
 
-	peerRoutines map[types.NodeID]context.CancelFunc
+	peerRoutines map[types.NodeID]struct{}
 }
 
 // NewReactor returns a reference to a new evidence reactor, which implements the
@@ -75,7 +75,7 @@ func NewReactor(
 		evpool:       evpool,
 		evidenceCh:   evidenceCh,
 		peerUpdates:  peerUpdates,
-		peerRoutines: make(map[types.NodeID]context.CancelFunc),
+		peerRoutines: make(map[types.NodeID]struct{}),
 	}
 
 	r.BaseService = *service.NewBaseService(logger, "Evidence", r)
@@ -90,6 +90,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 	go r.processEvidenceCh(ctx)
 	go r.processPeerUpdates(ctx)
+	go r.broadcastEvidenceLoop(ctx)
 
 	return nil
 }
 
@@ -209,22 +210,15 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
 		// a new done channel so we can explicitly close the goroutine if the peer
 		// is later removed, we increment the waitgroup so the reactor can stop
 		// safely, and finally start the goroutine to broadcast evidence to that peer.
-		_, ok := r.peerRoutines[peerUpdate.NodeID]
-		if !ok {
-			pctx, pcancel := context.WithCancel(ctx)
-			r.peerRoutines[peerUpdate.NodeID] = pcancel
-			go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID)
-		}
+
+		r.peerRoutines[peerUpdate.NodeID] = struct{}{}
 
 	case p2p.PeerStatusDown:
 		// Check if we've started an evidence broadcasting goroutine for this peer.
 		// If we have, we signal to terminate the goroutine via the channel's closure.
 		// This will internally decrement the peer waitgroup and remove the peer
 		// from the map of peer evidence broadcasting goroutines.
-		closer, ok := r.peerRoutines[peerUpdate.NodeID]
-		if ok {
-			closer()
-		}
+		delete(r.peerRoutines, peerUpdate.NodeID)
 	}
 }
 
@@ -243,33 +237,16 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
 }
 
 // broadcastEvidenceLoop starts a blocking process that continuously reads pieces
-// of evidence off of a linked-list and sends the evidence in a p2p Envelope to
-// the given peer by ID. This should be invoked in a goroutine per unique peer
-// ID via an appropriate PeerUpdate. The goroutine can be signaled to gracefully
-// exit by either explicitly closing the provided doneCh or by the reactor
-// signaling to stop.
+// of evidence off of a linked-list and sends the evidence to all
+// currently connected peers.
 //
 // TODO: This should be refactored so that we do not blindly gossip evidence
 // that the peer has already received or may not be ready for.
 //
 // REF: https://github.com/tendermint/tendermint/issues/4727
-func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) {
+func (r *Reactor) broadcastEvidenceLoop(ctx context.Context) {
 	var next *clist.CElement
 
-	defer func() {
-		r.mtx.Lock()
-		delete(r.peerRoutines, peerID)
-		r.mtx.Unlock()
-
-		if e := recover(); e != nil {
-			r.logger.Error(
-				"recovering from broadcasting evidence loop",
-				"err", e,
-				"stack", string(debug.Stack()),
-			)
-		}
-	}()
-
 	timer := time.NewTimer(0)
 	defer timer.Stop()
 
@@ -295,30 +272,74 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
 			panic(fmt.Errorf("failed to convert evidence: %w", err))
 		}
 
-		// Send the evidence to the corresponding peer. Note, the peer may be behind
-		// and thus would not be able to process the evidence correctly. Also, the
-		// peer may receive this piece of evidence multiple times if it added and
-		// removed frequently from the broadcasting peer.
-
-		if err := r.evidenceCh.Send(ctx, p2p.Envelope{
-			To:      peerID,
-			Message: evProto,
-		}); err != nil {
+		// Convert the current set of peers into a channel that the worker
+		// goroutines below consume.
+		peers, num := func() (chan types.NodeID, int) {
+			r.mtx.Lock()
+			defer r.mtx.Unlock()
+			peers := make(chan types.NodeID, len(r.peerRoutines))
+			for id := range r.peerRoutines {
+				select {
+				case peers <- id:
+				case <-ctx.Done():
+					return nil, 0
+				}
+			}
+			// Close the channel so the workers' range loops (and the
+			// WaitGroup below) can finish once every peer is handed out.
+			close(peers)
+			return peers, len(r.peerRoutines)
+		}()
+		if num == 0 {
 			return
 		}
 
-		r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID)
-
-		select {
-		case <-timer.C:
-			// start from the beginning after broadcastEvidenceIntervalS seconds
-			timer.Reset(time.Second * broadcastEvidenceIntervalS)
-			next = nil
+		wg := &sync.WaitGroup{}
+
+		// TODO(tycho): make a better decision about the number of
+		// parallel sends that we should make. This is just
+		// one thread per peer, which is probably fine given
+		// the amount of buffering in the p2p channels. In the
+		// future we might want to have this resemble numCPU
+		// or something similar.
+		for i := 0; i < num; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for peer := range peers {
+					if err := r.evidenceCh.Send(ctx, p2p.Envelope{
+						To:      peer,
+						Message: evProto,
+					}); err != nil {
+						return
+					}
+					r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peer)
+				}
+			}()
+		}
 
-		case <-next.NextWaitChan():
-			next = next.Next()
+		// wait for all the sends to complete.
+		signal := make(chan struct{})
+		go func() { defer close(signal); wg.Wait() }()
+		select {
 		case <-ctx.Done():
 			return
+		case <-signal:
+			// Note, a peer may be behind and thus unable to process the
+			// evidence correctly. Also, a peer may receive this piece of
+			// evidence multiple times if it is added and removed frequently
+			// from the broadcasting node.
+
+			select {
+			case <-ctx.Done():
+				return
+			case <-timer.C:
+				// start from the beginning after broadcastEvidenceIntervalS seconds
+				timer.Reset(time.Second * broadcastEvidenceIntervalS)
+				next = nil
+
+			case <-next.NextWaitChan():
+				next = next.Next()
+
+			}
 		}
 	}
 }
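For reference, below is a minimal, standalone sketch of the fan-out pattern the new broadcastEvidenceLoop relies on: the peer set is drained through a buffered, closed channel by a bounded pool of worker goroutines, and a WaitGroup gates the next pass over the evidence list. The fanOutSend helper, its send callback (standing in for r.evidenceCh.Send), and the runtime.NumCPU cap are illustrative assumptions only; the cap sketches the option raised in the TODO(tycho) comment rather than the patch's current one-goroutine-per-peer behavior.

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
)

// fanOutSend delivers one message to every peer ID using at most
// min(len(peerIDs), runtime.NumCPU()) concurrent workers.
// (Hypothetical helper; not part of the patch.)
func fanOutSend(ctx context.Context, peerIDs []string, send func(ctx context.Context, id string) error) {
	if len(peerIDs) == 0 {
		return
	}

	// Buffered to the peer count so filling never blocks; closed so the
	// workers' range loops terminate once all peers are handed out.
	peers := make(chan string, len(peerIDs))
	for _, id := range peerIDs {
		peers <- id
	}
	close(peers)

	workers := len(peerIDs)
	if n := runtime.NumCPU(); workers > n {
		workers = n
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range peers {
				if ctx.Err() != nil {
					return
				}
				// A failed send aborts this worker, mirroring the patch.
				if err := send(ctx, id); err != nil {
					return
				}
			}
		}()
	}
	wg.Wait()
}

func main() {
	peers := []string{"peer-a", "peer-b", "peer-c"}
	fanOutSend(context.Background(), peers, func(_ context.Context, id string) error {
		fmt.Println("sent evidence to", id)
		return nil
	})
}

Closing the peers channel after it is filled is what lets the workers' range loops terminate and wg.Wait() return; without that close, the broadcast loop would block until the context is cancelled.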