You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

441 lines
12 KiB

abci: localClient improvements & bugfixes & pubsub Unsubscribe issues (#2748) * use READ lock/unlock in ConsensusState#GetLastHeight Refs #2721 * do not use defers when there's no need * fix peer formatting (output its address instead of the pointer) ``` [54310]: E[11-02|11:59:39.851] Connection failed @ sendRoutine module=p2p peer=0xb78f00 conn=MConn{74.207.236.148:26656} err="pong timeout" ``` https://github.com/tendermint/tendermint/issues/2721#issuecomment-435326581 * panic if peer has no state https://github.com/tendermint/tendermint/issues/2721#issuecomment-435347165 It's confusing that sometimes we check if peer has a state, but most of the times we expect it to be there 1. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/mempool/reactor.go#L138 2. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/rpc/core/consensus.go#L196 (edited) I will change everything to always assume peer has a state and panic otherwise that should help identify issues earlier * abci/localclient: extend lock on app callback App callback should be protected by lock as well (note this was already done for InitChainAsync, why not for others???). Otherwise, when we execute the block, tx might come in and call the callback in the same time we're updating it in execBlockOnProxyApp => DATA RACE Fixes #2721 Consensus state is locked ``` goroutine 113333 [semacquire, 309 minutes]: sync.runtime_SemacquireMutex(0xc00180009c, 0xc0000c7e00) /usr/local/go/src/runtime/sema.go:71 +0x3d sync.(*RWMutex).RLock(0xc001800090) /usr/local/go/src/sync/rwmutex.go:50 +0x4e github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).GetRoundState(0xc001800000, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:218 +0x46 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).queryMaj23Routine(0xc0017def80, 0x11104a0, 0xc0072488f0, 0xc007248 9c0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:735 +0x16d created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).AddPeer /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:172 +0x236 ``` because localClient is locked ``` goroutine 1899 [semacquire, 309 minutes]: sync.runtime_SemacquireMutex(0xc00003363c, 0xc0000cb500) /usr/local/go/src/runtime/sema.go:71 +0x3d sync.(*Mutex).Lock(0xc000033638) /usr/local/go/src/sync/mutex.go:134 +0xff github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).SetResponseCallback(0xc0001fb560, 0xc007868540) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:32 +0x33 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnConsensus).SetResponseCallback(0xc00002f750, 0xc007868540) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:57 +0x40 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.execBlockOnProxyApp(0x1104e20, 0xc002ca0ba0, 0x11092a0, 0xc00002f750, 0xc0001fe960, 0xc000bfc660, 0x110cfe0, 0xc000090330, 0xc9d12, 0xc000d9d5a0, ...) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:230 +0x1fd github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.(*BlockExecutor).ApplyBlock(0xc002c2a230, 0x7, 0x0, 0xc000eae880, 0x6, 0xc002e52c60, 0x16, 0x1f927, 0xc9d12, 0xc000d9d5a0, ...) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:96 +0x142 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).finalizeCommit(0xc001800000, 0x1f928) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1339 +0xa3e github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryFinalizeCommit(0xc001800000, 0x1f928) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1270 +0x451 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit.func1(0xc001800000, 0x0, 0x1f928) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1218 +0x90 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit(0xc001800000, 0x1f928, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1247 +0x6b8 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xc003bc7ad0, 0xc003bc7b10) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1659 +0xbad github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xf1, 0xf1) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1517 +0x59 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg(0xc001800000, 0xd98200, 0xc0070dbed0, 0xc000cf4cc0, 0x28) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:660 +0x64b github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine(0xc001800000, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:617 +0x670 created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).OnStart /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:311 +0x132 ``` tx comes in and CheckTx is executed right when we execute the block ``` goroutine 111044 [semacquire, 309 minutes]: sync.runtime_SemacquireMutex(0xc00003363c, 0x0) /usr/local/go/src/runtime/sema.go:71 +0x3d sync.(*Mutex).Lock(0xc000033638) /usr/local/go/src/sync/mutex.go:134 +0xff github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).CheckTxAsync(0xc0001fb0e0, 0xc002d94500, 0x13f, 0x280, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:85 +0x47 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnMempool).CheckTxAsync(0xc00002f720, 0xc002d94500, 0x13f, 0x280, 0x1) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:114 +0x51 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool.(*Mempool).CheckTx(0xc002d3a320, 0xc002d94500, 0x13f, 0x280, 0xc0072355f0, 0x0, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool/mempool.go:316 +0x17b github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core.BroadcastTxSync(0xc002d94500, 0x13f, 0x280, 0x0, 0x0, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core/mempool.go:93 +0xb8 reflect.Value.call(0xd85560, 0x10326c0, 0x13, 0xec7b8b, 0x4, 0xc00663f180, 0x1, 0x1, 0xc00663f180, 0xc00663f188, ...) /usr/local/go/src/reflect/value.go:447 +0x449 reflect.Value.Call(0xd85560, 0x10326c0, 0x13, 0xc00663f180, 0x1, 0x1, 0x0, 0x0, 0xc005cc9344) /usr/local/go/src/reflect/value.go:308 +0xa4 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.makeHTTPHandler.func2(0x1102060, 0xc00663f100, 0xc0082d7900) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/handlers.go:269 +0x188 net/http.HandlerFunc.ServeHTTP(0xc002c81f20, 0x1102060, 0xc00663f100, 0xc0082d7900) /usr/local/go/src/net/http/server.go:1964 +0x44 net/http.(*ServeMux).ServeHTTP(0xc002c81b60, 0x1102060, 0xc00663f100, 0xc0082d7900) /usr/local/go/src/net/http/server.go:2361 +0x127 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.maxBytesHandler.ServeHTTP(0x10f8a40, 0xc002c81b60, 0xf4240, 0x1102060, 0xc00663f100, 0xc0082d7900) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:219 +0xcf github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.RecoverAndLogHandler.func1(0x1103220, 0xc00121e620, 0xc0082d7900) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:192 +0x394 net/http.HandlerFunc.ServeHTTP(0xc002c06ea0, 0x1103220, 0xc00121e620, 0xc0082d7900) /usr/local/go/src/net/http/server.go:1964 +0x44 net/http.serverHandler.ServeHTTP(0xc001a1aa90, 0x1103220, 0xc00121e620, 0xc0082d7900) /usr/local/go/src/net/http/server.go:2741 +0xab net/http.(*conn).serve(0xc00785a3c0, 0x11041a0, 0xc000f844c0) /usr/local/go/src/net/http/server.go:1847 +0x646 created by net/http.(*Server).Serve /usr/local/go/src/net/http/server.go:2851 +0x2f5 ``` * consensus: use read lock in Receive#VoteMessage * use defer to unlock mutex because application might panic * use defer in every method of the localClient * add a changelog entry * drain channels before Unsubscribe(All) Read https://github.com/tendermint/tendermint/blob/55362ed76630f3e1ebec159a598f6a9fb5892cb1/libs/pubsub/pubsub.go#L13 for the detailed explanation of the issue. We'll need to fix it someday. Make sure to keep an eye on https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md * retry instead of panic when peer has no state in reactors other than consensus in /dump_consensus_state RPC endpoint, skip a peer with no state * rpc/core/mempool: simplify error messages * rpc/core/mempool: use time.After instead of timer also, do not log DeliverTx result (to be consistent with other memthods) * unlock before calling the callback in reqRes#SetCallback
6 years ago
[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection readRoutine and writeRoutine both finishes). **It means that some exiting goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It's using state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we're not thoroughly cleaning state.queries, which was the reason for memory leakage.
7 years ago
  1. // Package pubsub implements a pub-sub model with a single publisher (Server)
  2. // and multiple subscribers (clients).
  3. //
  4. // Though you can have multiple publishers by sharing a pointer to a server or
  5. // by giving the same channel to each publisher and publishing messages from
  6. // that channel (fan-in).
  7. //
  8. // Clients subscribe for messages, which could be of any type, using a query.
  9. // When some message is published, we match it with all queries. If there is a
  10. // match, this message will be pushed to all clients, subscribed to that query.
  11. // See query subpackage for our implementation.
  12. //
  13. // Due to the blocking send implementation, a single subscriber can freeze an
  14. // entire server by not reading messages before it unsubscribes. To avoid such
  15. // scenario, subscribers must either:
  16. //
  17. // a) make sure they continue to read from the out channel until
  18. // Unsubscribe(All) is called
  19. //
  20. // s.Subscribe(ctx, sub, qry, out)
  21. // go func() {
  22. // for msg := range out {
  23. // // handle msg
  24. // // will exit automatically when out is closed by Unsubscribe(All)
  25. // }
  26. // }()
  27. // s.UnsubscribeAll(ctx, sub)
  28. //
  29. // b) drain the out channel before calling Unsubscribe(All)
  30. //
  31. // s.Subscribe(ctx, sub, qry, out)
  32. // defer func() {
  33. // // drain out to make sure we don't block
  34. // LOOP:
  35. // for {
  36. // select {
  37. // case <-out:
  38. // default:
  39. // break LOOP
  40. // }
  41. // }
  42. // s.UnsubscribeAll(ctx, sub)
  43. // }()
  44. // for msg := range out {
  45. // // handle msg
  46. // if err != nil {
  47. // return err
  48. // }
  49. // }
  50. //
  51. package pubsub
  52. import (
  53. "context"
  54. "errors"
  55. "sync"
  56. cmn "github.com/tendermint/tendermint/libs/common"
  57. )
  58. type operation int
  59. const (
  60. sub operation = iota
  61. pub
  62. unsub
  63. shutdown
  64. )
  65. var (
  66. // ErrSubscriptionNotFound is returned when a client tries to unsubscribe
  67. // from not existing subscription.
  68. ErrSubscriptionNotFound = errors.New("subscription not found")
  69. // ErrAlreadySubscribed is returned when a client tries to subscribe twice or
  70. // more using the same query.
  71. ErrAlreadySubscribed = errors.New("already subscribed")
  72. )
  73. type cmd struct {
  74. op operation
  75. query Query
  76. ch chan<- interface{}
  77. clientID string
  78. msg interface{}
  79. tags TagMap
  80. }
  81. // Query defines an interface for a query to be used for subscribing.
  82. type Query interface {
  83. Matches(tags TagMap) bool
  84. String() string
  85. }
  86. // Server allows clients to subscribe/unsubscribe for messages, publishing
  87. // messages with or without tags, and manages internal state.
  88. type Server struct {
  89. cmn.BaseService
  90. cmds chan cmd
  91. cmdsCap int
  92. mtx sync.RWMutex
  93. subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct
  94. }
  95. // Option sets a parameter for the server.
  96. type Option func(*Server)
  97. // TagMap is used to associate tags to a message.
  98. // They can be queried by subscribers to choose messages they will received.
  99. type TagMap interface {
  100. // Get returns the value for a key, or nil if no value is present.
  101. // The ok result indicates whether value was found in the tags.
  102. Get(key string) (value string, ok bool)
  103. // Len returns the number of tags.
  104. Len() int
  105. }
  106. type tagMap map[string]string
  107. var _ TagMap = (*tagMap)(nil)
  108. // NewTagMap constructs a new immutable tag set from a map.
  109. func NewTagMap(data map[string]string) TagMap {
  110. return tagMap(data)
  111. }
  112. // Get returns the value for a key, or nil if no value is present.
  113. // The ok result indicates whether value was found in the tags.
  114. func (ts tagMap) Get(key string) (value string, ok bool) {
  115. value, ok = ts[key]
  116. return
  117. }
  118. // Len returns the number of tags.
  119. func (ts tagMap) Len() int {
  120. return len(ts)
  121. }
  122. // NewServer returns a new server. See the commentary on the Option functions
  123. // for a detailed description of how to configure buffering. If no options are
  124. // provided, the resulting server's queue is unbuffered.
  125. func NewServer(options ...Option) *Server {
  126. s := &Server{
  127. subscriptions: make(map[string]map[string]struct{}),
  128. }
  129. s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
  130. for _, option := range options {
  131. option(s)
  132. }
  133. // if BufferCapacity option was not set, the channel is unbuffered
  134. s.cmds = make(chan cmd, s.cmdsCap)
  135. return s
  136. }
  137. // BufferCapacity allows you to specify capacity for the internal server's
  138. // queue. Since the server, given Y subscribers, could only process X messages,
  139. // this option could be used to survive spikes (e.g. high amount of
  140. // transactions during peak hours).
  141. func BufferCapacity(cap int) Option {
  142. return func(s *Server) {
  143. if cap > 0 {
  144. s.cmdsCap = cap
  145. }
  146. }
  147. }
  148. // BufferCapacity returns capacity of the internal server's queue.
  149. func (s *Server) BufferCapacity() int {
  150. return s.cmdsCap
  151. }
  152. // Subscribe creates a subscription for the given client. It accepts a channel
  153. // on which messages matching the given query can be received. An error will be
  154. // returned to the caller if the context is canceled or if subscription already
  155. // exist for pair clientID and query.
  156. func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
  157. s.mtx.RLock()
  158. clientSubscriptions, ok := s.subscriptions[clientID]
  159. if ok {
  160. _, ok = clientSubscriptions[query.String()]
  161. }
  162. s.mtx.RUnlock()
  163. if ok {
  164. return ErrAlreadySubscribed
  165. }
  166. select {
  167. case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
  168. s.mtx.Lock()
  169. if _, ok = s.subscriptions[clientID]; !ok {
  170. s.subscriptions[clientID] = make(map[string]struct{})
  171. }
  172. s.subscriptions[clientID][query.String()] = struct{}{}
  173. s.mtx.Unlock()
  174. return nil
  175. case <-ctx.Done():
  176. return ctx.Err()
  177. case <-s.Quit():
  178. return nil
  179. }
  180. }
  181. // Unsubscribe removes the subscription on the given query. An error will be
  182. // returned to the caller if the context is canceled or if subscription does
  183. // not exist.
  184. func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
  185. s.mtx.RLock()
  186. clientSubscriptions, ok := s.subscriptions[clientID]
  187. if ok {
  188. _, ok = clientSubscriptions[query.String()]
  189. }
  190. s.mtx.RUnlock()
  191. if !ok {
  192. return ErrSubscriptionNotFound
  193. }
  194. select {
  195. case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
  196. s.mtx.Lock()
  197. delete(clientSubscriptions, query.String())
  198. if len(clientSubscriptions) == 0 {
  199. delete(s.subscriptions, clientID)
  200. }
  201. s.mtx.Unlock()
  202. return nil
  203. case <-ctx.Done():
  204. return ctx.Err()
  205. case <-s.Quit():
  206. return nil
  207. }
  208. }
  209. // UnsubscribeAll removes all client subscriptions. An error will be returned
  210. // to the caller if the context is canceled or if subscription does not exist.
  211. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
  212. s.mtx.RLock()
  213. _, ok := s.subscriptions[clientID]
  214. s.mtx.RUnlock()
  215. if !ok {
  216. return ErrSubscriptionNotFound
  217. }
  218. select {
  219. case s.cmds <- cmd{op: unsub, clientID: clientID}:
  220. s.mtx.Lock()
  221. delete(s.subscriptions, clientID)
  222. s.mtx.Unlock()
  223. return nil
  224. case <-ctx.Done():
  225. return ctx.Err()
  226. case <-s.Quit():
  227. return nil
  228. }
  229. }
  230. // Publish publishes the given message. An error will be returned to the caller
  231. // if the context is canceled.
  232. func (s *Server) Publish(ctx context.Context, msg interface{}) error {
  233. return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
  234. }
  235. // PublishWithTags publishes the given message with the set of tags. The set is
  236. // matched with clients queries. If there is a match, the message is sent to
  237. // the client.
  238. func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
  239. select {
  240. case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
  241. return nil
  242. case <-ctx.Done():
  243. return ctx.Err()
  244. case <-s.Quit():
  245. return nil
  246. }
  247. }
  248. // OnStop implements Service.OnStop by shutting down the server.
  249. func (s *Server) OnStop() {
  250. s.cmds <- cmd{op: shutdown}
  251. }
  252. // NOTE: not goroutine safe
  253. type state struct {
  254. // query string -> client -> ch
  255. queryToChanMap map[string]map[string]chan<- interface{}
  256. // client -> query string -> struct{}
  257. clientToQueryMap map[string]map[string]struct{}
  258. // query string -> queryPlusRefCount
  259. queries map[string]*queryPlusRefCount
  260. }
  261. // queryPlusRefCount holds a pointer to a query and reference counter. When
  262. // refCount is zero, query will be removed.
  263. type queryPlusRefCount struct {
  264. q Query
  265. refCount int
  266. }
  267. // OnStart implements Service.OnStart by starting the server.
  268. func (s *Server) OnStart() error {
  269. go s.loop(state{
  270. queryToChanMap: make(map[string]map[string]chan<- interface{}),
  271. clientToQueryMap: make(map[string]map[string]struct{}),
  272. queries: make(map[string]*queryPlusRefCount),
  273. })
  274. return nil
  275. }
  276. // OnReset implements Service.OnReset
  277. func (s *Server) OnReset() error {
  278. return nil
  279. }
  280. func (s *Server) loop(state state) {
  281. loop:
  282. for cmd := range s.cmds {
  283. switch cmd.op {
  284. case unsub:
  285. if cmd.query != nil {
  286. state.remove(cmd.clientID, cmd.query)
  287. } else {
  288. state.removeAll(cmd.clientID)
  289. }
  290. case shutdown:
  291. for clientID := range state.clientToQueryMap {
  292. state.removeAll(clientID)
  293. }
  294. break loop
  295. case sub:
  296. state.add(cmd.clientID, cmd.query, cmd.ch)
  297. case pub:
  298. state.send(cmd.msg, cmd.tags)
  299. }
  300. }
  301. }
  302. func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
  303. qStr := q.String()
  304. // initialize clientToChannelMap per query if needed
  305. if _, ok := state.queryToChanMap[qStr]; !ok {
  306. state.queryToChanMap[qStr] = make(map[string]chan<- interface{})
  307. }
  308. // create subscription
  309. state.queryToChanMap[qStr][clientID] = ch
  310. // initialize queries if needed
  311. if _, ok := state.queries[qStr]; !ok {
  312. state.queries[qStr] = &queryPlusRefCount{q: q, refCount: 0}
  313. }
  314. // increment reference counter
  315. state.queries[qStr].refCount++
  316. // add client if needed
  317. if _, ok := state.clientToQueryMap[clientID]; !ok {
  318. state.clientToQueryMap[clientID] = make(map[string]struct{})
  319. }
  320. state.clientToQueryMap[clientID][qStr] = struct{}{}
  321. }
  322. func (state *state) remove(clientID string, q Query) {
  323. qStr := q.String()
  324. clientToChannelMap, ok := state.queryToChanMap[qStr]
  325. if !ok {
  326. return
  327. }
  328. ch, ok := clientToChannelMap[clientID]
  329. if !ok {
  330. return
  331. }
  332. close(ch)
  333. // remove the query from client map.
  334. // if client is not subscribed to anything else, remove it.
  335. delete(state.clientToQueryMap[clientID], qStr)
  336. if len(state.clientToQueryMap[clientID]) == 0 {
  337. delete(state.clientToQueryMap, clientID)
  338. }
  339. // remove the client from query map.
  340. // if query has no other clients subscribed, remove it.
  341. delete(state.queryToChanMap[qStr], clientID)
  342. if len(state.queryToChanMap[qStr]) == 0 {
  343. delete(state.queryToChanMap, qStr)
  344. }
  345. // decrease ref counter in queries
  346. state.queries[qStr].refCount--
  347. // remove the query if nobody else is using it
  348. if state.queries[qStr].refCount == 0 {
  349. delete(state.queries, qStr)
  350. }
  351. }
  352. func (state *state) removeAll(clientID string) {
  353. queryMap, ok := state.clientToQueryMap[clientID]
  354. if !ok {
  355. return
  356. }
  357. for qStr := range queryMap {
  358. ch := state.queryToChanMap[qStr][clientID]
  359. close(ch)
  360. // remove the client from query map.
  361. // if query has no other clients subscribed, remove it.
  362. delete(state.queryToChanMap[qStr], clientID)
  363. if len(state.queryToChanMap[qStr]) == 0 {
  364. delete(state.queryToChanMap, qStr)
  365. }
  366. // decrease ref counter in queries
  367. state.queries[qStr].refCount--
  368. // remove the query if nobody else is using it
  369. if state.queries[qStr].refCount == 0 {
  370. delete(state.queries, qStr)
  371. }
  372. }
  373. // remove the client.
  374. delete(state.clientToQueryMap, clientID)
  375. }
  376. func (state *state) send(msg interface{}, tags TagMap) {
  377. for qStr, clientToChannelMap := range state.queryToChanMap {
  378. q := state.queries[qStr].q
  379. if q.Matches(tags) {
  380. for _, ch := range clientToChannelMap {
  381. ch <- msg
  382. }
  383. }
  384. }
  385. }