diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index f177caad1..8abbc8082 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -65,6 +65,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [consensus] \#5987 Remove `time_iota_ms` from consensus params. Merge `tmproto.ConsensusParams` and `abci.ConsensusParams`. (@marbar3778) - [types] \#5994 Reduce the use of protobuf types in core logic. (@marbar3778) - `ConsensusParams`, `BlockParams`, `ValidatorParams`, `EvidenceParams`, `VersionParams`, `sm.Version` and `version.Consensus` have become native types. They still utilize protobuf when being sent over the wire or written to disk. +- [rpc/client/http] \#6163 Do not drop events even if the `out` channel is full (@melekes) - [node] \#6059 Validate and complete genesis doc before saving to state store (@silasdavis) - [state] \#6067 Batch save state data (@githubsands & @cmwaters) diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index e92a3884c..5f79d6a8d 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -582,10 +582,15 @@ func (w *WSEvents) OnStop() { } // Subscribe implements EventsClient by using WSClient to subscribe given -// subscriber to query. By default, returns a channel with cap=1. Error is +// subscriber to query. By default, it returns a channel with cap=1. Error is // returned if it fails to subscribe. // -// Channel is never closed to prevent clients from seeing an erroneous event. +// When reading from the channel, keep in mind there's a single events loop, so +// if you don't read events for this subscription fast enough, other +// subscriptions will slow down in effect. +// +// The channel is never closed to prevent clients from seeing an erroneous +// event. // // It returns an error if WSEvents is not running. func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, @@ -662,12 +667,15 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) { time.Sleep(d) - w.mtx.RLock() - defer w.mtx.RUnlock() + ctx := context.Background() + + w.mtx.Lock() + defer w.mtx.Unlock() for q := range w.subscriptions { - err := w.ws.Subscribe(context.Background(), q) + err := w.ws.Subscribe(ctx, q) if err != nil { - w.Logger.Error("Failed to resubscribe", "err", err) + w.Logger.Error("failed to resubscribe", "query", q, "err", err) + delete(w.subscriptions, q) } } } @@ -706,18 +714,15 @@ func (w *WSEvents) eventListener() { } w.mtx.RLock() - if out, ok := w.subscriptions[result.Query]; ok { - if cap(out) == 0 { - out <- *result - } else { - select { - case out <- *result: - default: - w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query) - } + out, ok := w.subscriptions[result.Query] + w.mtx.RUnlock() + if ok { + select { + case out <- *result: + case <-w.Quit(): + return } } - w.mtx.RUnlock() case <-w.Quit(): return }