diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index cf1a10623..ea6f4a14d 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -80,15 +80,7 @@ type Reactor struct { blockSync *atomicBool blockSyncCh *p2p.Channel - // blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope - // messages that the reactor will consume in processBlockSyncCh and receiving messages - // from the peer updates channel and other goroutines. We do this instead of directly - // sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines - // send Envelopes directly to the to blockSyncCh.Out channel, since processBlockSyncCh - // may close the blockSyncCh.Out channel at the same time that other goroutines send to - // blockSyncCh.Out. - blockSyncOutBridgeCh chan p2p.Envelope - peerUpdates *p2p.PeerUpdates + peerUpdates *p2p.PeerUpdates requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -119,17 +111,16 @@ func NewReactor( } r := &Reactor{ - logger: logger, - stateStore: stateStore, - blockExec: blockExec, - store: store, - consReactor: consReactor, - blockSync: newAtomicBool(blockSync), - blockSyncCh: blockSyncCh, - blockSyncOutBridgeCh: make(chan p2p.Envelope), - peerUpdates: peerUpdates, - metrics: metrics, - eventBus: eventBus, + logger: logger, + stateStore: stateStore, + blockExec: blockExec, + store: store, + consReactor: consReactor, + blockSync: newAtomicBool(blockSync), + blockSyncCh: blockSyncCh, + peerUpdates: peerUpdates, + metrics: metrics, + eventBus: eventBus, } r.BaseService = *service.NewBaseService(logger, "BlockSync", r) @@ -175,7 +166,6 @@ func (r *Reactor) OnStart(ctx context.Context) error { } go r.processBlockSyncCh(ctx) - go r.processBlockSyncBridge(ctx) go r.processPeerUpdates(ctx) return nil @@ -306,21 +296,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) { } } -func (r *Reactor) processBlockSyncBridge(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case envelope := <-r.blockSyncOutBridgeCh: - if err := r.blockSyncCh.Send(ctx, envelope); err != nil { - return - } - } - } -} - // processPeerUpdate processes a PeerUpdate. -func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) // XXX: Pool#RedoRequest can sometimes give us an empty peer. @@ -331,12 +308,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - r.blockSyncOutBridgeCh <- p2p.Envelope{ + if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ To: peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(), Height: r.store.Height(), }, + }); err != nil { + r.pool.RemovePeer(peerUpdate.NodeID) + if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + NodeID: peerUpdate.NodeID, + Err: err, + }); err != nil { + return + } } case p2p.PeerStatusDown: @@ -353,7 +338,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { case <-ctx.Done(): return case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(peerUpdate) + r.processPeerUpdate(ctx, peerUpdate) } } } @@ -372,7 +357,6 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { r.syncStartTime = time.Now() go r.requestRoutine(ctx) - go r.poolRoutine(ctx, true) return nil @@ -387,15 +371,17 @@ func (r *Reactor) requestRoutine(ctx context.Context) { case <-ctx.Done(): return case request := <-r.requestsCh: - select { - case <-ctx.Done(): - return - case r.blockSyncOutBridgeCh <- p2p.Envelope{ + if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ To: request.PeerID, Message: &bcproto.BlockRequest{Height: request.Height}, - }: + }); err != nil { + if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + NodeID: request.PeerID, + Err: err, + }); err != nil { + return + } } - case pErr := <-r.errorsCh: if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: pErr.peerID, @@ -404,16 +390,12 @@ func (r *Reactor) requestRoutine(ctx context.Context) { return } case <-statusUpdateTicker.C: - go func() { - select { - case <-ctx.Done(): - return - case r.blockSyncOutBridgeCh <- p2p.Envelope{ - Broadcast: true, - Message: &bcproto.StatusRequest{}, - }: - } - }() + if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ + Broadcast: true, + Message: &bcproto.StatusRequest{}, + }); err != nil { + return + } } } }