You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

393 lines
10 KiB

[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% 
github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection's readRoutine and writeRoutine both finish). **It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It uses state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning up state.queries, which was the cause of the memory leak.
7 years ago
[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% 
github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection's readRoutine and writeRoutine both finish). **It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It uses state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning up state.queries, which was the cause of the memory leak.
7 years ago
[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% 
github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection's readRoutine and writeRoutine both finish). **It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It uses state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning up state.queries, which was the cause of the memory leak.
7 years ago
  1. // Package pubsub implements a pub-sub model with a single publisher (Server)
  2. // and multiple subscribers (clients).
  3. //
  4. // Though you can have multiple publishers by sharing a pointer to a server or
  5. // by giving the same channel to each publisher and publishing messages from
  6. // that channel (fan-in).
  7. //
  8. // Clients subscribe for messages, which could be of any type, using a query.
  9. // When some message is published, we match it with all queries. If there is a
  10. // match, this message will be pushed to all clients, subscribed to that query.
  11. // See query subpackage for our implementation.
  12. //
  13. // Due to the blocking send implementation, a single subscriber can freeze an
  14. // entire server by not reading messages before it unsubscribes. To avoid such
  15. // scenario, subscribers must either:
  16. //
  17. // a) make sure they continue to read from the out channel until
  18. // Unsubscribe(All) is called
  19. //
  20. // s.Subscribe(ctx, sub, qry, out)
  21. // go func() {
  22. // for msg := range out {
  23. // // handle msg
  24. // // will exit automatically when out is closed by Unsubscribe(All)
  25. // }
  26. // }()
  27. // s.UnsubscribeAll(ctx, sub)
  28. //
// b) keep draining the out channel while calling Unsubscribe(All). Because out
// is closed only by Unsubscribe(All) (or when the server stops), the drain has
// to run in its own goroutine; a plain `for range out` loop placed before the
// call would never terminate.
//
//	s.Subscribe(ctx, sub, qry, out)
//	defer func() {
//		go func() {
//			for range out {
//				// drain out to make sure we don't block
//			}
//		}()
//		s.UnsubscribeAll(ctx, sub)
//	}()
//	for msg := range out {
//		// handle msg
//		if err != nil {
//			return err
//		}
//	}
  44. //
  45. package pubsub
  46. import (
  47. "context"
  48. "errors"
  49. "sync"
  50. cmn "github.com/tendermint/tendermint/libs/common"
  51. )
// operation identifies the kind of request a cmd carries to the loop
// goroutine.
type operation int

const (
	// sub registers a subscription (uses cmd.clientID, cmd.query, cmd.ch).
	sub operation = iota
	// pub publishes a message (uses cmd.msg, cmd.tags).
	pub
	// unsub removes cmd.clientID's subscription to cmd.query, or all of the
	// client's subscriptions when cmd.query is nil.
	unsub
	// shutdown removes every subscription and makes the loop exit.
	shutdown
)
  59. var (
  60. // ErrSubscriptionNotFound is returned when a client tries to unsubscribe
  61. // from not existing subscription.
  62. ErrSubscriptionNotFound = errors.New("subscription not found")
  63. // ErrAlreadySubscribed is returned when a client tries to subscribe twice or
  64. // more using the same query.
  65. ErrAlreadySubscribed = errors.New("already subscribed")
  66. )
// cmd is a request from the Server's public methods to the loop goroutine,
// delivered over Server.cmds. Only the fields relevant to op are set; see the
// operation constants.
type cmd struct {
	op       operation
	query    Query              // nil for an unsub that targets all of the client's subscriptions
	ch       chan<- interface{} // delivery channel registered by a sub
	clientID string             // subscriber a sub/unsub applies to
	msg      interface{}        // payload of a pub
	tags     TagMap             // tags matched against queries for a pub
}
// Query defines an interface for a query to be used for subscribing.
//
// Matches reports whether a message published with the given tags should be
// delivered to the query's subscribers. String returns the query's textual
// form; Subscribe and Unsubscribe look subscriptions up by it, so two queries
// with equal String results count as the same subscription for a client.
//
// NOTE(review): Query values are used as map keys by the server's loop, so a
// Query whose dynamic type is not comparable (e.g. a map or slice type) would
// panic at run time when subscribed.
type Query interface {
	Matches(tags TagMap) bool
	String() string
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without tags, and manages internal state.
//
// Subscriptions are tracked in two places: subscriptions (guarded by mtx) is
// read and written by the public methods, while the loop goroutine keeps its
// own state, which it updates from the commands sent over cmds.
type Server struct {
	cmn.BaseService

	cmds    chan cmd
	cmdsCap int // capacity of cmds; 0 (the default) means unbuffered

	mtx           sync.RWMutex
	subscriptions map[string]map[string]Query // subscriber -> query (string) -> Query
}
// Option sets a parameter for the server. NewServer applies options after the
// subscriptions map and BaseService are set up and before the command channel
// is created, so an option may change the queue capacity.
type Option func(*Server)
  91. // TagMap is used to associate tags to a message.
  92. // They can be queried by subscribers to choose messages they will received.
  93. type TagMap interface {
  94. // Get returns the value for a key, or nil if no value is present.
  95. // The ok result indicates whether value was found in the tags.
  96. Get(key string) (value string, ok bool)
  97. // Len returns the number of tags.
  98. Len() int
  99. }
  100. type tagMap map[string]string
  101. var _ TagMap = (*tagMap)(nil)
  102. // NewTagMap constructs a new immutable tag set from a map.
  103. func NewTagMap(data map[string]string) TagMap {
  104. return tagMap(data)
  105. }
  106. // Get returns the value for a key, or nil if no value is present.
  107. // The ok result indicates whether value was found in the tags.
  108. func (ts tagMap) Get(key string) (value string, ok bool) {
  109. value, ok = ts[key]
  110. return
  111. }
  112. // Len returns the number of tags.
  113. func (ts tagMap) Len() int {
  114. return len(ts)
  115. }
  116. // NewServer returns a new server. See the commentary on the Option functions
  117. // for a detailed description of how to configure buffering. If no options are
  118. // provided, the resulting server's queue is unbuffered.
  119. func NewServer(options ...Option) *Server {
  120. s := &Server{
  121. subscriptions: make(map[string]map[string]Query),
  122. }
  123. s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
  124. for _, option := range options {
  125. option(s)
  126. }
  127. // if BufferCapacity option was not set, the channel is unbuffered
  128. s.cmds = make(chan cmd, s.cmdsCap)
  129. return s
  130. }
  131. // BufferCapacity allows you to specify capacity for the internal server's
  132. // queue. Since the server, given Y subscribers, could only process X messages,
  133. // this option could be used to survive spikes (e.g. high amount of
  134. // transactions during peak hours).
  135. func BufferCapacity(cap int) Option {
  136. return func(s *Server) {
  137. if cap > 0 {
  138. s.cmdsCap = cap
  139. }
  140. }
  141. }
  142. // BufferCapacity returns capacity of the internal server's queue.
  143. func (s *Server) BufferCapacity() int {
  144. return s.cmdsCap
  145. }
  146. // Subscribe creates a subscription for the given client. It accepts a channel
  147. // on which messages matching the given query can be received. An error will be
  148. // returned to the caller if the context is canceled or if subscription already
  149. // exist for pair clientID and query.
  150. func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
  151. s.mtx.RLock()
  152. clientSubscriptions, ok := s.subscriptions[clientID]
  153. if ok {
  154. _, ok = clientSubscriptions[query.String()]
  155. }
  156. s.mtx.RUnlock()
  157. if ok {
  158. return ErrAlreadySubscribed
  159. }
  160. select {
  161. case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
  162. s.mtx.Lock()
  163. if _, ok = s.subscriptions[clientID]; !ok {
  164. s.subscriptions[clientID] = make(map[string]Query)
  165. }
  166. // preserve original query
  167. // see Unsubscribe
  168. s.subscriptions[clientID][query.String()] = query
  169. s.mtx.Unlock()
  170. return nil
  171. case <-ctx.Done():
  172. return ctx.Err()
  173. case <-s.Quit():
  174. return nil
  175. }
  176. }
  177. // Unsubscribe removes the subscription on the given query. An error will be
  178. // returned to the caller if the context is canceled or if subscription does
  179. // not exist.
  180. func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
  181. var origQuery Query
  182. s.mtx.RLock()
  183. clientSubscriptions, ok := s.subscriptions[clientID]
  184. if ok {
  185. origQuery, ok = clientSubscriptions[query.String()]
  186. }
  187. s.mtx.RUnlock()
  188. if !ok {
  189. return ErrSubscriptionNotFound
  190. }
  191. // original query is used here because we're using pointers as map keys
  192. select {
  193. case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}:
  194. s.mtx.Lock()
  195. delete(clientSubscriptions, query.String())
  196. s.mtx.Unlock()
  197. return nil
  198. case <-ctx.Done():
  199. return ctx.Err()
  200. case <-s.Quit():
  201. return nil
  202. }
  203. }
  204. // UnsubscribeAll removes all client subscriptions. An error will be returned
  205. // to the caller if the context is canceled or if subscription does not exist.
  206. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
  207. s.mtx.RLock()
  208. _, ok := s.subscriptions[clientID]
  209. s.mtx.RUnlock()
  210. if !ok {
  211. return ErrSubscriptionNotFound
  212. }
  213. select {
  214. case s.cmds <- cmd{op: unsub, clientID: clientID}:
  215. s.mtx.Lock()
  216. delete(s.subscriptions, clientID)
  217. s.mtx.Unlock()
  218. return nil
  219. case <-ctx.Done():
  220. return ctx.Err()
  221. case <-s.Quit():
  222. return nil
  223. }
  224. }
  225. // Publish publishes the given message. An error will be returned to the caller
  226. // if the context is canceled.
  227. func (s *Server) Publish(ctx context.Context, msg interface{}) error {
  228. return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
  229. }
  230. // PublishWithTags publishes the given message with the set of tags. The set is
  231. // matched with clients queries. If there is a match, the message is sent to
  232. // the client.
  233. func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
  234. select {
  235. case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
  236. return nil
  237. case <-ctx.Done():
  238. return ctx.Err()
  239. case <-s.Quit():
  240. return nil
  241. }
  242. }
  243. // OnStop implements Service.OnStop by shutting down the server.
  244. func (s *Server) OnStop() {
  245. s.cmds <- cmd{op: shutdown}
  246. }
