diff --git a/docs/architecture/adr-033-pubsub.md b/docs/architecture/adr-033-pubsub.md index 55bf320c5..88922646f 100644 --- a/docs/architecture/adr-033-pubsub.md +++ b/docs/architecture/adr-033-pubsub.md @@ -10,6 +10,8 @@ Author: Anton Kaliaev (@melekes) 17-01-2019: Third version explaining how new design solves current issues +25-01-2019: Fourth version to treat buffered and unbuffered channels differently + ## Context Since the initial version of the pubsub, there's been a number of issues @@ -53,7 +55,10 @@ channels to distribute msg to these goroutines). ### Non-blocking send -There is also a question whenever we should have a non-blocking send: +There is also a question whenever we should have a non-blocking send. +Currently, sends are blocking, so publishing to one client can block on +publishing to another. This means a slow or unresponsive client can halt the +system. Instead, we can use a non-blocking send: ```go for each subscriber { @@ -89,10 +94,25 @@ Go channels are de-facto standard for carrying data between goroutines. ### Why `Subscribe()` accepts an `out` channel? Because in our tests, we create buffered channels (cap: 1). Alternatively, we -can make capacity an argument. +can make capacity an argument and return a channel. ## Decision +### MsgAndTags + +Use a `MsgAndTags` struct on the subscription channel to indicate what tags the +msg matched. + +```go +type MsgAndTags struct { + Msg interface{} + Tags TagMap +} +``` + +### Subscription Struct + + Change `Subscribe()` function to return a `Subscription` struct: ```go @@ -132,27 +152,53 @@ select { } ``` +### Capacity and Subscriptions + Make the `Out()` channel buffered (with capacity 1) by default. In most cases, we want to terminate the slow subscriber. Only in rare cases, we want to block the pubsub (e.g. when debugging consensus). This should lower the chances of the pubsub being frozen. ```go -// outCap can be used to set capacity of Out channel (1 by default). Set to 0 -// for unbuffered channel (WARNING: it may block the pubsub). +// outCap can be used to set capacity of Out channel +// (1 by default, must be greater than 0). Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) { ``` -Also, the `Out()` channel should return tags along with a message: +Use a different function for an unbuffered channel: ```go -type MsgAndTags struct { - Msg interface{} - Tags TagMap -} +// Subscription uses an unbuffered channel. Publishing will block. +SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (Subscription, error) { ``` -to inform clients of which Tags were used with Msg. +SubscribeUnbuffered should not be exposed to users. + +### Blocking/Nonblocking + +The publisher should treat these kinds of channels separately. +It should block on unbuffered channels (for use with internal consensus events +in the consensus tests) and not block on the buffered ones. If a client is too +slow to keep up with it's messages, it's subscription is terminated: + +for each subscription { + out := subscription.outChan + if cap(out) == 0 { + // block on unbuffered channel + out <- msg + } else { + // don't block on buffered channels + select { + case out <- msg: + default: + // set the error, notify on the cancel chan + subscription.err = fmt.Errorf("client is too slow for msg) + close(subscription.cancelChan) + + // ... unsubscribe and close out + } + } +} ### How this new design solves the current issues? @@ -167,7 +213,7 @@ MsgAndTags is used now instead of a plain message. ### Future problems and their possible solutions -https://github.com/tendermint/tendermint/issues/2826 +[#2826] One question I am still pondering about: how to prevent pubsub from slowing down consensus. We can increase the pubsub queue size (which is 0 now). Also, @@ -198,3 +244,4 @@ In review [#951]: https://github.com/tendermint/tendermint/issues/951 [#1879]: https://github.com/tendermint/tendermint/issues/1879 [#1880]: https://github.com/tendermint/tendermint/issues/1880 +[#2826]: https://github.com/tendermint/tendermint/issues/2826