Browse Source

Merge pull request #1173 from tendermint/memory-leak-in-reconnect-to-peer-2

fix memory leak in mempool reactor
pull/1200/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
66fc476e1e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 105 additions and 30 deletions
  1. +7
    -5
      blockchain/reactor_test.go
  2. +5
    -4
      glide.lock
  3. +3
    -4
      glide.yaml
  4. +11
    -4
      mempool/mempool.go
  5. +23
    -11
      mempool/reactor.go
  6. +49
    -2
      mempool/reactor_test.go
  7. +6
    -0
      p2p/peer.go
  8. +1
    -0
      p2p/pex/pex_reactor_test.go

+ 7
- 5
blockchain/reactor_test.go View File

@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block {
// The Test peer // The Test peer
type bcrTestPeer struct { type bcrTestPeer struct {
cmn.Service
*cmn.BaseService
id p2p.ID id p2p.ID
ch chan interface{} ch chan interface{}
} }
@ -165,11 +165,12 @@ type bcrTestPeer struct {
var _ p2p.Peer = (*bcrTestPeer)(nil) var _ p2p.Peer = (*bcrTestPeer)(nil)
func newbcrTestPeer(id p2p.ID) *bcrTestPeer { func newbcrTestPeer(id p2p.ID) *bcrTestPeer {
return &bcrTestPeer{
Service: cmn.NewBaseService(nil, "bcrTestPeer", nil),
id: id,
ch: make(chan interface{}, 2),
bcr := &bcrTestPeer{
id: id,
ch: make(chan interface{}, 2),
} }
bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr)
return bcr
} }
func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch } func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch }
@ -195,3 +196,4 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false }
func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) IsPersistent() bool { return true }
func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Get(s string) interface{} { return s }
func (tp *bcrTestPeer) Set(string, interface{}) {} func (tp *bcrTestPeer) Set(string, interface{}) {}
func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return tp.Quit }

+ 5
- 4
glide.lock View File

@ -1,5 +1,5 @@
hash: 78f23456c3ca7af06fc26e59107de92a7b208776643bda398b0a05348153da1b
updated: 2018-02-03T03:31:46.976175875-05:00
hash: 8aeec731d864d5d3008b4403c3229800148c9b472969ef6e5181a8c93ac1f4c8
updated: 2018-02-05T18:46:05.226387951Z
imports: imports:
- name: github.com/btcsuite/btcd - name: github.com/btcsuite/btcd
version: 50de9da05b50eb15658bb350f6ea24368a111ab7 version: 50de9da05b50eb15658bb350f6ea24368a111ab7
@ -116,9 +116,8 @@ imports:
version: b6fc872b42d41158a60307db4da051dd6f179415 version: b6fc872b42d41158a60307db4da051dd6f179415
subpackages: subpackages:
- data - data
- nowriter/tmlegacy
- name: github.com/tendermint/tmlibs - name: github.com/tendermint/tmlibs
version: deaaf014d8b8d1095054380a38b1b00e293f725f
version: 51684dabf79c2079f32cc25d6bccb748ee098386
subpackages: subpackages:
- autofile - autofile
- cli - cli
@ -194,6 +193,8 @@ testImports:
version: 346938d642f2ec3594ed81d874461961cd0faa76 version: 346938d642f2ec3594ed81d874461961cd0faa76
subpackages: subpackages:
- spew - spew
- name: github.com/fortytw2/leaktest
version: 3b724c3d7b8729a35bf4e577f71653aec6e53513
- name: github.com/pmezard/go-difflib - name: github.com/pmezard/go-difflib
version: 792786c7400a136282c1664665ae0a8db921c6c2 version: 792786c7400a136282c1664665ae0a8db921c6c2
subpackages: subpackages:


+ 3
- 4
glide.yaml View File

@ -30,7 +30,7 @@ import:
subpackages: subpackages:
- data - data
- package: github.com/tendermint/tmlibs - package: github.com/tendermint/tmlibs
version: develop
version: 51684dabf79c2079f32cc25d6bccb748ee098386
subpackages: subpackages:
- autofile - autofile
- cli - cli
@ -42,17 +42,16 @@ import:
- log - log
- merkle - merkle
- pubsub - pubsub
- pubsub/query
- package: golang.org/x/crypto - package: golang.org/x/crypto
subpackages: subpackages:
- nacl/box - nacl/box
- nacl/secretbox - nacl/secretbox
- ripemd160 - ripemd160
- package: golang.org/x/net
subpackages:
- context
- package: google.golang.org/grpc - package: google.golang.org/grpc
version: v1.7.3 version: v1.7.3
testImport: testImport:
- package: github.com/fortytw2/leaktest
- package: github.com/go-kit/kit - package: github.com/go-kit/kit
version: ^0.6.0 version: ^0.6.0
subpackages: subpackages:


+ 11
- 4
mempool/mempool.go View File

@ -178,10 +178,17 @@ func (mem *Mempool) Flush() {
} }
} }
// TxsFrontWait returns the first transaction in the ordered list for peer goroutines to call .NextWait() on.
// It blocks until the mempool is not empty (ie. until the internal `mem.txs` has at least one element)
func (mem *Mempool) TxsFrontWait() *clist.CElement {
return mem.txs.FrontWait()
// TxsFront returns the first transaction in the ordered list for peer
// goroutines to call .NextWait() on.
func (mem *Mempool) TxsFront() *clist.CElement {
return mem.txs.Front()
}
// TxsWaitChan returns a channel to wait on transactions. It will be closed
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
// element)
func (mem *Mempool) TxsWaitChan() <-chan struct{} {
return mem.txs.WaitChan()
} }
// CheckTx executes a new transaction against the application to determine its validity // CheckTx executes a new transaction against the application to determine its validity


+ 23
- 11
mempool/reactor.go View File

@ -101,8 +101,6 @@ type PeerState interface {
} }
// Send new mempool txs to peer. // Send new mempool txs to peer.
// TODO: Handle mempool or reactor shutdown - as is this routine
// may block forever if no new txs come in.
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
if !memR.config.Broadcast { if !memR.config.Broadcast {
return return
@ -110,15 +108,22 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
var next *clist.CElement var next *clist.CElement
for { for {
if !memR.IsRunning() || !peer.IsRunning() {
return // Quit!
}
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil { if next == nil {
// This happens because the CElement we were looking at got
// garbage collected (removed). That is, .NextWait() returned nil.
// Go ahead and start from the beginning.
next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
select {
case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
if next = memR.Mempool.TxsFront(); next == nil {
continue
}
case <-peer.QuitChan():
return
case <-memR.Quit:
return
}
} }
memTx := next.Value.(*mempoolTx) memTx := next.Value.(*mempoolTx)
// make sure the peer is up to date // make sure the peer is up to date
height := memTx.Height() height := memTx.Height()
@ -137,8 +142,15 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
continue continue
} }
next = next.NextWait()
continue
select {
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.QuitChan():
return
case <-memR.Quit:
return
}
} }
} }


+ 49
- 2
mempool/reactor_test.go View File

@ -91,18 +91,65 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int
wg.Done() wg.Done()
} }
var (
const (
NUM_TXS = 1000 NUM_TXS = 1000
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
) )
func TestReactorBroadcastTxMessage(t *testing.T) { func TestReactorBroadcastTxMessage(t *testing.T) {
config := cfg.TestConfig() config := cfg.TestConfig()
N := 4
const N = 4
reactors := makeAndConnectMempoolReactors(config, N) reactors := makeAndConnectMempoolReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
// send a bunch of txs to the first reactor's mempool // send a bunch of txs to the first reactor's mempool
// and wait for them all to be received in the others // and wait for them all to be received in the others
txs := checkTxs(t, reactors[0].Mempool, NUM_TXS) txs := checkTxs(t, reactors[0].Mempool, NUM_TXS)
waitForTxs(t, txs, reactors) waitForTxs(t, txs, reactors)
} }
// func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
// if testing.Short() {
// t.Skip("skipping test in short mode.")
// }
// config := cfg.TestConfig()
// const N = 2
// reactors := makeAndConnectMempoolReactors(config, N)
// defer func() {
// for _, r := range reactors {
// r.Stop()
// }
// }()
// // stop peer
// sw := reactors[1].Switch
// sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason"))
// // check that we are not leaking any go-routines
// // i.e. broadcastTxRoutine finishes when peer is stopped
// leaktest.CheckTimeout(t, 10*time.Second)()
// }
// func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
// if testing.Short() {
// t.Skip("skipping test in short mode.")
// }
// config := cfg.TestConfig()
// const N = 2
// reactors := makeAndConnectMempoolReactors(config, N)
// // stop reactors
// for _, r := range reactors {
// r.Stop()
// }
// // check that we are not leaking any go-routines
// // i.e. broadcastTxRoutine finishes when reactor is stopped
// leaktest.CheckTimeout(t, 10*time.Second)()
// }

+ 6
- 0
p2p/peer.go View File

@ -18,6 +18,7 @@ import (
// Peer is an interface representing a peer connected on a reactor. // Peer is an interface representing a peer connected on a reactor.
type Peer interface { type Peer interface {
cmn.Service cmn.Service
QuitChan() <-chan struct{}
ID() ID // peer's cryptographic ID ID() ID // peer's cryptographic ID
IsOutbound() bool // did we dial the peer IsOutbound() bool // did we dial the peer
@ -331,6 +332,11 @@ func (p *peer) String() string {
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
} }
// QuitChan returns a channel, which will be closed once peer is stopped.
func (p *peer) QuitChan() <-chan struct{} {
return p.Quit
}
//------------------------------------------------------------------ //------------------------------------------------------------------
// helper funcs // helper funcs


+ 1
- 0
p2p/pex/pex_reactor_test.go View File

@ -368,3 +368,4 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false }
func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Set(string, interface{}) {}
func (mp mockPeer) Get(string) interface{} { return nil } func (mp mockPeer) Get(string) interface{} { return nil }
func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit }

Loading…
Cancel
Save