|
@ -5,13 +5,15 @@ Author: Anton Kaliaev (@melekes) |
|
|
## Changelog |
|
|
## Changelog |
|
|
|
|
|
|
|
|
02-10-2018: Initial draft |
|
|
02-10-2018: Initial draft |
|
|
|
|
|
|
|
|
16-01-2019: Second version based on our conversation with Jae |
|
|
16-01-2019: Second version based on our conversation with Jae |
|
|
|
|
|
|
|
|
17-01-2019: Third version explaining how new design solves current issues |
|
|
17-01-2019: Third version explaining how new design solves current issues |
|
|
|
|
|
|
|
|
## Context |
|
|
## Context |
|
|
|
|
|
|
|
|
Since the initial version of the pubsub, there's been a number of issues |
|
|
Since the initial version of the pubsub, there's been a number of issues |
|
|
raised: #951, #1879, #1880. Some of them are high-level issues questioning the |
|
|
|
|
|
|
|
|
raised: [#951], [#1879], [#1880]. Some of them are high-level issues questioning the |
|
|
core design choices made. Others are minor and mostly about the interface of |
|
|
core design choices made. Others are minor and mostly about the interface of |
|
|
`Subscribe()` / `Publish()` functions. |
|
|
`Subscribe()` / `Publish()` functions. |
|
|
|
|
|
|
|
@ -91,7 +93,7 @@ can make capacity an argument. |
|
|
|
|
|
|
|
|
## Decision |
|
|
## Decision |
|
|
|
|
|
|
|
|
Change Subscribe() function to return a `Subscription` struct: |
|
|
|
|
|
|
|
|
Change `Subscribe()` function to return a `Subscription` struct: |
|
|
|
|
|
|
|
|
```go |
|
|
```go |
|
|
type Subscription struct { |
|
|
type Subscription struct { |
|
@ -103,18 +105,18 @@ func (s *Subscription) Cancelled() <-chan struct{} |
|
|
func (s *Subscription) Err() error |
|
|
func (s *Subscription) Err() error |
|
|
``` |
|
|
``` |
|
|
|
|
|
|
|
|
Out returns a channel onto which messages and tags are published. |
|
|
|
|
|
Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from |
|
|
|
|
|
|
|
|
`Out()` returns a channel onto which messages and tags are published. |
|
|
|
|
|
`Unsubscribe`/`UnsubscribeAll` does not close the channel to avoid clients from |
|
|
receiving a nil message. |
|
|
receiving a nil message. |
|
|
|
|
|
|
|
|
Cancelled returns a channel that's closed when the subscription is terminated |
|
|
|
|
|
|
|
|
`Cancelled()` returns a channel that's closed when the subscription is terminated |
|
|
and supposed to be used in a select statement. |
|
|
and supposed to be used in a select statement. |
|
|
|
|
|
|
|
|
If Cancelled is not closed yet, Err() returns nil. |
|
|
|
|
|
If Cancelled is closed, Err returns a non-nil error explaining why: |
|
|
|
|
|
Unsubscribed if the subscriber choose to unsubscribe, |
|
|
|
|
|
OutOfCapacity if the subscriber is not pulling messages fast enough and the Out channel become full. |
|
|
|
|
|
After Err returns a non-nil error, successive calls to Err() return the same error. |
|
|
|
|
|
|
|
|
If the channel returned by `Cancelled()` is not closed yet, `Err()` returns nil. |
|
|
|
|
|
If the channel is closed, `Err()` returns a non-nil error explaining why: |
|
|
|
|
|
`ErrUnsubscribed` if the subscriber choose to unsubscribe, |
|
|
|
|
|
`ErrOutOfCapacity` if the subscriber is not pulling messages fast enough and the channel returned by `Out()` became full. |
|
|
|
|
|
After `Err()` returns a non-nil error, successive calls to `Err() return the same error. |
|
|
|
|
|
|
|
|
```go |
|
|
```go |
|
|
subscription, err := pubsub.Subscribe(...) |
|
|
subscription, err := pubsub.Subscribe(...) |
|
@ -130,18 +132,18 @@ select { |
|
|
} |
|
|
} |
|
|
``` |
|
|
``` |
|
|
|
|
|
|
|
|
Make Out() channel buffered (cap: 1) by default. In most cases, we want to |
|
|
|
|
|
|
|
|
Make the `Out()` channel buffered (with capacity 1) by default. In most cases, we want to |
|
|
terminate the slow subscriber. Only in rare cases, we want to block the pubsub |
|
|
terminate the slow subscriber. Only in rare cases, we want to block the pubsub |
|
|
(e.g. when debugging consensus). This should lower the chances of the pubsub |
|
|
(e.g. when debugging consensus). This should lower the chances of the pubsub |
|
|
being frozen. |
|
|
being frozen. |
|
|
|
|
|
|
|
|
```go |
|
|
```go |
|
|
// outCap can be used to set capacity of Out channel (1 by default). Set to 0 |
|
|
// outCap can be used to set capacity of Out channel (1 by default). Set to 0 |
|
|
for unbuffered channel (WARNING: it may block the pubsub). |
|
|
|
|
|
|
|
|
// for unbuffered channel (WARNING: it may block the pubsub). |
|
|
Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) { |
|
|
Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) { |
|
|
``` |
|
|
``` |
|
|
|
|
|
|
|
|
Also, Out() channel should return tags along with a message: |
|
|
|
|
|
|
|
|
Also, the `Out()` channel should return tags along with a message: |
|
|
|
|
|
|
|
|
```go |
|
|
```go |
|
|
type MsgAndTags struct { |
|
|
type MsgAndTags struct { |
|
@ -154,12 +156,12 @@ to inform clients of which Tags were used with Msg. |
|
|
|
|
|
|
|
|
### How this new design solves the current issues? |
|
|
### How this new design solves the current issues? |
|
|
|
|
|
|
|
|
https://github.com/tendermint/tendermint/issues/951 (https://github.com/tendermint/tendermint/issues/1880) |
|
|
|
|
|
|
|
|
[#951] ([#1880]): |
|
|
|
|
|
|
|
|
Because of non-blocking send, situation where we'll deadlock is not possible |
|
|
Because of non-blocking send, situation where we'll deadlock is not possible |
|
|
anymore. If the client stops reading messages, it will be removed. |
|
|
anymore. If the client stops reading messages, it will be removed. |
|
|
|
|
|
|
|
|
https://github.com/tendermint/tendermint/issues/1879 |
|
|
|
|
|
|
|
|
[#1879]: |
|
|
|
|
|
|
|
|
MsgAndTags is used now instead of a plain message. |
|
|
MsgAndTags is used now instead of a plain message. |
|
|
|
|
|
|
|
@ -191,3 +193,8 @@ In review |
|
|
- (since v1) no concurrency when it comes to publishing messages |
|
|
- (since v1) no concurrency when it comes to publishing messages |
|
|
|
|
|
|
|
|
### Neutral |
|
|
### Neutral |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
[#951]: https://github.com/tendermint/tendermint/issues/951 |
|
|
|
|
|
[#1879]: https://github.com/tendermint/tendermint/issues/1879 |
|
|
|
|
|
[#1880]: https://github.com/tendermint/tendermint/issues/1880 |