diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index fff274be6..2ecfa7958 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -323,6 +323,8 @@ func (w *WSEvents) eventListener() { w.Logger.Error("failed to unmarshal response", "err", err) continue } + // NOTE: writing also happens inside mutex so we can't close a channel in + // Unsubscribe/UnsubscribeAll. w.mtx.RLock() if ch, ok := w.subscriptions[result.Query]; ok { ch <- result.Data diff --git a/rpc/core/events.go b/rpc/core/events.go index 840b971d8..538134b0f 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -50,10 +50,6 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() ch := make(chan interface{}) - es := wsCtx.GetEventSubscriber() - if es == nil { - es = eventBus - } err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch) if err != nil { return nil, err