Browse Source

evidence: use fewer tracking threads

pull/8183/head
tycho garen 3 years ago
parent
commit
3269134b78
1 changed files with 70 additions and 49 deletions
  1. +70
    -49
      internal/evidence/reactor.go

+ 70
- 49
internal/evidence/reactor.go View File

@ -52,7 +52,7 @@ type Reactor struct {
mtx sync.Mutex mtx sync.Mutex
peerRoutines map[types.NodeID]context.CancelFunc
peerRoutines map[types.NodeID]struct{}
} }
// NewReactor returns a reference to a new evidence reactor, which implements the // NewReactor returns a reference to a new evidence reactor, which implements the
@ -75,7 +75,7 @@ func NewReactor(
evpool: evpool, evpool: evpool,
evidenceCh: evidenceCh, evidenceCh: evidenceCh,
peerUpdates: peerUpdates, peerUpdates: peerUpdates,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
peerRoutines: make(map[types.NodeID]struct{}),
} }
r.BaseService = *service.NewBaseService(logger, "Evidence", r) r.BaseService = *service.NewBaseService(logger, "Evidence", r)
@ -90,6 +90,7 @@ func NewReactor(
func (r *Reactor) OnStart(ctx context.Context) error { func (r *Reactor) OnStart(ctx context.Context) error {
go r.processEvidenceCh(ctx) go r.processEvidenceCh(ctx)
go r.processPeerUpdates(ctx) go r.processPeerUpdates(ctx)
go r.broadcastEvidenceLoop(ctx)
return nil return nil
} }
@ -209,22 +210,15 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// a new done channel so we can explicitly close the goroutine if the peer // a new done channel so we can explicitly close the goroutine if the peer
// is later removed, we increment the waitgroup so the reactor can stop // is later removed, we increment the waitgroup so the reactor can stop
// safely, and finally start the goroutine to broadcast evidence to that peer. // safely, and finally start the goroutine to broadcast evidence to that peer.
_, ok := r.peerRoutines[peerUpdate.NodeID]
if !ok {
pctx, pcancel := context.WithCancel(ctx)
r.peerRoutines[peerUpdate.NodeID] = pcancel
go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID)
}
r.peerRoutines[peerUpdate.NodeID] = struct{}{}
case p2p.PeerStatusDown: case p2p.PeerStatusDown:
// Check if we've started an evidence broadcasting goroutine for this peer. // Check if we've started an evidence broadcasting goroutine for this peer.
// If we have, we signal to terminate the goroutine via the channel's closure. // If we have, we signal to terminate the goroutine via the channel's closure.
// This will internally decrement the peer waitgroup and remove the peer // This will internally decrement the peer waitgroup and remove the peer
// from the map of peer evidence broadcasting goroutines. // from the map of peer evidence broadcasting goroutines.
closer, ok := r.peerRoutines[peerUpdate.NodeID]
if ok {
closer()
}
delete(r.peerRoutines, peerUpdate.NodeID)
} }
} }
@ -243,33 +237,16 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
} }
// broadcastEvidenceLoop starts a blocking process that continuously reads pieces // broadcastEvidenceLoop starts a blocking process that continuously reads pieces
// of evidence off of a linked-list and sends the evidence in a p2p Envelope to
// the given peer by ID. This should be invoked in a goroutine per unique peer
// ID via an appropriate PeerUpdate. The goroutine can be signaled to gracefully
// exit by either explicitly closing the provided doneCh or by the reactor
// signaling to stop.
// of evidence off of a linked-list and sends the evidence to all
// currently connected peers.
// //
// TODO: This should be refactored so that we do not blindly gossip evidence // TODO: This should be refactored so that we do not blindly gossip evidence
// that the peer has already received or may not be ready for. // that the peer has already received or may not be ready for.
// //
// REF: https://github.com/tendermint/tendermint/issues/4727 // REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) {
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context) {
var next *clist.CElement var next *clist.CElement
defer func() {
r.mtx.Lock()
delete(r.peerRoutines, peerID)
r.mtx.Unlock()
if e := recover(); e != nil {
r.logger.Error(
"recovering from broadcasting evidence loop",
"err", e,
"stack", string(debug.Stack()),
)
}
}()
timer := time.NewTimer(0) timer := time.NewTimer(0)
defer timer.Stop() defer timer.Stop()
@ -295,30 +272,74 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
panic(fmt.Errorf("failed to convert evidence: %w", err)) panic(fmt.Errorf("failed to convert evidence: %w", err))
} }
// Send the evidence to the corresponding peer. Note, the peer may be behind
// and thus would not be able to process the evidence correctly. Also, the
// peer may receive this piece of evidence multiple times if it added and
// removed frequently from the broadcasting peer.
if err := r.evidenceCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: evProto,
}); err != nil {
// convert current peers into a channel to consume
// from worker threads
peers, num := func() (chan types.NodeID, int) {
r.mtx.Lock()
defer r.mtx.Unlock()
peers := make(chan types.NodeID, len(r.peerRoutines))
for id := range r.peerRoutines {
select {
case peers <- id:
case <-ctx.Done():
return nil, 0
}
}
return peers, len(r.peerRoutines)
}()
if num == 0 {
return return
} }
r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID)
select {
case <-timer.C:
// start from the beginning after broadcastEvidenceIntervalS seconds
timer.Reset(time.Second * broadcastEvidenceIntervalS)
next = nil
wg := &sync.WaitGroup{}
// TODO(tycho): make a better decision about the number of
// parallel sends that we should make. This is just
// one thread per peer, which is probably fine given
// the amount of buffering in the p2p channels. In the
// future we might want to have this resemble numCPU
// or something similar.
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for peer := range peers {
if err := r.evidenceCh.Send(ctx, p2p.Envelope{
To: peer,
Message: evProto,
}); err != nil {
return
}
r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peer)
}
}()
}
case <-next.NextWaitChan():
next = next.Next()
// wait for all the sends to complete.
signal := make(chan struct{})
go func() { defer close(signal); wg.Wait() }()
select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-signal:
// Send the evidence to the corresponding peer. Note, the peer may be behind
// and thus would not be able to process the evidence correctly. Also, the
// peer may receive this piece of evidence multiple times if it added and
// removed frequently from the broadcasting peer.
select {
case <-ctx.Done():
return
case <-timer.C:
// start from the beginning after broadcastEvidenceIntervalS seconds
timer.Reset(time.Second * broadcastEvidenceIntervalS)
next = nil
case <-next.NextWaitChan():
next = next.Next()
}
} }
} }
} }

Loading…
Cancel
Save