You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

247 lines
7.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. # ADR 033: pubsub 2.0
  2. Author: Anton Kaliaev (@melekes)
  3. ## Changelog
  4. 02-10-2018: Initial draft
  5. 16-01-2019: Second version based on our conversation with Jae
  6. 17-01-2019: Third version explaining how new design solves current issues
  7. 25-01-2019: Fourth version to treat buffered and unbuffered channels differently
  8. ## Context
  9. Since the initial version of the pubsub, there's been a number of issues
  10. raised: [#951], [#1879], [#1880]. Some of them are high-level issues questioning the
  11. core design choices made. Others are minor and mostly about the interface of
  12. `Subscribe()` / `Publish()` functions.
  13. ### Sync vs Async
  14. Now, when publishing a message to subscribers, we can do it in a goroutine:
  15. _using channels for data transmission_
  16. ```go
  17. for each subscriber {
  18. out := subscriber.outc
  19. go func() {
  20. out <- msg
  21. }
  22. }
  23. ```
  24. _by invoking callback functions_
  25. ```go
  26. for each subscriber {
  27. go subscriber.callbackFn()
  28. }
  29. ```
  30. This gives us greater performance and allows us to avoid "slow client problem"
  31. (when other subscribers have to wait for a slow subscriber). A pool of
  32. goroutines can be used to avoid uncontrolled memory growth.
  33. In certain cases, this is what you want. But in our case, because we need
  34. strict ordering of events (if event A was published before B, the guaranteed
  35. delivery order will be A -> B), we can't publish msg in a new goroutine every time.
  36. We can also have a goroutine per subscriber, although we'd need to be careful
  37. with the number of subscribers. It's more difficult to implement as well +
  38. unclear if we'll benefit from it (cause we'd be forced to create N additional
  39. channels to distribute msg to these goroutines).
  40. ### Non-blocking send
  41. There is also a question whenever we should have a non-blocking send.
  42. Currently, sends are blocking, so publishing to one client can block on
  43. publishing to another. This means a slow or unresponsive client can halt the
  44. system. Instead, we can use a non-blocking send:
  45. ```go
  46. for each subscriber {
  47. out := subscriber.outc
  48. select {
  49. case out <- msg:
  50. default:
  51. log("subscriber %v buffer is full, skipping...")
  52. }
  53. }
  54. ```
  55. This fixes the "slow client problem", but there is no way for a slow client to
  56. know if it had missed a message. We could return a second channel and close it
  57. to indicate subscription termination. On the other hand, if we're going to
  58. stick with blocking send, **devs must always ensure subscriber's handling code
  59. does not block**, which is a hard task to put on their shoulders.
  60. The interim option is to run goroutines pool for a single message, wait for all
  61. goroutines to finish. This will solve "slow client problem", but we'd still
  62. have to wait `max(goroutine_X_time)` before we can publish the next message.
  63. ### Channels vs Callbacks
  64. Yet another question is whether we should use channels for message transmission or
  65. call subscriber-defined callback functions. Callback functions give subscribers
  66. more flexibility - you can use mutexes in there, channels, spawn goroutines,
  67. anything you really want. But they also carry local scope, which can result in
  68. memory leaks and/or memory usage increase.
  69. Go channels are de-facto standard for carrying data between goroutines.
  70. ### Why `Subscribe()` accepts an `out` channel?
  71. Because in our tests, we create buffered channels (cap: 1). Alternatively, we
  72. can make capacity an argument and return a channel.
  73. ## Decision
  74. ### MsgAndTags
  75. Use a `MsgAndTags` struct on the subscription channel to indicate what tags the
  76. msg matched.
  77. ```go
  78. type MsgAndTags struct {
  79. Msg interface{}
  80. Tags TagMap
  81. }
  82. ```
  83. ### Subscription Struct
  84. Change `Subscribe()` function to return a `Subscription` struct:
  85. ```go
  86. type Subscription struct {
  87. // private fields
  88. }
  89. func (s *Subscription) Out() <-chan MsgAndTags
  90. func (s *Subscription) Canceled() <-chan struct{}
  91. func (s *Subscription) Err() error
  92. ```
  93. `Out()` returns a channel onto which messages and tags are published.
  94. `Unsubscribe`/`UnsubscribeAll` does not close the channel to avoid clients from
  95. receiving a nil message.
  96. `Canceled()` returns a channel that's closed when the subscription is terminated
  97. and supposed to be used in a select statement.
  98. If the channel returned by `Canceled()` is not closed yet, `Err()` returns nil.
  99. If the channel is closed, `Err()` returns a non-nil error explaining why:
  100. `ErrUnsubscribed` if the subscriber choose to unsubscribe,
  101. `ErrOutOfCapacity` if the subscriber is not pulling messages fast enough and the channel returned by `Out()` became full.
  102. After `Err()` returns a non-nil error, successive calls to `Err() return the same error.
  103. ```go
  104. subscription, err := pubsub.Subscribe(...)
  105. if err != nil {
  106. // ...
  107. }
  108. for {
  109. select {
  110. case msgAndTags <- subscription.Out():
  111. // ...
  112. case <-subscription.Canceled():
  113. return subscription.Err()
  114. }
  115. ```
  116. ### Capacity and Subscriptions
  117. Make the `Out()` channel buffered (with capacity 1) by default. In most cases, we want to
  118. terminate the slow subscriber. Only in rare cases, we want to block the pubsub
  119. (e.g. when debugging consensus). This should lower the chances of the pubsub
  120. being frozen.
  121. ```go
  122. // outCap can be used to set capacity of Out channel
  123. // (1 by default, must be greater than 0).
  124. Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) {
  125. ```
  126. Use a different function for an unbuffered channel:
  127. ```go
  128. // Subscription uses an unbuffered channel. Publishing will block.
  129. SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (Subscription, error) {
  130. ```
  131. SubscribeUnbuffered should not be exposed to users.
  132. ### Blocking/Nonblocking
  133. The publisher should treat these kinds of channels separately.
  134. It should block on unbuffered channels (for use with internal consensus events
  135. in the consensus tests) and not block on the buffered ones. If a client is too
  136. slow to keep up with it's messages, it's subscription is terminated:
  137. for each subscription {
  138. out := subscription.outChan
  139. if cap(out) == 0 {
  140. // block on unbuffered channel
  141. out <- msg
  142. } else {
  143. // don't block on buffered channels
  144. select {
  145. case out <- msg:
  146. default:
  147. // set the error, notify on the cancel chan
  148. subscription.err = fmt.Errorf("client is too slow for msg)
  149. close(subscription.cancelChan)
  150. // ... unsubscribe and close out
  151. }
  152. }
  153. }
  154. ### How this new design solves the current issues?
  155. [#951] ([#1880]):
  156. Because of non-blocking send, situation where we'll deadlock is not possible
  157. anymore. If the client stops reading messages, it will be removed.
  158. [#1879]:
  159. MsgAndTags is used now instead of a plain message.
  160. ### Future problems and their possible solutions
  161. [#2826]
  162. One question I am still pondering about: how to prevent pubsub from slowing
  163. down consensus. We can increase the pubsub queue size (which is 0 now). Also,
  164. it's probably a good idea to limit the total number of subscribers.
  165. This can be made automatically. Say we set queue size to 1000 and, when it's >=
  166. 80% full, refuse new subscriptions.
  167. ## Status
  168. Implemented
  169. ## Consequences
  170. ### Positive
  171. - more idiomatic interface
  172. - subscribers know what tags msg was published with
  173. - subscribers aware of the reason their subscription was canceled
  174. ### Negative
  175. - (since v1) no concurrency when it comes to publishing messages
  176. ### Neutral
  177. [#951]: https://github.com/tendermint/tendermint/issues/951
  178. [#1879]: https://github.com/tendermint/tendermint/issues/1879
  179. [#1880]: https://github.com/tendermint/tendermint/issues/1880
  180. [#2826]: https://github.com/tendermint/tendermint/issues/2826