Browse Source

fix memory leak in mempool reactor

Leaking goroutine:
```
114 @ 0x42f2bc 0x42f3ae 0x440794 0x4403b9 0x468002 0x9fe32d 0x9ff78f 0xa025ed 0x45e571
```

Explanation:
it blocks on an empty clist forever. so unless theres txs coming in,
this go routine will just sit there, holding onto the peer too.
if we're constantly reconnecting to some peer, old instances are not
garbage collected, leading to memory leak.

Fixes https://github.com/cosmos/gaia/issues/108
Previous attempt https://github.com/tendermint/tendermint/pull/1156
pull/1173/head
Anton Kaliaev 7 years ago
parent
commit
202d9a2c0c
No known key found for this signature in database GPG Key ID: 7B6881D965918214
4 changed files with 92 additions and 13 deletions
  1. +2
    -0
      glide.lock
  2. +1
    -0
      glide.yaml
  3. +38
    -11
      mempool/reactor.go
  4. +51
    -2
      mempool/reactor_test.go

+ 2
- 0
glide.lock View File

@ -203,3 +203,5 @@ testImports:
subpackages:
- assert
- require
- name: github.com/fortytw2/leaktest
version: 3b724c3d7b8729a35bf4e577f71653aec6e53513

+ 1
- 0
glide.yaml View File

@ -61,3 +61,4 @@ testImport:
subpackages:
- assert
- require
- package: github.com/fortytw2/leaktest

+ 38
- 11
mempool/reactor.go View File

@ -2,6 +2,7 @@ package mempool
import (
"bytes"
"context"
"fmt"
"reflect"
"time"
@ -101,24 +102,39 @@ type PeerState interface {
}
// Send new mempool txs to peer.
// TODO: Handle mempool or reactor shutdown - as is this routine
// may block forever if no new txs come in.
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
if !memR.config.Broadcast {
return
}
// used to abort waiting until a tx available
// otherwise TxsFrontWait/NextWait could block forever if there are
// no txs
ctx, cancel := context.WithCancel(context.Background())
go func() {
const healthCheckInterval = 5 * time.Second
for {
if !memR.IsRunning() || !peer.IsRunning() {
cancel()
return
}
time.Sleep(healthCheckInterval)
}
}()
var next *clist.CElement
for {
if !memR.IsRunning() || !peer.IsRunning() {
return // Quit!
}
// This happens because the CElement we were looking at got
// garbage collected (removed). That is, .NextWait() returned nil.
// Go ahead and start from the beginning.
if next == nil {
// This happens because the CElement we were looking at got
// garbage collected (removed). That is, .NextWait() returned nil.
// Go ahead and start from the beginning.
next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
// Wait until a tx is available
next = waitWithCancel(memR.Mempool.TxsFrontWait, ctx)
if ctx.Err() != nil {
return
}
}
memTx := next.Value.(*mempoolTx)
// make sure the peer is up to date
height := memTx.Height()
@ -136,9 +152,20 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
next = waitWithCancel(next.NextWait, ctx)
if ctx.Err() != nil {
return
}
}
}
next = next.NextWait()
continue
func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement {
el := make(chan *clist.CElement, 1)
select {
case el <- f():
return <-el
case <-ctx.Done():
return nil
}
}


+ 51
- 2
mempool/reactor_test.go View File

@ -6,6 +6,8 @@ import (
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/go-kit/kit/log/term"
@ -91,18 +93,65 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int
wg.Done()
}
var (
const (
NUM_TXS = 1000
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
)
func TestReactorBroadcastTxMessage(t *testing.T) {
config := cfg.TestConfig()
N := 4
const N = 4
reactors := makeAndConnectMempoolReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
// send a bunch of txs to the first reactor's mempool
// and wait for them all to be received in the others
txs := checkTxs(t, reactors[0].Mempool, NUM_TXS)
waitForTxs(t, txs, reactors)
}
func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
config := cfg.TestConfig()
const N = 2
reactors := makeAndConnectMempoolReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
// stop peer
sw := reactors[1].Switch
sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason"))
// check that we are not leaking any go-routines
// i.e. broadcastTxRoutine finishes when peer is stopped
leaktest.CheckTimeout(t, 10*time.Second)()
}
func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
config := cfg.TestConfig()
const N = 2
reactors := makeAndConnectMempoolReactors(config, N)
// stop reactors
for _, r := range reactors {
r.Stop()
}
// check that we are not leaking any go-routines
// i.e. broadcastTxRoutine finishes when reactor is stopped
leaktest.CheckTimeout(t, 10*time.Second)()
}

Loading…
Cancel
Save