You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

351 lines
8.9 KiB

  1. // Package pubsub implements a pub-sub model with a single publisher (Server)
  2. // and multiple subscribers (clients).
  3. //
  4. // Though you can have multiple publishers by sharing a pointer to a server or
  5. // by giving the same channel to each publisher and publishing messages from
  6. // that channel (fan-in).
  7. //
  8. // Clients subscribe for messages, which could be of any type, using a query.
  9. // When some message is published, we match it with all queries. If there is a
  10. // match, this message will be pushed to all clients, subscribed to that query.
  11. // See query subpackage for our implementation.
  12. package pubsub
  13. import (
  14. "context"
  15. "errors"
  16. "sync"
  17. cmn "github.com/tendermint/tmlibs/common"
  18. )
  19. type operation int
  20. const (
  21. sub operation = iota
  22. pub
  23. unsub
  24. shutdown
  25. )
  26. var (
  27. // ErrSubscriptionNotFound is returned when a client tries to unsubscribe
  28. // from not existing subscription.
  29. ErrSubscriptionNotFound = errors.New("subscription not found")
  30. // ErrAlreadySubscribed is returned when a client tries to subscribe twice or
  31. // more using the same query.
  32. ErrAlreadySubscribed = errors.New("already subscribed")
  33. )
  34. type cmd struct {
  35. op operation
  36. query Query
  37. ch chan<- interface{}
  38. clientID string
  39. msg interface{}
  40. tags TagMap
  41. }
  42. // Query defines an interface for a query to be used for subscribing.
  43. type Query interface {
  44. Matches(tags TagMap) bool
  45. String() string
  46. }
  47. // Server allows clients to subscribe/unsubscribe for messages, publishing
  48. // messages with or without tags, and manages internal state.
  49. type Server struct {
  50. cmn.BaseService
  51. cmds chan cmd
  52. cmdsCap int
  53. mtx sync.RWMutex
  54. subscriptions map[string]map[string]Query // subscriber -> query (string) -> Query
  55. }
  56. // Option sets a parameter for the server.
  57. type Option func(*Server)
  58. // TagMap is used to associate tags to a message.
  59. // They can be queried by subscribers to choose messages they will received.
  60. type TagMap interface {
  61. // Get returns the value for a key, or nil if no value is present.
  62. // The ok result indicates whether value was found in the tags.
  63. Get(key string) (value string, ok bool)
  64. // Len returns the number of tags.
  65. Len() int
  66. }
  67. type tagMap map[string]string
  68. var _ TagMap = (*tagMap)(nil)
  69. // NewTagMap constructs a new immutable tag set from a map.
  70. func NewTagMap(data map[string]string) TagMap {
  71. return tagMap(data)
  72. }
  73. // Get returns the value for a key, or nil if no value is present.
  74. // The ok result indicates whether value was found in the tags.
  75. func (ts tagMap) Get(key string) (value string, ok bool) {
  76. value, ok = ts[key]
  77. return
  78. }
  79. // Len returns the number of tags.
  80. func (ts tagMap) Len() int {
  81. return len(ts)
  82. }
  83. // NewServer returns a new server. See the commentary on the Option functions
  84. // for a detailed description of how to configure buffering. If no options are
  85. // provided, the resulting server's queue is unbuffered.
  86. func NewServer(options ...Option) *Server {
  87. s := &Server{
  88. subscriptions: make(map[string]map[string]Query),
  89. }
  90. s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
  91. for _, option := range options {
  92. option(s)
  93. }
  94. // if BufferCapacity option was not set, the channel is unbuffered
  95. s.cmds = make(chan cmd, s.cmdsCap)
  96. return s
  97. }
  98. // BufferCapacity allows you to specify capacity for the internal server's
  99. // queue. Since the server, given Y subscribers, could only process X messages,
  100. // this option could be used to survive spikes (e.g. high amount of
  101. // transactions during peak hours).
  102. func BufferCapacity(cap int) Option {
  103. return func(s *Server) {
  104. if cap > 0 {
  105. s.cmdsCap = cap
  106. }
  107. }
  108. }
  109. // BufferCapacity returns capacity of the internal server's queue.
  110. func (s *Server) BufferCapacity() int {
  111. return s.cmdsCap
  112. }
  113. // Subscribe creates a subscription for the given client. It accepts a channel
  114. // on which messages matching the given query can be received. An error will be
  115. // returned to the caller if the context is canceled or if subscription already
  116. // exist for pair clientID and query.
  117. func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
  118. s.mtx.RLock()
  119. clientSubscriptions, ok := s.subscriptions[clientID]
  120. if ok {
  121. _, ok = clientSubscriptions[query.String()]
  122. }
  123. s.mtx.RUnlock()
  124. if ok {
  125. return ErrAlreadySubscribed
  126. }
  127. select {
  128. case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
  129. s.mtx.Lock()
  130. if _, ok = s.subscriptions[clientID]; !ok {
  131. s.subscriptions[clientID] = make(map[string]Query)
  132. }
  133. // preserve original query
  134. // see Unsubscribe
  135. s.subscriptions[clientID][query.String()] = query
  136. s.mtx.Unlock()
  137. return nil
  138. case <-ctx.Done():
  139. return ctx.Err()
  140. }
  141. }
  142. // Unsubscribe removes the subscription on the given query. An error will be
  143. // returned to the caller if the context is canceled or if subscription does
  144. // not exist.
  145. func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
  146. var origQuery Query
  147. s.mtx.RLock()
  148. clientSubscriptions, ok := s.subscriptions[clientID]
  149. if ok {
  150. origQuery, ok = clientSubscriptions[query.String()]
  151. }
  152. s.mtx.RUnlock()
  153. if !ok {
  154. return ErrSubscriptionNotFound
  155. }
  156. // original query is used here because we're using pointers as map keys
  157. select {
  158. case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}:
  159. s.mtx.Lock()
  160. delete(clientSubscriptions, query.String())
  161. s.mtx.Unlock()
  162. return nil
  163. case <-ctx.Done():
  164. return ctx.Err()
  165. }
  166. }
  167. // UnsubscribeAll removes all client subscriptions. An error will be returned
  168. // to the caller if the context is canceled or if subscription does not exist.
  169. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
  170. s.mtx.RLock()
  171. _, ok := s.subscriptions[clientID]
  172. s.mtx.RUnlock()
  173. if !ok {
  174. return ErrSubscriptionNotFound
  175. }
  176. select {
  177. case s.cmds <- cmd{op: unsub, clientID: clientID}:
  178. s.mtx.Lock()
  179. delete(s.subscriptions, clientID)
  180. s.mtx.Unlock()
  181. return nil
  182. case <-ctx.Done():
  183. return ctx.Err()
  184. }
  185. }
  186. // Publish publishes the given message. An error will be returned to the caller
  187. // if the context is canceled.
  188. func (s *Server) Publish(ctx context.Context, msg interface{}) error {
  189. return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
  190. }
  191. // PublishWithTags publishes the given message with the set of tags. The set is
  192. // matched with clients queries. If there is a match, the message is sent to
  193. // the client.
  194. func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
  195. select {
  196. case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
  197. return nil
  198. case <-ctx.Done():
  199. return ctx.Err()
  200. }
  201. }
  202. // OnStop implements Service.OnStop by shutting down the server.
  203. func (s *Server) OnStop() {
  204. s.cmds <- cmd{op: shutdown}
  205. }
  206. // NOTE: not goroutine safe
  207. type state struct {
  208. // query -> client -> ch
  209. queries map[Query]map[string]chan<- interface{}
  210. // client -> query -> struct{}
  211. clients map[string]map[Query]struct{}
  212. }
  213. // OnStart implements Service.OnStart by starting the server.
  214. func (s *Server) OnStart() error {
  215. go s.loop(state{
  216. queries: make(map[Query]map[string]chan<- interface{}),
  217. clients: make(map[string]map[Query]struct{}),
  218. })
  219. return nil
  220. }
  221. // OnReset implements Service.OnReset
  222. func (s *Server) OnReset() error {
  223. return nil
  224. }
  225. func (s *Server) loop(state state) {
  226. loop:
  227. for cmd := range s.cmds {
  228. switch cmd.op {
  229. case unsub:
  230. if cmd.query != nil {
  231. state.remove(cmd.clientID, cmd.query)
  232. } else {
  233. state.removeAll(cmd.clientID)
  234. }
  235. case shutdown:
  236. for clientID := range state.clients {
  237. state.removeAll(clientID)
  238. }
  239. break loop
  240. case sub:
  241. state.add(cmd.clientID, cmd.query, cmd.ch)
  242. case pub:
  243. state.send(cmd.msg, cmd.tags)
  244. }
  245. }
  246. }
  247. func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
  248. // add query if needed
  249. if _, ok := state.queries[q]; !ok {
  250. state.queries[q] = make(map[string]chan<- interface{})
  251. }
  252. // create subscription
  253. state.queries[q][clientID] = ch
  254. // add client if needed
  255. if _, ok := state.clients[clientID]; !ok {
  256. state.clients[clientID] = make(map[Query]struct{})
  257. }
  258. state.clients[clientID][q] = struct{}{}
  259. }
  260. func (state *state) remove(clientID string, q Query) {
  261. clientToChannelMap, ok := state.queries[q]
  262. if !ok {
  263. return
  264. }
  265. ch, ok := clientToChannelMap[clientID]
  266. if ok {
  267. close(ch)
  268. delete(state.clients[clientID], q)
  269. // if it not subscribed to anything else, remove the client
  270. if len(state.clients[clientID]) == 0 {
  271. delete(state.clients, clientID)
  272. }
  273. delete(state.queries[q], clientID)
  274. if len(state.queries[q]) == 0 {
  275. delete(state.queries, q)
  276. }
  277. }
  278. }
  279. func (state *state) removeAll(clientID string) {
  280. queryMap, ok := state.clients[clientID]
  281. if !ok {
  282. return
  283. }
  284. for q := range queryMap {
  285. ch := state.queries[q][clientID]
  286. close(ch)
  287. delete(state.queries[q], clientID)
  288. if len(state.queries[q]) == 0 {
  289. delete(state.queries, q)
  290. }
  291. }
  292. delete(state.clients, clientID)
  293. }
  294. func (state *state) send(msg interface{}, tags TagMap) {
  295. for q, clientToChannelMap := range state.queries {
  296. if q.Matches(tags) {
  297. for _, ch := range clientToChannelMap {
  298. ch <- msg
  299. }
  300. }
  301. }
  302. }