Browse Source

mempool+evidence: simplify cleanup (#7794)

pull/7799/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
81dcc8d1b4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 74 deletions
  1. +12
    -34
      internal/evidence/reactor.go
  2. +8
    -35
      internal/mempool/reactor.go
  3. +1
    -5
      internal/mempool/reactor_test.go

+ 12
- 34
internal/evidence/reactor.go View File

@@ -8,7 +8,6 @@ import (
"time"
clist "github.com/tendermint/tendermint/internal/libs/clist"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@@ -51,10 +50,8 @@ type Reactor struct {
evidenceCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
peerWG sync.WaitGroup
mtx sync.Mutex
peerRoutines map[types.NodeID]*tmsync.Closer
peerRoutines map[types.NodeID]context.CancelFunc
}
// NewReactor returns a reference to a new evidence reactor, which implements the
@@ -77,7 +74,7 @@ func NewReactor(
evpool: evpool,
evidenceCh: evidenceCh,
peerUpdates: peerUpdates,
peerRoutines: make(map[types.NodeID]*tmsync.Closer),
peerRoutines: make(map[types.NodeID]context.CancelFunc),
}
r.BaseService = *service.NewBaseService(logger, "Evidence", r)
@@ -98,16 +95,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
r.mtx.Lock()
for _, c := range r.peerRoutines {
c.Close()
}
r.mtx.Unlock()
// Wait for all spawned peer evidence broadcasting goroutines to gracefully
// exit.
r.peerWG.Wait()
// Close the evidence db
r.evpool.Close()
}
@@ -221,11 +208,9 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// safely, and finally start the goroutine to broadcast evidence to that peer.
_, ok := r.peerRoutines[peerUpdate.NodeID]
if !ok {
closer := tmsync.NewCloser()
r.peerRoutines[peerUpdate.NodeID] = closer
r.peerWG.Add(1)
go r.broadcastEvidenceLoop(ctx, peerUpdate.NodeID, closer)
pctx, pcancel := context.WithCancel(ctx)
r.peerRoutines[peerUpdate.NodeID] = pcancel
go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID)
}
case p2p.PeerStatusDown:
@@ -235,7 +220,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// from the map of peer evidence broadcasting goroutines.
closer, ok := r.peerRoutines[peerUpdate.NodeID]
if ok {
closer.Close()
closer()
}
}
}
@@ -265,7 +250,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, closer *tmsync.Closer) {
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) {
var next *clist.CElement
defer func() {
@@ -273,8 +258,6 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
delete(r.peerRoutines, peerID)
r.mtx.Unlock()
r.peerWG.Done()
if e := recover(); e != nil {
r.logger.Error(
"recovering from broadcasting evidence loop",
@@ -284,6 +267,9 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
}
}()
timer := time.NewTimer(0)
defer timer.Stop()
for {
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWaitChan() returned nil. So we can go
@@ -297,10 +283,6 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
case <-ctx.Done():
return
case <-closer.Done():
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
}
}
@@ -324,18 +306,14 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID)
select {
case <-time.After(time.Second * broadcastEvidenceIntervalS):
case <-timer.C:
// start from the beginning after broadcastEvidenceIntervalS seconds
timer.Reset(time.Second * broadcastEvidenceIntervalS)
next = nil
case <-next.NextWaitChan():
next = next.Next()
case <-closer.Done():
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
case <-ctx.Done():
return
}


+ 8
- 35
internal/mempool/reactor.go View File

@@ -10,7 +10,6 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/libs/clist"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@@ -49,16 +48,12 @@ type Reactor struct {
mempoolCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
// peerWG is used to coordinate graceful termination of all peer broadcasting
// goroutines.
peerWG sync.WaitGroup
// observePanic is a function for observing panics that were recovered in methods on
// Reactor. observePanic is called with the recovered value.
observePanic func(interface{})
mtx sync.Mutex
peerRoutines map[types.NodeID]*tmsync.Closer
peerRoutines map[types.NodeID]context.CancelFunc
}
// NewReactor returns a reference to a new reactor.
@@ -85,7 +80,7 @@ func NewReactor(
ids: NewMempoolIDs(),
mempoolCh: ch,
peerUpdates: peerUpdates,
peerRoutines: make(map[types.NodeID]*tmsync.Closer),
peerRoutines: make(map[types.NodeID]context.CancelFunc),
observePanic: defaultObservePanic,
}
@@ -131,16 +126,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
r.mtx.Lock()
for _, c := range r.peerRoutines {
c.Close()
}
r.mtx.Unlock()
// wait for all spawned peer tx broadcasting goroutines to gracefully exit
r.peerWG.Wait()
}
func (r *Reactor) OnStop() {}
// handleMempoolMessage handles envelopes sent from peers on the MempoolChannel.
// For every tx in the message, we execute CheckTx. It returns an error if an
@@ -248,15 +234,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// safely, and finally start the goroutine to broadcast txs to that peer.
_, ok := r.peerRoutines[peerUpdate.NodeID]
if !ok {
closer := tmsync.NewCloser()
r.peerRoutines[peerUpdate.NodeID] = closer
r.peerWG.Add(1)
pctx, pcancel := context.WithCancel(ctx)
r.peerRoutines[peerUpdate.NodeID] = pcancel
r.ids.ReserveForPeer(peerUpdate.NodeID)
// start a broadcast routine ensuring all txs are forwarded to the peer
go r.broadcastTxRoutine(ctx, peerUpdate.NodeID, closer)
go r.broadcastTxRoutine(pctx, peerUpdate.NodeID)
}
}
@@ -269,7 +253,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// from the map of peer tx broadcasting goroutines.
closer, ok := r.peerRoutines[peerUpdate.NodeID]
if ok {
closer.Close()
closer()
}
}
}
@@ -288,7 +272,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
}
}
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, closer *tmsync.Closer) {
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
peerMempoolID := r.ids.GetForPeer(peerID)
var nextGossipTx *clist.CElement
@@ -298,8 +282,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c
delete(r.peerRoutines, peerID)
r.mtx.Unlock()
r.peerWG.Done()
if e := recover(); e != nil {
r.observePanic(e)
r.logger.Error(
@@ -326,11 +308,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c
if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil {
continue
}
case <-closer.Done():
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
}
}
@@ -369,10 +346,6 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, c
select {
case <-nextGossipTx.NextWaitChan():
nextGossipTx = nextGossipTx.Next()
case <-closer.Done():
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
case <-ctx.Done():
return
}


+ 1
- 5
internal/mempool/reactor_test.go View File

@@ -15,7 +15,6 @@ import (
"github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
"github.com/tendermint/tendermint/libs/log"
@@ -172,9 +171,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
// run the router
rts.start(ctx, t)
closer := tmsync.NewCloser()
primaryReactor.peerWG.Add(1)
go primaryReactor.broadcastTxRoutine(ctx, secondary, closer)
go primaryReactor.broadcastTxRoutine(ctx, secondary)
wg := &sync.WaitGroup{}
for i := 0; i < 50; i++ {
@@ -188,7 +185,6 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
err := primaryReactor.Stop()
require.NoError(t, err)
primaryReactor.peerWG.Wait()
wg.Wait()
}


Loading…
Cancel
Save