
service: cleanup close channel in reactors (#7399)

pull/7402/head
Sam Kleinman, 3 years ago (committed by GitHub)
commit 0ff3d4b89d
8 changed files with 26 additions and 142 deletions:
1. internal/blocksync/reactor.go (+2, -24)
2. internal/consensus/reactor.go (+6, -41)
3. internal/evidence/reactor.go (+3, -22)
4. internal/statesync/dispatcher.go (+3, -13)
5. internal/statesync/reactor.go (+5, -27)
6. internal/statesync/reactor_test.go (+0, -1)
7. internal/statesync/syncer.go (+5, -12)
8. internal/statesync/syncer_test.go (+2, -2)
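
Every file in this commit makes the same change: the reactor's dedicated closeCh field (created in the constructor, closed in OnStop, and selected on in every goroutine) is removed, and the goroutines instead rely on cancellation of the context they were started with. Below is a minimal sketch of the loop shape after the change, using illustrative stand-in types rather than the real p2p package:

package example

import "context"

// envelope and handleFn are placeholders for p2p.Envelope and a reactor's
// handleMessage method; they are assumptions for illustration only.
type envelope struct{}
type handleFn func(envelope) error

// processCh mirrors the post-commit loop shape: shutdown is observed via
// ctx.Done() instead of a reactor-owned closeCh.
func processCh(ctx context.Context, in <-chan envelope, handle handleFn) {
	for {
		select {
		case <-ctx.Done():
			return
		case env := <-in:
			if err := handle(env); err != nil {
				// the real reactors report the error on the channel's Error
				// side; this sketch simply stops
				return
			}
		}
	}
}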

internal/blocksync/reactor.go (+2, -24)

@@ -86,7 +86,6 @@ type Reactor struct {
// blockSyncCh.Out.
blockSyncOutBridgeCh chan p2p.Envelope
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -138,7 +137,6 @@ func NewReactor(
blockSyncCh: blockSyncCh,
blockSyncOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
metrics: metrics,
syncStartTime: time.Time{},
}
@@ -184,10 +182,6 @@ func (r *Reactor) OnStop() {
// wait for the poolRoutine and requestRoutine goroutines to gracefully exit
r.poolWG.Wait()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
<-r.peerUpdates.Done()
}
@@ -295,6 +289,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on block sync channel; closing...")
return
case envelope := <-r.blockSyncCh.In:
if err := r.handleMessage(r.blockSyncCh.ID, envelope); err != nil {
@@ -304,14 +299,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
Err: err,
}
}
case envelope := <-r.blockSyncOutBridgeCh:
r.blockSyncCh.Out <- envelope
case <-r.closeCh:
r.logger.Debug("stopped listening on block sync channel; closing...")
return
}
}
}
@@ -350,13 +339,10 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
case <-r.closeCh:
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
}
}
}
@@ -391,24 +377,18 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
for {
select {
case <-r.closeCh:
return
case <-ctx.Done():
return
case request := <-r.requestsCh:
r.blockSyncOutBridgeCh <- p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}
case pErr := <-r.errorsCh:
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: pErr.peerID,
Err: pErr.err,
}
case <-statusUpdateTicker.C:
r.poolWG.Add(1)
@@ -598,8 +578,6 @@ FOR_LOOP:
case <-ctx.Done():
break FOR_LOOP
case <-r.closeCh:
break FOR_LOOP
case <-r.pool.exitedCh:
break FOR_LOOP
}


internal/consensus/reactor.go (+6, -41)

@@ -126,13 +126,6 @@ type Reactor struct {
voteCh *p2p.Channel
voteSetBitsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
// NOTE: We need a dedicated stateCloseCh channel for signaling closure of
// the StateChannel due to the fact that the StateChannel message handler
// performs a send on the VoteSetBitsChannel. This is an antipattern, so having
// this dedicated channel, stateCloseCh, is necessary in order to avoid data races.
stateCloseCh chan struct{}
closeCh chan struct{}
}
// NewReactor returns a reference to a new consensus reactor, which implements
@@ -162,8 +155,6 @@ func NewReactor(
voteCh: voteCh,
voteSetBitsCh: voteSetBitsCh,
peerUpdates: peerUpdates,
stateCloseCh: make(chan struct{}),
closeCh: make(chan struct{}),
}
r.BaseService = *service.NewBaseService(logger, "Consensus", r)
@@ -230,14 +221,6 @@ func (r *Reactor) OnStop() {
}
r.mtx.Unlock()
// Close the StateChannel goroutine separately since it uses its own channel
// to signal closure.
close(r.stateCloseCh)
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
<-r.peerUpdates.Done()
}
@@ -993,8 +976,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
case p2p.PeerStatusUp:
// Do not allow starting new broadcasting goroutines after reactor shutdown
// has been initiated. This can happen after we've manually closed all
// peer goroutines and closed r.closeCh, but the router still sends in-flight
// peer updates.
// peer goroutines, but the router still sends in-flight peer updates.
if !r.IsRunning() {
return
}
@@ -1337,6 +1319,7 @@ func (r *Reactor) processStateCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on StateChannel; closing...")
return
case envelope := <-r.stateCh.In:
if err := r.handleMessage(r.stateCh.ID, envelope); err != nil {
@@ -1346,10 +1329,6 @@ func (r *Reactor) processStateCh(ctx context.Context) {
Err: err,
}
}
case <-r.stateCloseCh:
r.logger.Debug("stopped listening on StateChannel; closing...")
return
}
}
}
@@ -1363,6 +1342,7 @@ func (r *Reactor) processDataCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on DataChannel; closing...")
return
case envelope := <-r.dataCh.In:
if err := r.handleMessage(r.dataCh.ID, envelope); err != nil {
@@ -1372,10 +1352,6 @@ func (r *Reactor) processDataCh(ctx context.Context) {
Err: err,
}
}
case <-r.closeCh:
r.logger.Debug("stopped listening on DataChannel; closing...")
return
}
}
}
@@ -1389,6 +1365,7 @@ func (r *Reactor) processVoteCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on VoteChannel; closing...")
return
case envelope := <-r.voteCh.In:
if err := r.handleMessage(r.voteCh.ID, envelope); err != nil {
@@ -1398,10 +1375,6 @@ func (r *Reactor) processVoteCh(ctx context.Context) {
Err: err,
}
}
case <-r.closeCh:
r.logger.Debug("stopped listening on VoteChannel; closing...")
return
}
}
}
@@ -1415,6 +1388,7 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on VoteSetBitsChannel; closing...")
return
case envelope := <-r.voteSetBitsCh.In:
if err := r.handleMessage(r.voteSetBitsCh.ID, envelope); err != nil {
@@ -1424,10 +1398,6 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
Err: err,
}
}
case <-r.closeCh:
r.logger.Debug("stopped listening on VoteSetBitsChannel; closing...")
return
}
}
}
@@ -1441,13 +1411,10 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case <-r.closeCh:
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
}
}
}
@@ -1486,8 +1453,6 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) {
}
case <-ctx.Done():
return
case <-r.closeCh:
return
}
}
}

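The hunks above also delete the consensus reactor's dedicated stateCloseCh, which the removed comment justified by the fact that the StateChannel handler sends on the VoteSetBitsChannel. Once every loop and every send observes the same context, one cancellation signal is enough. A hedged sketch of guarding such a cross-channel send (the message type is a placeholder, not the real proto message):

package example

import "context"

// forwardVoteSetBits shows the single-signal alternative to the removed
// stateCloseCh: the send on the second channel aborts as soon as the same
// ctx that drives the receive loops is cancelled.
func forwardVoteSetBits(ctx context.Context, out chan<- string, msg string) {
	select {
	case out <- msg:
	case <-ctx.Done():
	}
}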

internal/evidence/reactor.go (+3, -22)

@@ -50,7 +50,6 @@ type Reactor struct {
evpool *Pool
evidenceCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
peerWG sync.WaitGroup
@@ -72,7 +71,6 @@ func NewReactor(
evpool: evpool,
evidenceCh: evidenceCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
peerRoutines: make(map[types.NodeID]*tmsync.Closer),
}
@@ -104,10 +102,6 @@ func (r *Reactor) OnStop() {
// exit.
r.peerWG.Wait()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
// Wait for all p2p Channels to be closed before returning. This ensures we
// can easily reason about synchronization of all p2p Channels and ensure no
// panics will occur.
@@ -188,6 +182,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on evidence channel; closing...")
return
case envelope := <-r.evidenceCh.In:
if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil {
@@ -197,10 +192,6 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) {
Err: err,
}
}
case <-r.closeCh:
r.logger.Debug("stopped listening on evidence channel; closing...")
return
}
}
}
@@ -226,8 +217,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
case p2p.PeerStatusUp:
// Do not allow starting new evidence broadcast loops after reactor shutdown
// has been initiated. This can happen after we've manually closed all
// peer broadcast loops and closed r.closeCh, but the router still sends
// in-flight peer updates.
// peer broadcast loops, but the router still sends in-flight peer updates.
if !r.IsRunning() {
return
}
@@ -268,8 +258,6 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case <-ctx.Done():
return
case <-r.closeCh:
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
}
@@ -323,11 +311,6 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
// The peer is marked for removal via a PeerUpdate as the doneCh was
// explicitly closed to signal we should exit.
return
case <-r.closeCh:
// The reactor has signaled that we are stopped and thus we should
// implicitly exit this peer's goroutine.
return
}
}
@@ -366,9 +349,7 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
// explicitly closed to signal we should exit.
return
case <-r.closeCh:
// The reactor has signaled that we are stopped and thus we should
// implicitly exit this peer's goroutine.
case <-ctx.Done():
return
}
}


internal/statesync/dispatcher.go (+3, -13)

@@ -27,7 +27,6 @@ var (
type Dispatcher struct {
// the channel with which to send light block requests on
requestCh chan<- p2p.Envelope
closeCh chan struct{}
mtx sync.Mutex
// all pending calls that have been dispatched and are awaiting an answer
@@ -37,7 +36,6 @@ type Dispatcher struct {
func NewDispatcher(requestCh chan<- p2p.Envelope) *Dispatcher {
return &Dispatcher{
requestCh: requestCh,
closeCh: make(chan struct{}),
calls: make(map[types.NodeID]chan *types.LightBlock),
}
}
@@ -47,7 +45,7 @@ func NewDispatcher(requestCh chan<- p2p.Envelope) *Dispatcher {
// LightBlock response is used to signal that the peer doesn't have the requested LightBlock.
func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) {
// dispatch the request to the peer
callCh, err := d.dispatch(peer, height)
callCh, err := d.dispatch(ctx, peer, height)
if err != nil {
return nil, err
}
@@ -69,19 +67,16 @@ func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.No
case <-ctx.Done():
return nil, ctx.Err()
case <-d.closeCh:
return nil, errDisconnected
}
}
// dispatch takes a peer and allocates it a channel so long as it's not already
// busy and the receiving channel is still running. It then dispatches the message
func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) {
func (d *Dispatcher) dispatch(ctx context.Context, peer types.NodeID, height int64) (chan *types.LightBlock, error) {
d.mtx.Lock()
defer d.mtx.Unlock()
select {
case <-d.closeCh:
case <-ctx.Done():
return nil, errDisconnected
default:
}
@@ -141,17 +136,12 @@ func (d *Dispatcher) Respond(lb *tmproto.LightBlock, peer types.NodeID) error {
func (d *Dispatcher) Close() {
d.mtx.Lock()
defer d.mtx.Unlock()
close(d.closeCh)
for peer, call := range d.calls {
delete(d.calls, peer)
close(call)
}
}
func (d *Dispatcher) Done() <-chan struct{} {
return d.closeCh
}
//----------------------------------------------------------------
// BlockProvider is a p2p based light provider which uses a dispatcher connected

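With closeCh and Done() gone from the Dispatcher, cancellation of an in-flight LightBlock request is owned entirely by the caller's context. A sketch of caller-side usage under that assumption; the lightBlocker interface is an illustrative stand-in for the Dispatcher, not the package's real API surface:

package example

import (
	"context"
	"time"
)

// lightBlocker abstracts the one Dispatcher-like method this sketch needs;
// the peer and return types are simplified placeholders.
type lightBlocker interface {
	LightBlock(ctx context.Context, height int64, peer string) (interface{}, error)
}

// fetchWithTimeout bounds a light-block request with a caller-owned deadline
// instead of relying on a dispatcher-owned close channel.
func fetchWithTimeout(d lightBlocker, height int64, peer string) (interface{}, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return d.LightBlock(ctx, height, peer)
}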

internal/statesync/reactor.go (+5, -27)

@@ -142,7 +142,6 @@ type Reactor struct {
blockCh *p2p.Channel
paramsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// Dispatcher is used to multiplex light block requests and responses over multiple
// peers used by the p2p state provider and in reverse sync.
@@ -192,7 +191,6 @@ func NewReactor(
blockCh: blockCh,
paramsCh: paramsCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
@@ -227,12 +225,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
func (r *Reactor) OnStop() {
// tell the dispatcher to stop sending any more requests
r.dispatcher.Close()
// wait for any remaining requests to complete
<-r.dispatcher.Done()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
<-r.peerUpdates.Done()
}
@@ -268,7 +260,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.stateProvider,
r.snapshotCh.Out,
r.chunkCh.Out,
ctx.Done(),
r.tempDir,
r.metrics,
)
@@ -290,7 +281,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
select {
case <-ctx.Done():
case <-r.closeCh:
case r.snapshotCh.Out <- msg:
}
}
@@ -446,9 +436,6 @@ func (r *Reactor) backfill(
// verify all light blocks
for {
select {
case <-r.closeCh:
queue.close()
return nil
case <-ctx.Done():
queue.close()
return nil
@@ -816,6 +803,7 @@ func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string)
for {
select {
case <-ctx.Done():
r.logger.Debug("channel closed", "channel", chName)
return
case envelope := <-ch.In:
if err := r.handleMessage(ch.ID, envelope); err != nil {
@@ -829,17 +817,13 @@ func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string)
Err: err,
}
}
case <-r.closeCh:
r.logger.Debug("channel closed", "channel", chName)
return
}
}
}
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
r.logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
switch peerUpdate.Status {
@@ -859,7 +843,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
case p2p.PeerStatusUp:
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
r.providers[peerUpdate.NodeID] = newProvider
err := r.syncer.AddPeer(peerUpdate.NodeID)
err := r.syncer.AddPeer(ctx, peerUpdate.NodeID)
if err != nil {
r.logger.Error("error adding peer to syncer", "error", err)
return
@@ -886,13 +870,10 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
case <-r.closeCh:
r.logger.Debug("stopped listening on peer updates channel; closing...")
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
}
}
}
@@ -981,9 +962,6 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error {
case <-ctx.Done():
return fmt.Errorf("operation canceled while waiting for peers after %.2fs [%d/%d]",
time.Since(startAt).Seconds(), r.peers.Len(), numPeers)
case <-r.closeCh:
return fmt.Errorf("shutdown while waiting for peers after %.2fs [%d/%d]",
time.Since(startAt).Seconds(), r.peers.Len(), numPeers)
case <-t.C:
continue
case <-logT.C:


internal/statesync/reactor_test.go (+0, -1)

@@ -172,7 +172,6 @@ func setup(
stateProvider,
rts.snapshotOutCh,
rts.chunkOutCh,
ctx.Done(),
"",
rts.reactor.metrics,
)


internal/statesync/syncer.go (+5, -12)

@@ -70,7 +70,6 @@ type syncer struct {
avgChunkTime int64
lastSyncedSnapshotHeight int64
processingSnapshot *snapshot
closeCh <-chan struct{}
}
// newSyncer creates a new syncer.
@@ -82,7 +81,6 @@ func newSyncer(
stateProvider StateProvider,
snapshotCh chan<- p2p.Envelope,
chunkCh chan<- p2p.Envelope,
closeCh <-chan struct{},
tempDir string,
metrics *Metrics,
) *syncer {
@@ -98,7 +96,6 @@
fetchers: cfg.Fetchers,
retryTimeout: cfg.ChunkRequestTimeout,
metrics: metrics,
closeCh: closeCh,
}
}
@@ -141,7 +138,7 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err
// AddPeer adds a peer to the pool. For now we just keep it simple and send a
// single request to discover snapshots, later we may want to do retries and stuff.
func (s *syncer) AddPeer(peerID types.NodeID) (err error) {
func (s *syncer) AddPeer(ctx context.Context, peerID types.NodeID) (err error) {
defer func() {
// TODO: remove panic recover once AddPeer can no longer accidentally send on
// closed channel.
@@ -160,7 +157,7 @@ func (s *syncer) AddPeer(peerID types.NodeID) (err error) {
}
select {
case <-s.closeCh:
case <-ctx.Done():
case s.snapshotCh <- msg:
}
return err
@@ -494,8 +491,6 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
select {
case <-ctx.Done():
return
case <-s.closeCh:
return
case <-time.After(2 * time.Second):
continue
}
@@ -511,7 +506,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
ticker := time.NewTicker(s.retryTimeout)
defer ticker.Stop()
s.requestChunk(snapshot, index)
s.requestChunk(ctx, snapshot, index)
select {
case <-chunks.WaitFor(index):
@@ -522,8 +517,6 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
case <-ctx.Done():
return
case <-s.closeCh:
return
}
ticker.Stop()
@@ -531,7 +524,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch
}
// requestChunk requests a chunk from a peer.
func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uint32) {
peer := s.snapshots.GetPeer(snapshot)
if peer == "" {
s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height,
@@ -558,7 +551,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
select {
case s.chunkCh <- msg:
case <-s.closeCh:
case <-ctx.Done():
}
}

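The same shift shows up in newSyncer: the done channel is no longer captured at construction time (the ctx.Done() argument disappears from the call site in the statesync reactor), and methods such as AddPeer and requestChunk take the caller's context instead. A sketch of the two shapes, with an illustrative struct rather than the real syncer:

package example

import "context"

// Before (sketch): the shutdown signal was stored on the struct when it was
// built and consulted by every method.
type syncerWithCloseCh struct {
	closeCh <-chan struct{}
}

// After (sketch): nothing lifetime-related is stored; each operation runs
// under whatever context its caller passes in.
type syncerWithCtx struct {
	snapshotCh chan<- string // placeholder for chan<- p2p.Envelope
}

// addPeer mirrors the new AddPeer shape: the discovery request is sent
// unless the caller's ctx is cancelled first.
func (s *syncerWithCtx) addPeer(ctx context.Context, peer string) error {
	select {
	case s.snapshotCh <- peer:
	case <-ctx.Done():
	}
	return nil
}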

internal/statesync/syncer_test.go (+2, -2)

@@ -78,13 +78,13 @@ func TestSyncer_SyncAny(t *testing.T) {
require.Error(t, err)
// Adding a couple of peers should trigger snapshot discovery messages
err = rts.syncer.AddPeer(peerAID)
err = rts.syncer.AddPeer(ctx, peerAID)
require.NoError(t, err)
e := <-rts.snapshotOutCh
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
require.Equal(t, peerAID, e.To)
err = rts.syncer.AddPeer(peerBID)
err = rts.syncer.AddPeer(ctx, peerBID)
require.NoError(t, err)
e = <-rts.snapshotOutCh
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)

