You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

360 lines
9.1 KiB

[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% 
github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection readRoutine and writeRoutine both finish). **It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It uses state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning state.queries, which was the reason for the memory leak.
7 years ago
[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% 
github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection readRoutine and writeRoutine both finish). **It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It uses state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning state.queries, which was the reason for the memory leak.
7 years ago
[libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. I used a slightly modified version of TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap before f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% 
github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection readRoutine and writeRoutine both finish). **It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It uses state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning state.queries, which was the reason for the memory leak.
7 years ago
  1. // Package pubsub implements a pub-sub model with a single publisher (Server)
  2. // and multiple subscribers (clients).
  3. //
  4. // Though you can have multiple publishers by sharing a pointer to a server or
  5. // by giving the same channel to each publisher and publishing messages from
  6. // that channel (fan-in).
  7. //
  8. // Clients subscribe for messages, which could be of any type, using a query.
  9. // When some message is published, we match it with all queries. If there is a
  10. // match, this message will be pushed to all clients, subscribed to that query.
  11. // See query subpackage for our implementation.
  12. package pubsub
  13. import (
  14. "context"
  15. "errors"
  16. "sync"
  17. cmn "github.com/tendermint/tendermint/libs/common"
  18. )
  19. type operation int
  20. const (
  21. sub operation = iota
  22. pub
  23. unsub
  24. shutdown
  25. )
  26. var (
  27. // ErrSubscriptionNotFound is returned when a client tries to unsubscribe
  28. // from not existing subscription.
  29. ErrSubscriptionNotFound = errors.New("subscription not found")
  30. // ErrAlreadySubscribed is returned when a client tries to subscribe twice or
  31. // more using the same query.
  32. ErrAlreadySubscribed = errors.New("already subscribed")
  33. )