// state is the subscription bookkeeping used by the server's loop goroutine.
// It keeps two mirrored indexes: every (query, client) pair present in queries
// also appears as a (client, query) pair in clients, and add, remove and
// removeAll update both. removeAll depends on this: it closes
// queries[q][clientID] for every q listed under clients[clientID], and closing
// a missing (nil) channel would panic.
//
// NOTE: not goroutine safe. NOTE(review): in the visible code it is created
// only in OnStart and then used only from loop.
type state struct {
	// query -> client -> ch (the channel the client receives messages on)
	queries map[Query]map[string]chan<- interface{}
	// client -> query -> struct{} (the set of queries each client is subscribed to)
	clients map[string]map[Query]struct{}
}
  254. // OnStart implements Service.OnStart by starting the server.
  255. func (s *Server) OnStart() error {
  256. go s.loop(state{
  257. queries: make(map[Query]map[string]chan<- interface{}),
  258. clients: make(map[string]map[Query]struct{}),
  259. })
  260. return nil
  261. }
  262. // OnReset implements Service.OnReset
  263. func (s *Server) OnReset() error {
  264. return nil
  265. }
  266. func (s *Server) loop(state state) {
  267. loop:
  268. for cmd := range s.cmds {
  269. switch cmd.op {
  270. case unsub:
  271. if cmd.query != nil {
  272. state.remove(cmd.clientID, cmd.query)
  273. } else {
  274. state.removeAll(cmd.clientID)
  275. }
  276. case shutdown:
  277. for clientID := range state.clients {
  278. state.removeAll(clientID)
  279. }
  280. break loop
  281. case sub:
  282. state.add(cmd.clientID, cmd.query, cmd.ch)
  283. case pub:
  284. state.send(cmd.msg, cmd.tags)
  285. }
  286. }
  287. }
  288. func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
  289. // initialize clientToChannelMap per query if needed
  290. if _, ok := state.queries[q]; !ok {
  291. state.queries[q] = make(map[string]chan<- interface{})
  292. }
  293. // create subscription
  294. state.queries[q][clientID] = ch
  295. // add client if needed
  296. if _, ok := state.clients[clientID]; !ok {
  297. state.clients[clientID] = make(map[Query]struct{})
  298. }
  299. state.clients[clientID][q] = struct{}{}
  300. }
  301. func (state *state) remove(clientID string, q Query) {
  302. clientToChannelMap, ok := state.queries[q]
  303. if !ok {
  304. return
  305. }
  306. ch, ok := clientToChannelMap[clientID]
  307. if ok {
  308. close(ch)
  309. delete(state.clients[clientID], q)
  310. // if it not subscribed to anything else, remove the client
  311. if len(state.clients[clientID]) == 0 {
  312. delete(state.clients, clientID)
  313. }
  314. delete(state.queries[q], clientID)
  315. if len(state.queries[q]) == 0 {
  316. delete(state.queries, q)
  317. }
  318. }
  319. }
  320. func (state *state) removeAll(clientID string) {
  321. queryMap, ok := state.clients[clientID]
  322. if !ok {
  323. return
  324. }
  325. for q := range queryMap {
  326. ch := state.queries[q][clientID]
  327. close(ch)
  328. delete(state.queries[q], clientID)
  329. if len(state.queries[q]) == 0 {
  330. delete(state.queries, q)
  331. }
  332. }
  333. delete(state.clients, clientID)
  334. }
  335. func (state *state) send(msg interface{}, tags TagMap) {
  336. for q, clientToChannelMap := range state.queries {
  337. if q.Matches(tags) {
  338. for _, ch := range clientToChannelMap {
  339. ch <- msg
  340. }
  341. }
  342. }
  343. }