From 202d9a2c0c0168a7c2dfcf60e69c80d16090a78a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 30 Jan 2018 12:28:05 +0400 Subject: [PATCH] fix memory leak in mempool reactor Leaking goroutine: ``` 114 @ 0x42f2bc 0x42f3ae 0x440794 0x4403b9 0x468002 0x9fe32d 0x9ff78f 0xa025ed 0x45e571 ``` Explanation: it blocks on an empty clist forever. So unless there are txs coming in, this goroutine will just sit there, holding onto the peer too. If we're constantly reconnecting to some peer, old instances are not garbage collected, leading to a memory leak. Fixes https://github.com/cosmos/gaia/issues/108 Previous attempt https://github.com/tendermint/tendermint/pull/1156 --- glide.lock | 2 ++ glide.yaml | 1 + mempool/reactor.go | 49 ++++++++++++++++++++++++++++--------- mempool/reactor_test.go | 53 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 92 insertions(+), 13 deletions(-) diff --git a/glide.lock b/glide.lock index 801c9f4d6..61107006a 100644 --- a/glide.lock +++ b/glide.lock @@ -203,3 +203,5 @@ testImports: subpackages: - assert - require +- name: github.com/fortytw2/leaktest + version: 3b724c3d7b8729a35bf4e577f71653aec6e53513 diff --git a/glide.yaml b/glide.yaml index c2726708f..238110c29 100644 --- a/glide.yaml +++ b/glide.yaml @@ -61,3 +61,4 @@ testImport: subpackages: - assert - require +- package: github.com/fortytw2/leaktest diff --git a/mempool/reactor.go b/mempool/reactor.go index 4e43bb0c5..66f32dd9c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -2,6 +2,7 @@ package mempool import ( "bytes" + "context" "fmt" "reflect" "time" @@ -101,24 +102,39 @@ type PeerState interface { } // Send new mempool txs to peer. -// TODO: Handle mempool or reactor shutdown - as is this routine -// may block forever if no new txs come in. 
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { if !memR.config.Broadcast { return } + // used to abort waiting until a tx available + // otherwise TxsFrontWait/NextWait could block forever if there are + // no txs + ctx, cancel := context.WithCancel(context.Background()) + go func() { + const healthCheckInterval = 5 * time.Second + for { + if !memR.IsRunning() || !peer.IsRunning() { + cancel() + return + } + time.Sleep(healthCheckInterval) + } + }() + var next *clist.CElement for { - if !memR.IsRunning() || !peer.IsRunning() { - return // Quit! - } + // This happens because the CElement we were looking at got + // garbage collected (removed). That is, .NextWait() returned nil. + // Go ahead and start from the beginning. if next == nil { - // This happens because the CElement we were looking at got - // garbage collected (removed). That is, .NextWait() returned nil. - // Go ahead and start from the beginning. - next = memR.Mempool.TxsFrontWait() // Wait until a tx is available + // Wait until a tx is available + next = waitWithCancel(memR.Mempool.TxsFrontWait, ctx) + if ctx.Err() != nil { + return + } } + memTx := next.Value.(*mempoolTx) // make sure the peer is up to date height := memTx.Height() @@ -136,9 +152,20 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } + next = waitWithCancel(next.NextWait, ctx) + if ctx.Err() != nil { + return + } + } +} - next = next.NextWait() - continue +func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement { + el := make(chan *clist.CElement, 1) + select { + case el <- f(): + return <-el + case <-ctx.Done(): + return nil } } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 45458a983..3cbc57481 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" + "github.com/pkg/errors" 
"github.com/stretchr/testify/assert" "github.com/go-kit/kit/log/term" @@ -91,18 +93,65 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int wg.Done() } -var ( +const ( NUM_TXS = 1000 TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow ) func TestReactorBroadcastTxMessage(t *testing.T) { config := cfg.TestConfig() - N := 4 + const N = 4 reactors := makeAndConnectMempoolReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() // send a bunch of txs to the first reactor's mempool // and wait for them all to be received in the others txs := checkTxs(t, reactors[0].Mempool, NUM_TXS) waitForTxs(t, txs, reactors) } + +func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectMempoolReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() + + // stop peer + sw := reactors[1].Switch + sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) + + // check that we are not leaking any go-routines + // i.e. broadcastTxRoutine finishes when peer is stopped + leaktest.CheckTimeout(t, 10*time.Second)() +} + +func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectMempoolReactors(config, N) + + // stop reactors + for _, r := range reactors { + r.Stop() + } + + // check that we are not leaking any go-routines + // i.e. broadcastTxRoutine finishes when reactor is stopped + leaktest.CheckTimeout(t, 10*time.Second)() +}