// cmd is a single instruction sent over Server.cmds to the server's loop.
// Which fields are populated depends on op.
type cmd struct {
	op       operation          // action the loop should perform
	query    Query              // sub/unsub: the query; nil on unsub means "all of the client's queries"
	ch       chan<- interface{} // sub: channel on which the client receives messages
	clientID string             // sub/unsub: the subscriber the command applies to
	msg      interface{}        // pub: the message to deliver
	tags     TagMap             // pub: tags matched against every subscribed query
}
// Query defines an interface for a query to be used for subscribing.
type Query interface {
	// Matches reports whether a message carrying the given tags should be
	// delivered to the subscribers of this query.
	Matches(tags TagMap) bool
	// String returns the textual form of the query. The server uses it as the
	// key under which it records a client's subscription.
	String() string
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without tags, and manages internal state.
type Server struct {
	cmn.BaseService

	cmds    chan cmd // commands consumed by the server's loop
	cmdsCap int      // capacity of cmds; zero means unbuffered

	// mtx guards subscriptions.
	mtx           sync.RWMutex
	subscriptions map[string]map[string]Query // subscriber -> query (string) -> Query
}
// Option sets a parameter for the server. Options are applied by NewServer,
// in the order given, before the server's command queue is created.
type Option func(*Server)
  58. // TagMap is used to associate tags to a message.
  59. // They can be queried by subscribers to choose messages they will received.
  60. type TagMap interface {
  61. // Get returns the value for a key, or nil if no value is present.
  62. // The ok result indicates whether value was found in the tags.
  63. Get(key string) (value string, ok bool)
  64. // Len returns the number of tags.
  65. Len() int
  66. }
  67. type tagMap map[string]string
  68. var _ TagMap = (*tagMap)(nil)
  69. // NewTagMap constructs a new immutable tag set from a map.
  70. func NewTagMap(data map[string]string) TagMap {
  71. return tagMap(data)
  72. }
  73. // Get returns the value for a key, or nil if no value is present.
  74. // The ok result indicates whether value was found in the tags.
  75. func (ts tagMap) Get(key string) (value string, ok bool) {
  76. value, ok = ts[key]
  77. return
  78. }
  79. // Len returns the number of tags.
  80. func (ts tagMap) Len() int {
  81. return len(ts)
  82. }
  83. // NewServer returns a new server. See the commentary on the Option functions
  84. // for a detailed description of how to configure buffering. If no options are
  85. // provided, the resulting server's queue is unbuffered.
  86. func NewServer(options ...Option) *Server {
  87. s := &Server{
  88. subscriptions: make(map[string]map[string]Query),
  89. }
  90. s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
  91. for _, option := range options {
  92. option(s)
  93. }
  94. // if BufferCapacity option was not set, the channel is unbuffered
  95. s.cmds = make(chan cmd, s.cmdsCap)
  96. return s
  97. }
  98. // BufferCapacity allows you to specify capacity for the internal server's
  99. // queue. Since the server, given Y subscribers, could only process X messages,
  100. // this option could be used to survive spikes (e.g. high amount of
  101. // transactions during peak hours).
  102. func BufferCapacity(cap int) Option {
  103. return func(s *Server) {
  104. if cap > 0 {
  105. s.cmdsCap = cap
  106. }
  107. }
  108. }
// BufferCapacity returns the capacity of the internal server's queue. It is
// zero (unbuffered) unless a positive capacity was configured through the
// BufferCapacity option.
func (s *Server) BufferCapacity() int {
	return s.cmdsCap
}
  113. // Subscribe creates a subscription for the given client. It accepts a channel
  114. // on which messages matching the given query can be received. An error will be
  115. // returned to the caller if the context is canceled or if subscription already
  116. // exist for pair clientID and query.
  117. func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
  118. s.mtx.RLock()
  119. clientSubscriptions, ok := s.subscriptions[clientID]
  120. if ok {
  121. _, ok = clientSubscriptions[query.String()]
  122. }
  123. s.mtx.RUnlock()
  124. if ok {
  125. return ErrAlreadySubscribed
  126. }
  127. select {
  128. case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
  129. s.mtx.Lock()
  130. if _, ok = s.subscriptions[clientID]; !ok {
  131. s.subscriptions[clientID] = make(map[string]Query)
  132. }
  133. // preserve original query
  134. // see Unsubscribe
  135. s.subscriptions[clientID][query.String()] = query
  136. s.mtx.Unlock()
  137. return nil
  138. case <-ctx.Done():
  139. return ctx.Err()
  140. case <-s.Quit():
  141. return nil
  142. }
  143. }
  144. // Unsubscribe removes the subscription on the given query. An error will be
  145. // returned to the caller if the context is canceled or if subscription does
  146. // not exist.
  147. func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
  148. var origQuery Query
  149. s.mtx.RLock()
  150. clientSubscriptions, ok := s.subscriptions[clientID]
  151. if ok {
  152. origQuery, ok = clientSubscriptions[query.String()]
  153. }
  154. s.mtx.RUnlock()
  155. if !ok {
  156. return ErrSubscriptionNotFound
  157. }
  158. // original query is used here because we're using pointers as map keys
  159. select {
  160. case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}:
  161. s.mtx.Lock()
  162. delete(clientSubscriptions, query.String())
  163. s.mtx.Unlock()
  164. return nil
  165. case <-ctx.Done():
  166. return ctx.Err()
  167. case <-s.Quit():
  168. return nil
  169. }
  170. }
  171. // UnsubscribeAll removes all client subscriptions. An error will be returned
  172. // to the caller if the context is canceled or if subscription does not exist.
  173. func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
  174. s.mtx.RLock()
  175. _, ok := s.subscriptions[clientID]
  176. s.mtx.RUnlock()
  177. if !ok {
  178. return ErrSubscriptionNotFound
  179. }
  180. select {
  181. case s.cmds <- cmd{op: unsub, clientID: clientID}:
  182. s.mtx.Lock()
  183. delete(s.subscriptions, clientID)
  184. s.mtx.Unlock()
  185. return nil
  186. case <-ctx.Done():
  187. return ctx.Err()
  188. case <-s.Quit():
  189. return nil
  190. }
  191. }
  192. // Publish publishes the given message. An error will be returned to the caller
  193. // if the context is canceled.
  194. func (s *Server) Publish(ctx context.Context, msg interface{}) error {
  195. return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
  196. }
  197. // PublishWithTags publishes the given message with the set of tags. The set is
  198. // matched with clients queries. If there is a match, the message is sent to
  199. // the client.
  200. func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
  201. select {
  202. case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
  203. return nil
  204. case <-ctx.Done():
  205. return ctx.Err()
  206. case <-s.Quit():
  207. return nil
  208. }
  209. }
  210. // OnStop implements Service.OnStop by shutting down the server.
  211. func (s *Server) OnStop() {
  212. s.cmds <- cmd{op: shutdown}
  213. }
