From 9e9026452cb337fa7110ccc62fd3f6707894e37f Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 6 Feb 2019 10:29:51 -0500 Subject: [PATCH] p2p/conn: don't hold stopMtx while waiting (#3254) * p2p/conn: fix deadlock in FlushStop/OnStop * makefile: set_with_deadlock * close doneSendRoutine at end of sendRoutine * conn: initialize channs in OnStart --- Makefile | 7 ++-- p2p/conn/connection.go | 77 ++++++++++++++++++++++-------------------- 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 8c0928d04..57e465439 100644 --- a/Makefile +++ b/Makefile @@ -228,11 +228,14 @@ test_race: # uses https://github.com/sasha-s/go-deadlock/ to detect potential deadlocks test_with_deadlock: + make set_with_deadlock + make test + make cleanup_after_test_with_deadlock + +set_with_deadlock: find . -name "*.go" | grep -v "vendor/" | xargs -n 1 sed -i.bak 's/sync.RWMutex/deadlock.RWMutex/' find . -name "*.go" | grep -v "vendor/" | xargs -n 1 sed -i.bak 's/sync.Mutex/deadlock.Mutex/' find . -name "*.go" | grep -v "vendor/" | xargs -n 1 goimports -w - make test - make cleanup_after_test_with_deadlock # cleanes up after you ran test_with_deadlock cleanup_after_test_with_deadlock: diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 920734914..c1e90ab76 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -85,8 +85,8 @@ type MConnection struct { errored uint32 config MConnConfig - // Closing quitSendRoutine will cause - // doneSendRoutine to close. + // Closing quitSendRoutine will cause the sendRoutine to eventually quit. + // doneSendRoutine is closed when the sendRoutine actually quits. quitSendRoutine chan struct{} doneSendRoutine chan struct{} @@ -200,29 +200,28 @@ func (c *MConnection) OnStart() error { if err := c.BaseService.OnStart(); err != nil { return err } - c.quitSendRoutine = make(chan struct{}) - c.doneSendRoutine = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) c.pongTimeoutCh = make(chan bool, 1) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) + c.quitSendRoutine = make(chan struct{}) + c.doneSendRoutine = make(chan struct{}) go c.sendRoutine() go c.recvRoutine() return nil } -// FlushStop replicates the logic of OnStop. -// It additionally ensures that all successful -// .Send() calls will get flushed before closing -// the connection. -func (c *MConnection) FlushStop() { +// stopServices stops the BaseService and timers and closes the quitSendRoutine. +// if the quitSendRoutine was already closed, it returns true, otherwise it returns false. +// It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time. +func (c *MConnection) stopServices() (alreadyStopped bool) { c.stopMtx.Lock() defer c.stopMtx.Unlock() select { case <-c.quitSendRoutine: - // already quit via OnStop - return + // already quit via FlushStop or OnStop + return true default: } @@ -230,25 +229,40 @@ func (c *MConnection) FlushStop() { c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() - if c.quitSendRoutine != nil { - close(c.quitSendRoutine) + + close(c.quitSendRoutine) + return false +} + +// FlushStop replicates the logic of OnStop. +// It additionally ensures that all successful +// .Send() calls will get flushed before closing +// the connection. +func (c *MConnection) FlushStop() { + if c.stopServices() { + return + } + + // this block is unique to FlushStop + { // wait until the sendRoutine exits // so we dont race on calling sendSomePacketMsgs <-c.doneSendRoutine - } - // Send and flush all pending msgs. - // By now, IsRunning == false, - // so any concurrent attempts to send will fail. - // Since sendRoutine has exited, we can call this - // safely - eof := c.sendSomePacketMsgs() - for !eof { - eof = c.sendSomePacketMsgs() + // Send and flush all pending msgs. + // By now, IsRunning == false, + // so any concurrent attempts to send will fail. + // Since sendRoutine has exited, we can call this + // safely + eof := c.sendSomePacketMsgs() + for !eof { + eof = c.sendSomePacketMsgs() + } + c.flush() + + // Now we can close the connection } - c.flush() - // Now we can close the connection c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -261,21 +275,10 @@ func (c *MConnection) FlushStop() { // OnStop implements BaseService func (c *MConnection) OnStop() { - c.stopMtx.Lock() - defer c.stopMtx.Unlock() - - select { - case <-c.quitSendRoutine: - // already quit via FlushStop + if c.stopServices() { return - default: } - c.BaseService.OnStop() - c.flushTimer.Stop() - c.pingTimer.Stop() - c.chStatsTimer.Stop() - close(c.quitSendRoutine) c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -433,7 +436,6 @@ FOR_LOOP: c.sendMonitor.Update(int(_n)) c.flush() case <-c.quitSendRoutine: - close(c.doneSendRoutine) break FOR_LOOP case <-c.send: // Send some PacketMsgs @@ -459,6 +461,7 @@ FOR_LOOP: // Cleanup c.stopPongTimer() + close(c.doneSendRoutine) } // Returns true if messages from channels were exhausted.