# ADR 033: pubsub 2.0

Author: Anton Kaliaev (@melekes)

## Changelog

- 02-10-2018: Initial draft
- 16-01-2019: Second version based on our conversation with Jae
- 17-01-2019: Third version explaining how new design solves current issues

## Context

Since the initial version of the pubsub, a number of issues have been
raised: #951, #1879, #1880. Some of them are high-level issues questioning the
core design choices made. Others are minor and mostly concern the interface of
the `Subscribe()` / `Publish()` functions.

### Sync vs Async

When publishing a message to subscribers, we can do it in a goroutine:

_using channels for data transmission_

```go
for _, subscriber := range subscribers {
	out := subscriber.outc
	// Send in a separate goroutine so a slow subscriber cannot block the loop.
	go func() {
		out <- msg
	}()
}
```

_by invoking callback functions_

```go
for _, subscriber := range subscribers {
	go subscriber.callbackFn()
}
```

This gives us greater performance and allows us to avoid the "slow client problem"
(when other subscribers have to wait for a slow subscriber). A pool of
goroutines can be used to avoid uncontrolled memory growth.

In certain cases, this is what you want. But in our case, because we need
strict ordering of events (if event A was published before B, the guaranteed
delivery order will be A -> B), we can't publish msg in a new goroutine every time.

We can also have a goroutine per subscriber, although we'd need to be careful
with the number of subscribers. It's also more difficult to implement, and it's
unclear whether we'd benefit from it (because we'd be forced to create N additional
channels to distribute msg to these goroutines).

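The goroutine-per-subscriber idea can be sketched roughly as follows. This is only an illustration, not the pubsub's actual code: the `subscriber` type, its `inbox` channel, and `publishInOrder` are hypothetical names. Each subscriber gets its own channel (the "N additional channels" mentioned above) and a goroutine that drains it, so every subscriber sees A before B.

```go
package main

import "sync"

// subscriber is a hypothetical type: each one owns an inbox channel and a
// goroutine that drains it, so messages are handled in arrival order.
type subscriber struct {
	inbox    chan string
	received []string
}

// publishInOrder delivers msgs to every subscriber using one goroutine per
// subscriber and returns what each subscriber received.
func publishInOrder(numSubscribers int, msgs []string) [][]string {
	subs := make([]*subscriber, numSubscribers)
	var wg sync.WaitGroup
	for i := range subs {
		s := &subscriber{inbox: make(chan string, 1)}
		subs[i] = s
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range s.inbox { // a channel preserves send order
				s.received = append(s.received, msg)
			}
		}()
	}

	// The publisher sends sequentially, so A enters each inbox before B.
	for _, msg := range msgs {
		for _, s := range subs {
			s.inbox <- msg
		}
	}
	for _, s := range subs {
		close(s.inbox)
	}
	wg.Wait() // after this, reading s.received is safe

	out := make([][]string, numSubscribers)
	for i, s := range subs {
		out[i] = s.received
	}
	return out
}
```

Note that the publisher in this sketch still blocks once a slow subscriber's inbox is full, so the extra channels only buy buffering, not independence from slow clients.
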
### Non-blocking send

There is also the question of whether we should have a non-blocking send:

```go
for _, subscriber := range subscribers {
	out := subscriber.outc
	select {
	case out <- msg:
	default:
		// The subscriber's buffer is full: drop the message instead of blocking.
		log.Printf("subscriber %v buffer is full, skipping...", subscriber)
	}
}
```

This fixes the "slow client problem", but there is no way for a slow client to
know whether it has missed a message. We could return a second channel and close it
to indicate subscription termination. On the other hand, if we're going to
stick with blocking send, **devs must always ensure subscriber's handling code
does not block**, which is a hard task to put on their shoulders.

An intermediate option is to run a pool of goroutines for a single message and wait
for all of them to finish. This will solve the "slow client problem", but we'd still
have to wait `max(goroutine_X_time)` before we can publish the next message.

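A minimal sketch of that intermediate option, assuming subscribers are represented by plain handler functions (the `deliverToAll` and `publishAll` names are hypothetical). One goroutine is started per subscriber for each message, and a `sync.WaitGroup` holds back the next message until all of them have returned.

```go
package main

import "sync"

// deliverToAll hands msg to every handler concurrently and returns only after
// all handlers have finished. Handlers stand in for subscriber-defined code.
func deliverToAll(msg string, handlers []func(string)) {
	var wg sync.WaitGroup
	for _, h := range handlers {
		wg.Add(1)
		go func(h func(string)) {
			defer wg.Done()
			h(msg)
		}(h)
	}
	wg.Wait() // takes roughly as long as the slowest handler
}

// publishAll publishes msgs one at a time, so ordering across messages holds.
func publishAll(msgs []string, handlers []func(string)) {
	for _, msg := range msgs {
		deliverToAll(msg, handlers)
	}
}
```

Fast subscribers no longer queue up behind a slow one within a single message, but the wait in `deliverToAll` is exactly the `max(goroutine_X_time)` cost described above.
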
### Channels vs Callbacks

Yet another question is whether we should use channels for message transmission or
call subscriber-defined callback functions. Callback functions give subscribers
more flexibility: you can use mutexes in there, channels, spawn goroutines,
anything you really want. But they also carry local scope, which can result in
memory leaks and/or increased memory usage.

Go channels are the de facto standard for carrying data between goroutines.

### Why does `Subscribe()` accept an `out` channel?

Because in our tests, we create buffered channels (cap: 1). Alternatively, we
can make capacity an argument.

## Decision

Change the `Subscribe()` function to return a `Subscription` struct:

```go
type Subscription struct {
	// private fields
}

func (s *Subscription) Out() <-chan MsgAndTags
func (s *Subscription) Cancelled() <-chan struct{}
func (s *Subscription) Err() error
```

`Out()` returns a channel onto which messages and tags are published.
`Unsubscribe`/`UnsubscribeAll` does not close the channel, to prevent clients from
receiving a nil message.

`Cancelled()` returns a channel that's closed when the subscription is terminated
and is meant to be used in a select statement.

If the channel returned by `Cancelled()` is not closed yet, `Err()` returns nil.
If it is closed, `Err()` returns a non-nil error explaining why:

- `Unsubscribed` if the subscriber chose to unsubscribe,
- `OutOfCapacity` if the subscriber is not pulling messages fast enough and the `Out()` channel becomes full.

After `Err()` returns a non-nil error, successive calls to `Err()` return the same error.

```go
subscription, err := pubsub.Subscribe(...)
if err != nil {
	// ...
}
for {
	select {
	case msgAndTags := <-subscription.Out():
		// ...
	case <-subscription.Cancelled():
		return subscription.Err()
	}
}
```

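To make the contract of `Out()`, `Cancelled()` and `Err()` concrete, here is one way the private fields could be arranged. This is a sketch under assumptions, not the final implementation: the error variables, `NewSubscription`, the `cancel` method, and the use of a plain map for tags (instead of `TagMap`) are all hypothetical.

```go
package main

import (
	"errors"
	"sync"
)

// Hypothetical error values; the ADR names the reasons but not their Go form.
var (
	ErrUnsubscribed  = errors.New("client unsubscribed")
	ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
)

// MsgAndTags is simplified here: Tags is a plain map instead of TagMap.
type MsgAndTags struct {
	Msg  interface{}
	Tags map[string]string
}

type Subscription struct {
	out       chan MsgAndTags
	cancelled chan struct{}
	mtx       sync.RWMutex
	err       error
}

// NewSubscription is a hypothetical constructor taking the Out capacity.
func NewSubscription(outCap int) *Subscription {
	return &Subscription{
		out:       make(chan MsgAndTags, outCap),
		cancelled: make(chan struct{}),
	}
}

func (s *Subscription) Out() <-chan MsgAndTags     { return s.out }
func (s *Subscription) Cancelled() <-chan struct{} { return s.cancelled }

// Err returns nil until the subscription is cancelled, then the same error
// on every call.
func (s *Subscription) Err() error {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return s.err
}

// cancel records the reason and closes Cancelled. Only the first call with a
// non-nil error has an effect, which keeps Err stable.
func (s *Subscription) cancel(err error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.err != nil || err == nil {
		return
	}
	s.err = err
	close(s.cancelled)
}
```

The `out` channel is never closed in this sketch, which matches the rule that unsubscribing must not make clients receive a nil message.
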
Make the `Out()` channel buffered (cap: 1) by default. In most cases, we want to
terminate the slow subscriber. Only in rare cases do we want to block the pubsub
(e.g. when debugging consensus). This should lower the chances of the pubsub
being frozen.

```go
// outCap can be used to set capacity of Out channel (1 by default). Set to 0
// for unbuffered channel (WARNING: it may block the pubsub).
Subscribe(ctx context.Context, clientID string, query Query, outCap ...int) (Subscription, error)
```

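The variadic `outCap` parameter could be resolved along these lines. `resolveOutCap` is a hypothetical helper, and rejecting negative values is an assumption on my part (the text above only defines the default of 1 and the meaning of 0).

```go
package main

import "fmt"

// resolveOutCap returns the capacity to use for the Out channel:
// 1 when outCap is omitted, otherwise the first value passed.
// Negative values are rejected because make(chan T, n) panics for n < 0.
func resolveOutCap(outCap ...int) (int, error) {
	switch {
	case len(outCap) == 0:
		return 1, nil // default: buffered with capacity 1
	case outCap[0] < 0:
		return 0, fmt.Errorf("negative Out capacity: %d", outCap[0])
	default:
		return outCap[0], nil // 0 means unbuffered and may block the pubsub
	}
}
```
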
Also, the `Out()` channel should return tags along with a message:

```go
type MsgAndTags struct {
	Msg  interface{}
	Tags TagMap
}
```

to inform clients of which Tags were used with Msg.

### How does this new design solve the current issues?

https://github.com/tendermint/tendermint/issues/951 (https://github.com/tendermint/tendermint/issues/1880)

Because of the non-blocking send, a deadlock is no longer possible.
If the client stops reading messages, it will be removed.

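The removal of a client that stops reading could look roughly like this. It is a sketch only: `sub` is a stripped-down, hypothetical stand-in for `Subscription`, `errOutOfCapacity` is a hypothetical name for the OutOfCapacity reason, and `publish` is assumed to be called from a single goroutine (the pubsub's main loop).

```go
package main

import "errors"

// errOutOfCapacity is a hypothetical name for the OutOfCapacity reason.
var errOutOfCapacity = errors.New("out of capacity")

// sub is a stripped-down, hypothetical stand-in for Subscription.
type sub struct {
	out       chan string
	cancelled chan struct{}
	err       error
}

// publish tries a non-blocking send to every subscriber. A subscriber whose
// out channel is full is cancelled and removed instead of blocking the loop.
func publish(subs map[string]*sub, msg string) {
	for id, s := range subs {
		select {
		case s.out <- msg:
		default:
			s.err = errOutOfCapacity
			close(s.cancelled) // closed after err is set, so readers see the reason
			delete(subs, id)   // deleting the current key while ranging is safe in Go
		}
	}
}
```

Since no branch of the `select` can block, one stalled client can at worst lose its own subscription; it cannot hold up the publisher or the other subscribers.
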
https://github.com/tendermint/tendermint/issues/1879

`MsgAndTags` is now used instead of a plain message.

### Future problems and their possible solutions

https://github.com/tendermint/tendermint/issues/2826

One question I am still pondering: how to prevent the pubsub from slowing
down consensus. We can increase the pubsub queue size (which is 0 now). Also,
it's probably a good idea to limit the total number of subscribers.

This can be done automatically. Say we set the queue size to 1000 and, when it's >=
80% full, refuse new subscriptions.

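As a rough illustration of the 80% rule, the check could compare the length and capacity of the publish queue. Everything here is hypothetical: the `acceptSubscription` function, the `errServerBusy` error, and the assumption that the queue is a buffered Go channel.

```go
package main

import "errors"

// errServerBusy is a hypothetical error returned to would-be subscribers.
var errServerBusy = errors.New("pubsub queue is nearly full, try again later")

// acceptSubscription returns nil if a new subscription may be accepted and
// errServerBusy once the queue is at least 80% full. An unbuffered queue
// (capacity 0, as today) never triggers the limit.
func acceptSubscription(queue chan interface{}) error {
	if cap(queue) > 0 && len(queue)*100 >= cap(queue)*80 {
		return errServerBusy
	}
	return nil
}
```

With a queue of capacity 1000, the check starts refusing subscriptions once 800 messages are waiting.
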
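## Status

In review

## Consequences

### Positive

- more idiomatic interface
- subscribers know what tags msg was published with
- subscribers are aware of the reason their subscription was cancelled

### Negative

- (since v1) no concurrency when it comes to publishing messages

### Neutral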
  137. - subscribers know what tags msg was published with
  138. - subscribers aware of the reason their subscription was cancelled
  139. ### Negative
  140. - (since v1) no concurrency when it comes to publishing messages
  141. ### Neutral