diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index 2e476c25d..3f3e2a117 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -107,7 +107,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, peer types.NodeID, height int // Respond allows the underlying process which receives requests on the // requestCh to respond with the respective light block. A nil response is used to // represent that the receiver of the request does not have a light block at that height. -func (d *Dispatcher) Respond(lb *tmproto.LightBlock, peer types.NodeID) error { +func (d *Dispatcher) Respond(ctx context.Context, lb *tmproto.LightBlock, peer types.NodeID) error { d.mtx.Lock() defer d.mtx.Unlock() @@ -121,8 +121,12 @@ func (d *Dispatcher) Respond(lb *tmproto.LightBlock, peer types.NodeID) error { // If lb is nil we take that to mean that the peer didn't have the requested light // block and thus pass on the nil to the caller. if lb == nil { - answerCh <- nil - return nil + select { + case answerCh <- nil: + return nil + case <-ctx.Done(): + return ctx.Err() + } } block, err := types.LightBlockFromProto(lb) @@ -130,8 +134,12 @@ func (d *Dispatcher) Respond(lb *tmproto.LightBlock, peer types.NodeID) error { return err } - answerCh <- block - return nil + select { + case <-ctx.Done(): + return ctx.Err() + case answerCh <- block: + return nil + } } // Close shuts down the dispatcher and cancels any pending calls awaiting responses. @@ -139,9 +147,11 @@ func (d *Dispatcher) Respond(lb *tmproto.LightBlock, peer types.NodeID) error { func (d *Dispatcher) Close() { d.mtx.Lock() defer d.mtx.Unlock() - for peer, call := range d.calls { + for peer := range d.calls { delete(d.calls, peer) - close(call) + // don't close the channel here as it's closed in + // other handlers, and would otherwise get garbage + // collected. } } diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index 918c6ec9e..65c517be4 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -80,7 +80,7 @@ func TestDispatcherReturnsNoBlock(t *testing.T) { go func() { <-chans.Out - require.NoError(t, d.Respond(nil, peer)) + require.NoError(t, d.Respond(ctx, nil, peer)) cancel() }() @@ -309,7 +309,7 @@ func handleRequests(ctx context.Context, t *testing.T, d *Dispatcher, ch chan p2 peer := request.To resp := mockLBResp(ctx, t, peer, int64(height), time.Now()) block, _ := resp.block.ToProto() - require.NoError(t, d.Respond(block, resp.peer)) + require.NoError(t, d.Respond(ctx, block, resp.peer)) case <-ctx.Done(): return } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 78c9d8360..6ca0cb6b7 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -740,7 +740,10 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env height = msg.LightBlock.SignedHeader.Header.Height } r.logger.Info("received light block response", "peer", envelope.From, "height", height) - if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil { + if err := r.dispatcher.Respond(ctx, msg.LightBlock, envelope.From); err != nil { + if errors.Is(err, context.Canceled) { + return err + } r.logger.Error("error processing light block response", "err", err, "height", height) } diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 92209b0be..cc3c843e6 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -161,7 +161,7 @@ func setup( } } - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() var err error rts.reactor, err = NewReactor(