You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

404 lines
11 KiB

6 years ago
6 years ago
[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% 
github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection readRoutine and writeRoutine both finish). **It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It's using state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning state.queries, which was the reason for the memory leak.
7 years ago
  1. // Package pubsub implements a pub-sub model with a single publisher (Server)
  2. // and multiple subscribers (clients).
  3. //
  4. // Though you can have multiple publishers by sharing a pointer to a server or
  5. // by giving the same channel to each publisher and publishing messages from
  6. // that channel (fan-in).
  7. //
  8. // Clients subscribe for messages, which could be of any type, using a query.
  9. // When some message is published, we match it with all queries. If there is a
  10. // match, this message will be pushed to all clients, subscribed to that query.
  11. // See query subpackage for our implementation.
  12. //
  13. // Example:
  14. //
  15. // q, err := query.New("account.name='John'")
  16. // if err != nil {
  17. // return err
  18. // }
  19. // ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
  20. // defer cancel()
  21. // subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q)
  22. // if err != nil {
  23. // return err
  24. // }
  25. //
  26. // for {
  27. // select {
  28. // case msg := <-subscription.Out():
  29. // // handle msg.Data() and msg.Tags()
  30. // case <-subscription.Cancelled():
  31. // return subscription.Err()
  32. // }
  33. // }
  34. //
  35. package pubsub
  36. import (
  37. "context"
  38. "errors"
  39. "sync"
  40. cmn "github.com/tendermint/tendermint/libs/common"
  41. )
  // operation identifies the kind of command handed to the server loop.
  type operation int

  // Commands understood by the server loop. The numeric values are assigned
  // by iota in declaration order.
  const (
  	sub      operation = iota // register a subscription
  	pub                       // deliver a message to matching subscriptions
  	unsub                     // remove one or all subscriptions of a client
  	shutdown                  // remove every subscription and stop the loop
  )
  // Sentinel errors returned by the subscription management methods.
  var (
  	// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
  	// from a subscription that does not exist.
  	ErrSubscriptionNotFound = errors.New("subscription not found")

  	// ErrAlreadySubscribed is returned when a client tries to subscribe more
  	// than once using the same query.
  	ErrAlreadySubscribed = errors.New("already subscribed")
  )
  // Query is the matcher a client supplies when subscribing.
  type Query interface {
  	// Matches reports whether the query accepts the given set of tags.
  	Matches(tags map[string]string) bool
  	// String returns the textual form of the query; the server uses it as
  	// the key that identifies the query.
  	String() string
  }
  62. type cmd struct {
  63. op operation
  64. // subscribe, unsubscribe
  65. query Query
  66. subscription *Subscription
  67. clientID string
  68. // publish
  69. msg interface{}
  70. tags map[string]string
  71. }
  72. // Server allows clients to subscribe/unsubscribe for messages, publishing
  73. // messages with or without tags, and manages internal state.
  74. type Server struct {
  75. cmn.BaseService
  76. cmds chan cmd
  77. cmdsCap int
  78. mtx sync.RWMutex
  79. subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct
  80. }
  81. // Option sets a parameter for the server.
  82. type Option func(*Server)
  83. // NewServer returns a new server. See the commentary on the Option functions
  84. // for a detailed description of how to configure buffering. If no options are
  85. // provided, the resulting server's queue is unbuffered.
  86. func NewServer(options ...Option) *Server {
  87. s := &Server{
  88. subscriptions: make(map[string]map[string]struct{}),
  89. }
  90. s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
  91. for _, option := range options {
  92. option(s)
  93. }
  94. // if BufferCapacity option was not set, the channel is unbuffered
  95. s.cmds = make(chan cmd, s.cmdsCap)
  96. return s
  97. }
  98. // BufferCapacity allows you to specify capacity for the internal server's
  99. // queue. Since the server, given Y subscribers, could only process X messages,
  100. // this option could be used to survive spikes (e.g. high amount of
  101. // transactions during peak hours).
  102. func BufferCapacity(cap int) Option {
  103. return func(s *Server) {
  104. if cap > 0 {
  105. s.cmdsCap = cap
  106. }
  107. }
  108. }
  109. // BufferCapacity returns capacity of the internal server's queue.
  110. func (s *Server) BufferCapacity() int {
  111. return s.cmdsCap
  112. }
  113. // Subscribe creates a subscription for the given client.
  114. //
  115. // An error will be returned to the caller if the context is canceled or if
  116. // subscription already exist for pair clientID and query.
  117. //
  118. // outCapacity can be used to set a capacity for Subscription#Out channel (1 by
  119. // default). Panics if outCapacity is less than or equal to zero. If you want
  120. // an unbuffered channel, use SubscribeUnbuffered.
  121. func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, outCapacity ...int) (*Subscription, error) {
  122. outCap := 1
  123. if len(outCapacity) > 0 {
  124. if outCapacity[0] <= 0 {
  125. panic("Negative or zero capacity. Use SubscribeUnbuffered if you want an unbuffered channel")
  126. }
  127. outCap = outCapacity[0]
  128. }
  129. return s.subscribe(ctx, clientID, query, outCap)
  130. }
  131. // SubscribeUnbuffered does the same as Subscribe, except it returns a
  132. // subscription with unbuffered channel. Use with caution as it can freeze the
  133. // server.
  134. func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (*Subscription, error) {
  135. return s.subscribe(ctx, clientID, query, 0)
  136. }
  137. func (s *Server) subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) {
  138. s.mtx.RLock()
  139. clientSubscriptions, ok := s.subscriptions[clientID]
  140. if ok {
  141. _, ok = clientSubscriptions[query.String()]
  142. }
  143. s.mtx.RUnlock()
  144. if ok {
  145. return nil, ErrAlreadySubscribed
  146. }
  147. subscription := &Subscription{
  148. out: make(chan Message, outCapacity),
  149. cancelled: make(chan struct{}),
  150. }
  151. select {
  152. case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}:
  153. s.mtx.Lock()
  154. if _, ok = s.subscriptions[clientID]; !ok {
  155. s.subscriptions[clientID] = make(map[string]struct{})
  156. }
  157. s.subscriptions[clientID][query.String()] = struct{}{}
  158. s.mtx.Unlock()
  159. return subscription, nil
  160. case <-ctx.Done():
  161. return nil, ctx.Err()
  162. case <-s.Quit():
  163. return nil, nil
  164. }
  165. }
  166. // Unsubscribe removes the subscription on the given query. An error will be
  167. // returned to the caller if the context is canceled or if subscription does
  168. // not exist.
  169. func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
  170. s.mtx.RLock()
  171. clientSubscriptions, ok := s.subscriptions[clientID]
  172. if ok {
  173. _, ok = clientSubscriptions[query.String()]
  174. }
  175. s.mtx.RUnlock()
  176. if !ok {
  177. return ErrSubscriptionNotFound
  178. }
  179. select {
  180. case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
  181. s.mtx.Lock()
  182. delete(clientSubscriptions, query.String())
  183. if len(clientSubscriptions) == 0 {
  184. delete(s.subscriptions, clientID)
  185. }
  186. s.mtx.Unlock()
  187. return nil
  188. case <-ctx.Done():
  189. return ctx.Err()
  190. case <-s.Quit():
  191. return nil
  192. }
  193. }
  194. // UnsubscribeAll removes all client subscriptions. An error will be returned
  195. // to the caller if the context is canceled or if subscription does not exist.
  196. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
  197. s.mtx.RLock()
  198. _, ok := s.subscriptions[clientID]
  199. s.mtx.RUnlock()
  200. if !ok {
  201. return ErrSubscriptionNotFound
  202. }
  203. select {
  204. case s.cmds <- cmd{op: unsub, clientID: clientID}:
  205. s.mtx.Lock()
  206. delete(s.subscriptions, clientID)
  207. s.mtx.Unlock()
  208. return nil
  209. case <-ctx.Done():
  210. return ctx.Err()
  211. case <-s.Quit():
  212. return nil
  213. }
  214. }
  215. // Publish publishes the given message. An error will be returned to the caller
  216. // if the context is canceled.
  217. func (s *Server) Publish(ctx context.Context, msg interface{}) error {
  218. return s.PublishWithTags(ctx, msg, make(map[string]string))
  219. }
  220. // PublishWithTags publishes the given message with the set of tags. The set is
  221. // matched with clients queries. If there is a match, the message is sent to
  222. // the client.
  223. func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]string) error {
  224. select {
  225. case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
  226. return nil
  227. case <-ctx.Done():
  228. return ctx.Err()
  229. case <-s.Quit():
  230. return nil
  231. }
  232. }
  233. // OnStop implements Service.OnStop by shutting down the server.
  234. func (s *Server) OnStop() {
  235. s.cmds <- cmd{op: shutdown}
  236. }
  237. // NOTE: not goroutine safe
  238. type state struct {
  239. // query string -> client -> subscription
  240. subscriptions map[string]map[string]*Subscription
  241. // query string -> queryPlusRefCount
  242. queries map[string]*queryPlusRefCount
  243. }
  244. // queryPlusRefCount holds a pointer to a query and reference counter. When
  245. // refCount is zero, query will be removed.
  246. type queryPlusRefCount struct {
  247. q Query
  248. refCount int
  249. }
  250. // OnStart implements Service.OnStart by starting the server.
  251. func (s *Server) OnStart() error {
  252. go s.loop(state{
  253. subscriptions: make(map[string]map[string]*Subscription),
  254. queries: make(map[string]*queryPlusRefCount),
  255. })
  256. return nil
  257. }
  258. // OnReset implements Service.OnReset
  259. func (s *Server) OnReset() error {
  260. return nil
  261. }
  262. func (s *Server) loop(state state) {
  263. loop:
  264. for cmd := range s.cmds {
  265. switch cmd.op {
  266. case unsub:
  267. if cmd.query != nil {
  268. state.remove(cmd.clientID, cmd.query.String(), ErrUnsubscribed)
  269. } else {
  270. state.removeClient(cmd.clientID, ErrUnsubscribed)
  271. }
  272. case shutdown:
  273. state.removeAll(nil)
  274. break loop
  275. case sub:
  276. state.add(cmd.clientID, cmd.query, cmd.subscription)
  277. case pub:
  278. state.send(cmd.msg, cmd.tags)
  279. }
  280. }
  281. }
  282. func (state *state) add(clientID string, q Query, subscription *Subscription) {
  283. qStr := q.String()
  284. // initialize subscription for this client per query if needed
  285. if _, ok := state.subscriptions[qStr]; !ok {
  286. state.subscriptions[qStr] = make(map[string]*Subscription)
  287. }
  288. // create subscription
  289. state.subscriptions[qStr][clientID] = subscription
  290. // initialize query if needed
  291. if _, ok := state.queries[qStr]; !ok {
  292. state.queries[qStr] = &queryPlusRefCount{q: q, refCount: 0}
  293. }
  294. // increment reference counter
  295. state.queries[qStr].refCount++
  296. }
  297. func (state *state) remove(clientID string, qStr string, reason error) {
  298. clientSubscriptions, ok := state.subscriptions[qStr]
  299. if !ok {
  300. return
  301. }
  302. subscription, ok := clientSubscriptions[clientID]
  303. if !ok {
  304. return
  305. }
  306. subscription.mtx.Lock()
  307. subscription.err = reason
  308. subscription.mtx.Unlock()
  309. close(subscription.cancelled)
  310. // remove client from query map.
  311. // if query has no other clients subscribed, remove it.
  312. delete(state.subscriptions[qStr], clientID)
  313. if len(state.subscriptions[qStr]) == 0 {
  314. delete(state.subscriptions, qStr)
  315. }
  316. // decrease ref counter in queries
  317. state.queries[qStr].refCount--
  318. // remove the query if nobody else is using it
  319. if state.queries[qStr].refCount == 0 {
  320. delete(state.queries, qStr)
  321. }
  322. }
  323. func (state *state) removeClient(clientID string, reason error) {
  324. for qStr, clientSubscriptions := range state.subscriptions {
  325. if _, ok := clientSubscriptions[clientID]; ok {
  326. state.remove(clientID, qStr, reason)
  327. }
  328. }
  329. }
  330. func (state *state) removeAll(reason error) {
  331. for qStr, clientSubscriptions := range state.subscriptions {
  332. for clientID := range clientSubscriptions {
  333. state.remove(clientID, qStr, reason)
  334. }
  335. }
  336. }
  337. func (state *state) send(msg interface{}, tags map[string]string) {
  338. for qStr, clientSubscriptions := range state.subscriptions {
  339. q := state.queries[qStr].q
  340. if q.Matches(tags) {
  341. for clientID, subscription := range clientSubscriptions {
  342. if cap(subscription.out) == 0 {
  343. // block on unbuffered channel
  344. subscription.out <- Message{msg, tags}
  345. } else {
  346. // don't block on buffered channels
  347. select {
  348. case subscription.out <- Message{msg, tags}:
  349. default:
  350. state.remove(clientID, qStr, ErrOutOfCapacity)
  351. }
  352. }
  353. }
  354. }
  355. }
  356. }