diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 26747ea69..9775d38a9 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block { // The Test peer type bcrTestPeer struct { - cmn.Service + *cmn.BaseService id p2p.ID ch chan interface{} } @@ -165,11 +165,12 @@ type bcrTestPeer struct { var _ p2p.Peer = (*bcrTestPeer)(nil) func newbcrTestPeer(id p2p.ID) *bcrTestPeer { - return &bcrTestPeer{ - Service: cmn.NewBaseService(nil, "bcrTestPeer", nil), - id: id, - ch: make(chan interface{}, 2), + bcr := &bcrTestPeer{ + id: id, + ch: make(chan interface{}, 2), } + bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr) + return bcr } func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch } @@ -195,3 +196,4 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false } func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Set(string, interface{}) {} +func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return tp.Quit } diff --git a/glide.lock b/glide.lock index 61107006a..333314c1f 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 78f23456c3ca7af06fc26e59107de92a7b208776643bda398b0a05348153da1b -updated: 2018-02-03T03:31:46.976175875-05:00 +hash: 94a3f8a3cf531e0cdde8bc160a2f4bab6f269d99a9a9633404e5badb0481f02c +updated: 2018-02-05T10:04:25.7693634Z imports: - name: github.com/btcsuite/btcd version: 50de9da05b50eb15658bb350f6ea24368a111ab7 @@ -116,9 +116,12 @@ imports: version: b6fc872b42d41158a60307db4da051dd6f179415 subpackages: - data - - nowriter/tmlegacy +- name: github.com/tendermint/iavl + version: 1a59ec0c82dc940c25339dd7c834df5cb76a95cb + subpackages: + - iavl - name: github.com/tendermint/tmlibs - version: deaaf014d8b8d1095054380a38b1b00e293f725f + version: 51684dabf79c2079f32cc25d6bccb748ee098386 subpackages: - autofile - cli @@ -194,6 +197,8 @@ testImports: version: 346938d642f2ec3594ed81d874461961cd0faa76 subpackages: - spew +- name: github.com/fortytw2/leaktest + version: 3b724c3d7b8729a35bf4e577f71653aec6e53513 - name: github.com/pmezard/go-difflib version: 792786c7400a136282c1664665ae0a8db921c6c2 subpackages: @@ -203,5 +208,3 @@ testImports: subpackages: - assert - require -- name: github.com/fortytw2/leaktest - version: 3b724c3d7b8729a35bf4e577f71653aec6e53513 diff --git a/glide.yaml b/glide.yaml index 238110c29..da72cf178 100644 --- a/glide.yaml +++ b/glide.yaml @@ -29,9 +29,13 @@ import: version: master subpackages: - data -- package: github.com/tendermint/tmlibs +- package: github.com/tendermint/iavl version: develop subpackages: + - iavl +- package: github.com/tendermint/tmlibs + version: 51684dabf79c2079f32cc25d6bccb748ee098386 + subpackages: - autofile - cli - cli/flags diff --git a/mempool/mempool.go b/mempool/mempool.go index 0cdd1dee3..ec4f98478 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -178,10 +178,17 @@ func (mem *Mempool) Flush() { } } -// TxsFrontWait returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. -// It blocks until the mempool is not empty (ie. until the internal `mem.txs` has at least one element) -func (mem *Mempool) TxsFrontWait() *clist.CElement { - return mem.txs.FrontWait() +// TxsFront returns the first transaction in the ordered list for peer +// goroutines to call .NextWait() on. +func (mem *Mempool) TxsFront() *clist.CElement { + return mem.txs.Front() +} + +// TxsWaitChan returns a channel to wait on transactions. It will be closed +// once the mempool is not empty (ie. the internal `mem.txs` has at least one +// element) +func (mem *Mempool) TxsWaitChan() <-chan struct{} { + return mem.txs.WaitChan() } // CheckTx executes a new transaction against the application to determine its validity diff --git a/mempool/reactor.go b/mempool/reactor.go index 66f32dd9c..98c83337c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -2,7 +2,6 @@ package mempool import ( "bytes" - "context" "fmt" "reflect" "time" @@ -107,30 +106,20 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { return } - // used to abort waiting until a tx available - // otherwise TxsFrontWait/NextWait could block forever if there are - // no txs - ctx, cancel := context.WithCancel(context.Background()) - go func() { - const healthCheckInterval = 5 * time.Second - for { - if !memR.IsRunning() || !peer.IsRunning() { - cancel() - return - } - time.Sleep(healthCheckInterval) - } - }() - var next *clist.CElement for { - // This happens because the CElement we were looking at got - // garbage collected (removed). That is, .NextWait() returned nil. - // Go ahead and start from the beginning. + // This happens because the CElement we were looking at got garbage + // collected (removed). That is, .NextWait() returned nil. Go ahead and + // start from the beginning. if next == nil { - // Wait until a tx is available - next = waitWithCancel(memR.Mempool.TxsFrontWait, ctx) - if ctx.Err() != nil { + select { + case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available + if next = memR.Mempool.TxsFront(); next == nil { + continue + } + case <-peer.QuitChan(): + return + case <-memR.Quit: return } } @@ -152,23 +141,19 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } - next = waitWithCancel(next.NextWait, ctx) - if ctx.Err() != nil { + + select { + case <-next.NextWaitChan(): + // see the start of the for loop for nil check + next = next.Next() + case <-peer.QuitChan(): + return + case <-memR.Quit: return } } } -func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement { - el := make(chan *clist.CElement, 1) - select { - case el <- f(): - return <-el - case <-ctx.Done(): - return nil - } -} - //----------------------------------------------------------------------------- // Messages diff --git a/p2p/peer.go b/p2p/peer.go index 67ce411cd..cff99ad1f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -18,6 +18,7 @@ import ( // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service + QuitChan() <-chan struct{} ID() ID // peer's cryptographic ID IsOutbound() bool // did we dial the peer @@ -331,6 +332,11 @@ func (p *peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) } +// QuitChan returns a channel, which will be closed once peer is stopped. +func (p *peer) QuitChan() <-chan struct{} { + return p.Quit +} + //------------------------------------------------------------------ // helper funcs diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 82dafecd4..6aeb7a3c7 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -368,3 +368,4 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Get(string) interface{} { return nil } +func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit }