diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 26747ea69..9775d38a9 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block { // The Test peer type bcrTestPeer struct { - cmn.Service + *cmn.BaseService id p2p.ID ch chan interface{} } @@ -165,11 +165,12 @@ type bcrTestPeer struct { var _ p2p.Peer = (*bcrTestPeer)(nil) func newbcrTestPeer(id p2p.ID) *bcrTestPeer { - return &bcrTestPeer{ - Service: cmn.NewBaseService(nil, "bcrTestPeer", nil), - id: id, - ch: make(chan interface{}, 2), + bcr := &bcrTestPeer{ + id: id, + ch: make(chan interface{}, 2), } + bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr) + return bcr } func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch } @@ -195,3 +196,4 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false } func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Set(string, interface{}) {} +func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return tp.Quit } diff --git a/glide.lock b/glide.lock index 801c9f4d6..cfb28d137 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 78f23456c3ca7af06fc26e59107de92a7b208776643bda398b0a05348153da1b -updated: 2018-02-03T03:31:46.976175875-05:00 +hash: 8aeec731d864d5d3008b4403c3229800148c9b472969ef6e5181a8c93ac1f4c8 +updated: 2018-02-05T18:46:05.226387951Z imports: - name: github.com/btcsuite/btcd version: 50de9da05b50eb15658bb350f6ea24368a111ab7 @@ -116,9 +116,8 @@ imports: version: b6fc872b42d41158a60307db4da051dd6f179415 subpackages: - data - - nowriter/tmlegacy - name: github.com/tendermint/tmlibs - version: deaaf014d8b8d1095054380a38b1b00e293f725f + version: 51684dabf79c2079f32cc25d6bccb748ee098386 subpackages: - autofile - cli @@ -194,6 +193,8 @@ testImports: version: 346938d642f2ec3594ed81d874461961cd0faa76 subpackages: - spew +- name: github.com/fortytw2/leaktest + version: 3b724c3d7b8729a35bf4e577f71653aec6e53513 - name: github.com/pmezard/go-difflib version: 792786c7400a136282c1664665ae0a8db921c6c2 subpackages: diff --git a/glide.yaml b/glide.yaml index c2726708f..fe8747817 100644 --- a/glide.yaml +++ b/glide.yaml @@ -30,7 +30,7 @@ import: subpackages: - data - package: github.com/tendermint/tmlibs - version: develop + version: 51684dabf79c2079f32cc25d6bccb748ee098386 subpackages: - autofile - cli @@ -42,17 +42,16 @@ import: - log - merkle - pubsub + - pubsub/query - package: golang.org/x/crypto subpackages: - nacl/box - nacl/secretbox - ripemd160 -- package: golang.org/x/net - subpackages: - - context - package: google.golang.org/grpc version: v1.7.3 testImport: +- package: github.com/fortytw2/leaktest - package: github.com/go-kit/kit version: ^0.6.0 subpackages: diff --git a/mempool/mempool.go b/mempool/mempool.go index 0cdd1dee3..ec4f98478 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -178,10 +178,17 @@ func (mem *Mempool) Flush() { } } -// TxsFrontWait returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. -// It blocks until the mempool is not empty (ie. until the internal `mem.txs` has at least one element) -func (mem *Mempool) TxsFrontWait() *clist.CElement { - return mem.txs.FrontWait() +// TxsFront returns the first transaction in the ordered list for peer +// goroutines to call .NextWait() on. +func (mem *Mempool) TxsFront() *clist.CElement { + return mem.txs.Front() +} + +// TxsWaitChan returns a channel to wait on transactions. It will be closed +// once the mempool is not empty (ie. the internal `mem.txs` has at least one +// element) +func (mem *Mempool) TxsWaitChan() <-chan struct{} { + return mem.txs.WaitChan() } // CheckTx executes a new transaction against the application to determine its validity diff --git a/mempool/reactor.go b/mempool/reactor.go index 4e43bb0c5..98c83337c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -101,8 +101,6 @@ type PeerState interface { } // Send new mempool txs to peer. -// TODO: Handle mempool or reactor shutdown - as is this routine -// may block forever if no new txs come in. func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { if !memR.config.Broadcast { return @@ -110,15 +108,22 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { var next *clist.CElement for { - if !memR.IsRunning() || !peer.IsRunning() { - return // Quit! - } + // This happens because the CElement we were looking at got garbage + // collected (removed). That is, .NextWait() returned nil. Go ahead and + // start from the beginning. if next == nil { - // This happens because the CElement we were looking at got - // garbage collected (removed). That is, .NextWait() returned nil. - // Go ahead and start from the beginning. - next = memR.Mempool.TxsFrontWait() // Wait until a tx is available + select { + case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available + if next = memR.Mempool.TxsFront(); next == nil { + continue + } + case <-peer.QuitChan(): + return + case <-memR.Quit: + return + } } + memTx := next.Value.(*mempoolTx) // make sure the peer is up to date height := memTx.Height() @@ -137,8 +142,15 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { continue } - next = next.NextWait() - continue + select { + case <-next.NextWaitChan(): + // see the start of the for loop for nil check + next = next.Next() + case <-peer.QuitChan(): + return + case <-memR.Quit: + return + } } } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 45458a983..9f0b5b48b 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -91,18 +91,65 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int wg.Done() } -var ( +const ( NUM_TXS = 1000 TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow ) func TestReactorBroadcastTxMessage(t *testing.T) { config := cfg.TestConfig() - N := 4 + const N = 4 reactors := makeAndConnectMempoolReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() // send a bunch of txs to the first reactor's mempool // and wait for them all to be received in the others txs := checkTxs(t, reactors[0].Mempool, NUM_TXS) waitForTxs(t, txs, reactors) } + +// func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { +// if testing.Short() { +// t.Skip("skipping test in short mode.") +// } + +// config := cfg.TestConfig() +// const N = 2 +// reactors := makeAndConnectMempoolReactors(config, N) +// defer func() { +// for _, r := range reactors { +// r.Stop() +// } +// }() + +// // stop peer +// sw := reactors[1].Switch +// sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) + +// // check that we are not leaking any go-routines +// // i.e. broadcastTxRoutine finishes when peer is stopped +// leaktest.CheckTimeout(t, 10*time.Second)() +// } + +// func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { +// if testing.Short() { +// t.Skip("skipping test in short mode.") +// } + +// config := cfg.TestConfig() +// const N = 2 +// reactors := makeAndConnectMempoolReactors(config, N) + +// // stop reactors +// for _, r := range reactors { +// r.Stop() +// } + +// // check that we are not leaking any go-routines +// // i.e. broadcastTxRoutine finishes when reactor is stopped +// leaktest.CheckTimeout(t, 10*time.Second)() +// } diff --git a/p2p/peer.go b/p2p/peer.go index 67ce411cd..cff99ad1f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -18,6 +18,7 @@ import ( // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service + QuitChan() <-chan struct{} ID() ID // peer's cryptographic ID IsOutbound() bool // did we dial the peer @@ -331,6 +332,11 @@ func (p *peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) } +// QuitChan returns a channel, which will be closed once peer is stopped. +func (p *peer) QuitChan() <-chan struct{} { + return p.Quit +} + //------------------------------------------------------------------ // helper funcs diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 82dafecd4..6aeb7a3c7 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -368,3 +368,4 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Get(string) interface{} { return nil } +func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit }