Browse Source

p2p/conn: don't hold stopMtx while waiting (#3254)

* p2p/conn: fix deadlock in FlushStop/OnStop

* makefile: set_with_deadlock

* close doneSendRoutine at end of sendRoutine

* conn: initialize channs in OnStart
pull/3268/head
Ethan Buchman 5 years ago
committed by GitHub
parent
commit
9e9026452c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 39 deletions
  1. +5
    -2
      Makefile
  2. +40
    -37
      p2p/conn/connection.go

+ 5
- 2
Makefile View File

@ -228,11 +228,14 @@ test_race:
# uses https://github.com/sasha-s/go-deadlock/ to detect potential deadlocks # uses https://github.com/sasha-s/go-deadlock/ to detect potential deadlocks
test_with_deadlock: test_with_deadlock:
make set_with_deadlock
make test
make cleanup_after_test_with_deadlock
set_with_deadlock:
find . -name "*.go" | grep -v "vendor/" | xargs -n 1 sed -i.bak 's/sync.RWMutex/deadlock.RWMutex/' find . -name "*.go" | grep -v "vendor/" | xargs -n 1 sed -i.bak 's/sync.RWMutex/deadlock.RWMutex/'
find . -name "*.go" | grep -v "vendor/" | xargs -n 1 sed -i.bak 's/sync.Mutex/deadlock.Mutex/' find . -name "*.go" | grep -v "vendor/" | xargs -n 1 sed -i.bak 's/sync.Mutex/deadlock.Mutex/'
find . -name "*.go" | grep -v "vendor/" | xargs -n 1 goimports -w find . -name "*.go" | grep -v "vendor/" | xargs -n 1 goimports -w
make test
make cleanup_after_test_with_deadlock
# cleanes up after you ran test_with_deadlock # cleanes up after you ran test_with_deadlock
cleanup_after_test_with_deadlock: cleanup_after_test_with_deadlock:


+ 40
- 37
p2p/conn/connection.go View File

@ -85,8 +85,8 @@ type MConnection struct {
errored uint32 errored uint32
config MConnConfig config MConnConfig
// Closing quitSendRoutine will cause
// doneSendRoutine to close.
// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
// doneSendRoutine is closed when the sendRoutine actually quits.
quitSendRoutine chan struct{} quitSendRoutine chan struct{}
doneSendRoutine chan struct{} doneSendRoutine chan struct{}
@ -200,29 +200,28 @@ func (c *MConnection) OnStart() error {
if err := c.BaseService.OnStart(); err != nil { if err := c.BaseService.OnStart(); err != nil {
return err return err
} }
c.quitSendRoutine = make(chan struct{})
c.doneSendRoutine = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
c.pongTimeoutCh = make(chan bool, 1) c.pongTimeoutCh = make(chan bool, 1)
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
c.quitSendRoutine = make(chan struct{})
c.doneSendRoutine = make(chan struct{})
go c.sendRoutine() go c.sendRoutine()
go c.recvRoutine() go c.recvRoutine()
return nil return nil
} }
// FlushStop replicates the logic of OnStop.
// It additionally ensures that all successful
// .Send() calls will get flushed before closing
// the connection.
func (c *MConnection) FlushStop() {
// stopServices stops the BaseService and timers and closes the quitSendRoutine.
// if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
// It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
func (c *MConnection) stopServices() (alreadyStopped bool) {
c.stopMtx.Lock() c.stopMtx.Lock()
defer c.stopMtx.Unlock() defer c.stopMtx.Unlock()
select { select {
case <-c.quitSendRoutine: case <-c.quitSendRoutine:
// already quit via OnStop
return
// already quit via FlushStop or OnStop
return true
default: default:
} }
@ -230,25 +229,40 @@ func (c *MConnection) FlushStop() {
c.flushTimer.Stop() c.flushTimer.Stop()
c.pingTimer.Stop() c.pingTimer.Stop()
c.chStatsTimer.Stop() c.chStatsTimer.Stop()
if c.quitSendRoutine != nil {
close(c.quitSendRoutine)
close(c.quitSendRoutine)
return false
}
// FlushStop replicates the logic of OnStop.
// It additionally ensures that all successful
// .Send() calls will get flushed before closing
// the connection.
func (c *MConnection) FlushStop() {
if c.stopServices() {
return
}
// this block is unique to FlushStop
{
// wait until the sendRoutine exits // wait until the sendRoutine exits
// so we dont race on calling sendSomePacketMsgs // so we dont race on calling sendSomePacketMsgs
<-c.doneSendRoutine <-c.doneSendRoutine
}
// Send and flush all pending msgs.
// By now, IsRunning == false,
// so any concurrent attempts to send will fail.
// Since sendRoutine has exited, we can call this
// safely
eof := c.sendSomePacketMsgs()
for !eof {
eof = c.sendSomePacketMsgs()
// Send and flush all pending msgs.
// By now, IsRunning == false,
// so any concurrent attempts to send will fail.
// Since sendRoutine has exited, we can call this
// safely
eof := c.sendSomePacketMsgs()
for !eof {
eof = c.sendSomePacketMsgs()
}
c.flush()
// Now we can close the connection
} }
c.flush()
// Now we can close the connection
c.conn.Close() // nolint: errcheck c.conn.Close() // nolint: errcheck
// We can't close pong safely here because // We can't close pong safely here because
@ -261,21 +275,10 @@ func (c *MConnection) FlushStop() {
// OnStop implements BaseService // OnStop implements BaseService
func (c *MConnection) OnStop() { func (c *MConnection) OnStop() {
c.stopMtx.Lock()
defer c.stopMtx.Unlock()
select {
case <-c.quitSendRoutine:
// already quit via FlushStop
if c.stopServices() {
return return
default:
} }
c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
c.chStatsTimer.Stop()
close(c.quitSendRoutine)
c.conn.Close() // nolint: errcheck c.conn.Close() // nolint: errcheck
// We can't close pong safely here because // We can't close pong safely here because
@ -433,7 +436,6 @@ FOR_LOOP:
c.sendMonitor.Update(int(_n)) c.sendMonitor.Update(int(_n))
c.flush() c.flush()
case <-c.quitSendRoutine: case <-c.quitSendRoutine:
close(c.doneSendRoutine)
break FOR_LOOP break FOR_LOOP
case <-c.send: case <-c.send:
// Send some PacketMsgs // Send some PacketMsgs
@ -459,6 +461,7 @@ FOR_LOOP:
// Cleanup // Cleanup
c.stopPongTimer() c.stopPongTimer()
close(c.doneSendRoutine)
} }
// Returns true if messages from channels were exhausted. // Returns true if messages from channels were exhausted.


Loading…
Cancel
Save