Browse Source

rpc/client/http: Do not drop events even if the `out` channel is full (#6163)

```
// unbuffered
out, err := httpClient.Subscribe(ctx, "event.type=NewTx and account.name=Jack", 0)

// buffered
out, err := httpClient.Subscribe(ctx, "event.type=NewTx AND account.name=Jack", 20)
```

Before: when the `out` channel is buffered and becomes full, we drop an event (+ log the error)
After: when the `out` channel is buffered and becomes full, we block

**Before it was not apparent to the app when an event was dropped (looking at the logs is manual task). After this PR, if the user does not read from `out` on 1 subscription, all other subscriptions will be stuck too.**

Closes #6161
pull/6177/head
Anton Kaliaev 4 years ago
committed by GitHub
parent
commit
0f4124fb54
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 16 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +21
    -16
      rpc/client/http/http.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -65,6 +65,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [consensus] \#5987 Remove `time_iota_ms` from consensus params. Merge `tmproto.ConsensusParams` and `abci.ConsensusParams`. (@marbar3778) - [consensus] \#5987 Remove `time_iota_ms` from consensus params. Merge `tmproto.ConsensusParams` and `abci.ConsensusParams`. (@marbar3778)
- [types] \#5994 Reduce the use of protobuf types in core logic. (@marbar3778) - [types] \#5994 Reduce the use of protobuf types in core logic. (@marbar3778)
- `ConsensusParams`, `BlockParams`, `ValidatorParams`, `EvidenceParams`, `VersionParams`, `sm.Version` and `version.Consensus` have become native types. They still utilize protobuf when being sent over the wire or written to disk. - `ConsensusParams`, `BlockParams`, `ValidatorParams`, `EvidenceParams`, `VersionParams`, `sm.Version` and `version.Consensus` have become native types. They still utilize protobuf when being sent over the wire or written to disk.
- [rpc/client/http] \#6163 Do not drop events even if the `out` channel is full (@melekes)
- [node] \#6059 Validate and complete genesis doc before saving to state store (@silasdavis) - [node] \#6059 Validate and complete genesis doc before saving to state store (@silasdavis)
- [state] \#6067 Batch save state data (@githubsands & @cmwaters) - [state] \#6067 Batch save state data (@githubsands & @cmwaters)


+ 21
- 16
rpc/client/http/http.go View File

@ -582,10 +582,15 @@ func (w *WSEvents) OnStop() {
} }
// Subscribe implements EventsClient by using WSClient to subscribe given // Subscribe implements EventsClient by using WSClient to subscribe given
// subscriber to query. By default, returns a channel with cap=1. Error is
// subscriber to query. By default, it returns a channel with cap=1. Error is
// returned if it fails to subscribe. // returned if it fails to subscribe.
// //
// Channel is never closed to prevent clients from seeing an erroneous event.
// When reading from the channel, keep in mind there's a single events loop, so
// if you don't read events for this subscription fast enough, other
// subscriptions will slow down in effect.
//
// The channel is never closed to prevent clients from seeing an erroneous
// event.
// //
// It returns an error if WSEvents is not running. // It returns an error if WSEvents is not running.
func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
@ -662,12 +667,15 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) { func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) {
time.Sleep(d) time.Sleep(d)
w.mtx.RLock()
defer w.mtx.RUnlock()
ctx := context.Background()
w.mtx.Lock()
defer w.mtx.Unlock()
for q := range w.subscriptions { for q := range w.subscriptions {
err := w.ws.Subscribe(context.Background(), q)
err := w.ws.Subscribe(ctx, q)
if err != nil { if err != nil {
w.Logger.Error("Failed to resubscribe", "err", err)
w.Logger.Error("failed to resubscribe", "query", q, "err", err)
delete(w.subscriptions, q)
} }
} }
} }
@ -706,18 +714,15 @@ func (w *WSEvents) eventListener() {
} }
w.mtx.RLock() w.mtx.RLock()
if out, ok := w.subscriptions[result.Query]; ok {
if cap(out) == 0 {
out <- *result
} else {
select {
case out <- *result:
default:
w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query)
}
out, ok := w.subscriptions[result.Query]
w.mtx.RUnlock()
if ok {
select {
case out <- *result:
case <-w.Quit():
return
} }
} }
w.mtx.RUnlock()
case <-w.Quit(): case <-w.Quit():
return return
} }


Loading…
Cancel
Save