// state holds the subscription bookkeeping used by the server's loop. The two
// maps index the same subscriptions from both directions.
//
// NOTE: not goroutine safe
type state struct {
	// query -> client -> ch
	queries map[Query]map[string]chan<- interface{}
	// client -> query -> struct{}
	clients map[string]map[Query]struct{}
}
  221. // OnStart implements Service.OnStart by starting the server.
  222. func (s *Server) OnStart() error {
  223. go s.loop(state{
  224. queries: make(map[Query]map[string]chan<- interface{}),
  225. clients: make(map[string]map[Query]struct{}),
  226. })
  227. return nil
  228. }
// OnReset implements Service.OnReset. It does nothing and always returns nil.
func (s *Server) OnReset() error {
	return nil
}
  233. func (s *Server) loop(state state) {
  234. loop:
  235. for cmd := range s.cmds {
  236. switch cmd.op {
  237. case unsub:
  238. if cmd.query != nil {
  239. state.remove(cmd.clientID, cmd.query)
  240. } else {
  241. state.removeAll(cmd.clientID)
  242. }
  243. case shutdown:
  244. for clientID := range state.clients {
  245. state.removeAll(clientID)
  246. }
  247. break loop
  248. case sub:
  249. state.add(cmd.clientID, cmd.query, cmd.ch)
  250. case pub:
  251. state.send(cmd.msg, cmd.tags)
  252. }
  253. }
  254. }
  255. func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
  256. // initialize clientToChannelMap per query if needed
  257. if _, ok := state.queries[q]; !ok {
  258. state.queries[q] = make(map[string]chan<- interface{})
  259. }
  260. // create subscription
  261. state.queries[q][clientID] = ch
  262. // add client if needed
  263. if _, ok := state.clients[clientID]; !ok {
  264. state.clients[clientID] = make(map[Query]struct{})
  265. }
  266. state.clients[clientID][q] = struct{}{}
  267. }
  268. func (state *state) remove(clientID string, q Query) {
  269. clientToChannelMap, ok := state.queries[q]
  270. if !ok {
  271. return
  272. }
  273. ch, ok := clientToChannelMap[clientID]
  274. if ok {
  275. close(ch)
  276. delete(state.clients[clientID], q)
  277. // if it not subscribed to anything else, remove the client
  278. if len(state.clients[clientID]) == 0 {
  279. delete(state.clients, clientID)
  280. }
  281. delete(state.queries[q], clientID)
  282. if len(state.queries[q]) == 0 {
  283. delete(state.queries, q)
  284. }
  285. }
  286. }
  287. func (state *state) removeAll(clientID string) {
  288. queryMap, ok := state.clients[clientID]
  289. if !ok {
  290. return
  291. }
  292. for q := range queryMap {
  293. ch := state.queries[q][clientID]
  294. close(ch)
  295. delete(state.queries[q], clientID)
  296. if len(state.queries[q]) == 0 {
  297. delete(state.queries, q)
  298. }
  299. }
  300. delete(state.clients, clientID)
  301. }
  302. func (state *state) send(msg interface{}, tags TagMap) {
  303. for q, clientToChannelMap := range state.queries {
  304. if q.Matches(tags) {
  305. for _, ch := range clientToChannelMap {
  306. ch <- msg
  307. }
  308. }
  309. }
  310. }