diff --git a/glide.lock b/glide.lock
index 801c9f4d6..61107006a 100644
--- a/glide.lock
+++ b/glide.lock
@@ -203,3 +203,5 @@ testImports:
   subpackages:
   - assert
   - require
+- name: github.com/fortytw2/leaktest
+  version: 3b724c3d7b8729a35bf4e577f71653aec6e53513
diff --git a/glide.yaml b/glide.yaml
index c2726708f..238110c29 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -61,3 +61,4 @@ testImport:
   subpackages:
   - assert
   - require
+- package: github.com/fortytw2/leaktest
diff --git a/mempool/reactor.go b/mempool/reactor.go
index 4e43bb0c5..66f32dd9c 100644
--- a/mempool/reactor.go
+++ b/mempool/reactor.go
@@ -2,6 +2,7 @@ package mempool
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"reflect"
 	"time"
@@ -101,24 +102,39 @@ type PeerState interface {
 }
 
 // Send new mempool txs to peer.
-// TODO: Handle mempool or reactor shutdown - as is this routine
-// may block forever if no new txs come in.
 func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
 	if !memR.config.Broadcast {
 		return
 	}
 
+	// used to abort waiting until a tx available
+	// otherwise TxsFrontWait/NextWait could block forever if there are
+	// no txs
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		const healthCheckInterval = 5 * time.Second
+		for {
+			if !memR.IsRunning() || !peer.IsRunning() {
+				cancel()
+				return
+			}
+			time.Sleep(healthCheckInterval)
+		}
+	}()
+
 	var next *clist.CElement
 	for {
-		if !memR.IsRunning() || !peer.IsRunning() {
-			return // Quit!
-		}
+		// This happens because the CElement we were looking at got
+		// garbage collected (removed). That is, .NextWait() returned nil.
+		// Go ahead and start from the beginning.
 		if next == nil {
-			// This happens because the CElement we were looking at got
-			// garbage collected (removed). That is, .NextWait() returned nil.
-			// Go ahead and start from the beginning.
-			next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
+			// Wait until a tx is available
+			next = waitWithCancel(memR.Mempool.TxsFrontWait, ctx)
+			if ctx.Err() != nil {
+				return
+			}
 		}
+
 		memTx := next.Value.(*mempoolTx)
 		// make sure the peer is up to date
 		height := memTx.Height()
@@ -136,9 +152,30 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
 			time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
 			continue
 		}
+		next = waitWithCancel(next.NextWait, ctx)
+		if ctx.Err() != nil {
+			return
+		}
+	}
+}
 
-		next = next.NextWait()
-		continue
+// waitWithCancel runs the blocking wait f in its own goroutine and returns
+// its result, or nil as soon as ctx is cancelled, whichever happens first.
+//
+// f must not be called inside the select statement itself: Go evaluates the
+// right-hand side of a send case on entering the select, so `case el <- f():`
+// would block in f() and never observe ctx.Done().
+//
+// NOTE: if ctx is cancelled first, the helper goroutine stays parked until f
+// returns; the buffered channel lets it finish without a receiver.
+func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement {
+	el := make(chan *clist.CElement, 1)
+	go func() { el <- f() }()
+	select {
+	case e := <-el:
+		return e
+	case <-ctx.Done():
+		return nil
 	}
 }
 
diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go
index 45458a983..3cbc57481 100644
--- a/mempool/reactor_test.go
+++ b/mempool/reactor_test.go
@@ -6,6 +6,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/fortytw2/leaktest"
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 
 	"github.com/go-kit/kit/log/term"
@@ -91,18 +93,65 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int
 	wg.Done()
 }
 
-var (
+const (
 	NUM_TXS = 1000
 	TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
 )
 
 func TestReactorBroadcastTxMessage(t *testing.T) {
 	config := cfg.TestConfig()
-	N := 4
+	const N = 4
 	reactors := makeAndConnectMempoolReactors(config, N)
+	defer func() {
+		for _, r := range reactors {
+			r.Stop()
+		}
+	}()
 
 	// send a bunch of txs to the first reactor's mempool
 	// and wait for them all to be received in the others
 	txs := checkTxs(t, reactors[0].Mempool, NUM_TXS)
 	waitForTxs(t, txs, reactors)
 }
+
+func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	config := cfg.TestConfig()
+	const N = 2
+	reactors := makeAndConnectMempoolReactors(config, N)
+	defer func() {
+		for _, r := range reactors {
+			r.Stop()
+		}
+	}()
+
+	// stop peer
+	sw := reactors[1].Switch
+	sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason"))
+
+	// check that we are not leaking any go-routines
+	// i.e. broadcastTxRoutine finishes when peer is stopped
+	leaktest.CheckTimeout(t, 10*time.Second)()
+}
+
+func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	config := cfg.TestConfig()
+	const N = 2
+	reactors := makeAndConnectMempoolReactors(config, N)
+
+	// stop reactors
+	for _, r := range reactors {
+		r.Stop()
+	}
+
+	// check that we are not leaking any go-routines
+	// i.e. broadcastTxRoutine finishes when reactor is stopped
+	leaktest.CheckTimeout(t, 10*time.Second)()
+}