Browse Source

Merge pull request #1209 from tendermint/1205-fixes-for-p2p-memory-leak-and-pong

Fixes for p2p memory leak and pong
pull/1218/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
d3e276bf80
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 45 additions and 61 deletions
  1. +4
    -4
      blockchain/pool.go
  2. +1
    -1
      blockchain/reactor.go
  3. +2
    -3
      blockchain/reactor_test.go
  4. +1
    -1
      consensus/reactor.go
  5. +1
    -1
      consensus/state.go
  6. +1
    -1
      consensus/ticker.go
  7. +1
    -1
      evidence/reactor.go
  8. +4
    -4
      glide.lock
  9. +2
    -2
      glide.yaml
  10. +4
    -4
      mempool/reactor.go
  11. +1
    -1
      node/node_test.go
  12. +7
    -15
      p2p/conn/connection.go
  13. +0
    -6
      p2p/peer.go
  14. +1
    -1
      p2p/pex/addrbook.go
  15. +2
    -2
      p2p/pex/pex_reactor.go
  16. +0
    -1
      p2p/pex/pex_reactor_test.go
  17. +2
    -2
      p2p/trust/metric.go
  18. +1
    -1
      p2p/trust/store.go
  19. +1
    -1
      rpc/client/httpclient.go
  20. +3
    -3
      rpc/lib/client/ws_client.go
  21. +2
    -2
      rpc/lib/client/ws_client_test.go
  22. +4
    -4
      rpc/lib/server/handlers.go

+ 4
- 4
blockchain/pool.go View File

@ -534,10 +534,10 @@ OUTER_LOOP:
// Send request and wait. // Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id) bpr.pool.sendRequest(bpr.height, peer.id)
select { select {
case <-bpr.pool.Quit:
case <-bpr.pool.Quit():
bpr.Stop() bpr.Stop()
return return
case <-bpr.Quit:
case <-bpr.Quit():
return return
case <-bpr.redoCh: case <-bpr.redoCh:
bpr.reset() bpr.reset()
@ -545,10 +545,10 @@ OUTER_LOOP:
case <-bpr.gotBlockCh: case <-bpr.gotBlockCh:
// We got the block, now see if it's good. // We got the block, now see if it's good.
select { select {
case <-bpr.pool.Quit:
case <-bpr.pool.Quit():
bpr.Stop() bpr.Stop()
return return
case <-bpr.Quit:
case <-bpr.Quit():
return return
case <-bpr.redoCh: case <-bpr.redoCh:
bpr.reset() bpr.reset()


+ 1
- 1
blockchain/reactor.go View File

@ -322,7 +322,7 @@ FOR_LOOP:
} }
} }
continue FOR_LOOP continue FOR_LOOP
case <-bcR.Quit:
case <-bcR.Quit():
break FOR_LOOP break FOR_LOOP
} }
} }


+ 2
- 3
blockchain/reactor_test.go View File

@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block {
// The Test peer // The Test peer
type bcrTestPeer struct { type bcrTestPeer struct {
*cmn.BaseService
cmn.BaseService
id p2p.ID id p2p.ID
ch chan interface{} ch chan interface{}
} }
@ -169,7 +169,7 @@ func newbcrTestPeer(id p2p.ID) *bcrTestPeer {
id: id, id: id,
ch: make(chan interface{}, 2), ch: make(chan interface{}, 2),
} }
bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr)
bcr.BaseService = *cmn.NewBaseService(nil, "bcrTestPeer", bcr)
return bcr return bcr
} }
@ -196,4 +196,3 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false }
func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) IsPersistent() bool { return true }
func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Get(s string) interface{} { return s }
func (tp *bcrTestPeer) Set(string, interface{}) {} func (tp *bcrTestPeer) Set(string, interface{}) {}
func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return tp.Quit }

+ 1
- 1
consensus/reactor.go View File

@ -380,7 +380,7 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error {
edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat) edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat)
conR.broadcastProposalHeartbeatMessage(edph) conR.broadcastProposalHeartbeatMessage(edph)
} }
case <-conR.Quit:
case <-conR.Quit():
conR.eventBus.UnsubscribeAll(ctx, subscriber) conR.eventBus.UnsubscribeAll(ctx, subscriber)
return return
} }


+ 1
- 1
consensus/state.go View File

@ -541,7 +541,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// if the timeout is relevant to the rs // if the timeout is relevant to the rs
// go to the next step // go to the next step
cs.handleTimeout(ti, rs) cs.handleTimeout(ti, rs)
case <-cs.Quit:
case <-cs.Quit():
// NOTE: the internalMsgQueue may have signed messages from our // NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but its ok because // priv_val that haven't hit the WAL, but its ok because


+ 1
- 1
consensus/ticker.go View File

@ -127,7 +127,7 @@ func (t *timeoutTicker) timeoutRoutine() {
// We can eliminate it by merging the timeoutRoutine into receiveRoutine // We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker // and managing the timeouts ourselves with a millisecond ticker
go func(toi timeoutInfo) { t.tockChan <- toi }(ti) go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
case <-t.Quit:
case <-t.Quit():
return return
} }
} }


+ 1
- 1
evidence/reactor.go View File

@ -126,7 +126,7 @@ func (evR *EvidenceReactor) broadcastRoutine() {
// broadcast all pending evidence // broadcast all pending evidence
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()} msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg}) evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
case <-evR.Quit:
case <-evR.Quit():
return return
} }
} }


+ 4
- 4
glide.lock View File

@ -1,5 +1,5 @@
hash: 41f411204b59e893053e59cda43466b3a6634c5fc88698d1f3131ecd5f239de7
updated: 2018-02-09T09:56:16.586709479Z
hash: 0a994be202cfc9c8a820c5a68321bbbf5592f48790b9bd408b5f95cd344c3be5
updated: 2018-02-12T08:29:16.126185849Z
imports: imports:
- name: github.com/btcsuite/btcd - name: github.com/btcsuite/btcd
version: 50de9da05b50eb15658bb350f6ea24368a111ab7 version: 50de9da05b50eb15658bb350f6ea24368a111ab7
@ -97,7 +97,7 @@ imports:
- leveldb/table - leveldb/table
- leveldb/util - leveldb/util
- name: github.com/tendermint/abci - name: github.com/tendermint/abci
version: 5a4f56056e23cdfd5f3733db056968e016468508
version: 5913ae8960c7ae5d748c37aa060bd35c99ff8a05
subpackages: subpackages:
- client - client
- example/code - example/code
@ -117,7 +117,7 @@ imports:
subpackages: subpackages:
- data - data
- name: github.com/tendermint/tmlibs - name: github.com/tendermint/tmlibs
version: 52ce4c20f8bc9b6da5fc1274bcce27c0b9dd738a
version: a57340ffb53aefb0fca1fc610d18fcbcc61b126f
subpackages: subpackages:
- autofile - autofile
- cli - cli


+ 2
- 2
glide.yaml View File

@ -19,7 +19,7 @@ import:
- package: github.com/spf13/viper - package: github.com/spf13/viper
version: v1.0.0 version: v1.0.0
- package: github.com/tendermint/abci - package: github.com/tendermint/abci
version: develop
version: 5913ae8960c7ae5d748c37aa060bd35c99ff8a05
subpackages: subpackages:
- client - client
- example/dummy - example/dummy
@ -31,7 +31,7 @@ import:
subpackages: subpackages:
- data - data
- package: github.com/tendermint/tmlibs - package: github.com/tendermint/tmlibs
version: develop
version: a57340ffb53aefb0fca1fc610d18fcbcc61b126f
subpackages: subpackages:
- autofile - autofile
- cli - cli


+ 4
- 4
mempool/reactor.go View File

@ -117,9 +117,9 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
if next = memR.Mempool.TxsFront(); next == nil { if next = memR.Mempool.TxsFront(); next == nil {
continue continue
} }
case <-peer.QuitChan():
case <-peer.Quit():
return return
case <-memR.Quit:
case <-memR.Quit():
return return
} }
} }
@ -146,9 +146,9 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
case <-next.NextWaitChan(): case <-next.NextWaitChan():
// see the start of the for loop for nil check // see the start of the for loop for nil check
next = next.Next() next = next.Next()
case <-peer.QuitChan():
case <-peer.Quit():
return return
case <-memR.Quit:
case <-memR.Quit():
return return
} }
} }


+ 1
- 1
node/node_test.go View File

@ -41,7 +41,7 @@ func TestNodeStartStop(t *testing.T) {
}() }()
select { select {
case <-n.Quit:
case <-n.Quit():
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for shutdown") t.Fatal("timed out waiting for shutdown")
} }


+ 7
- 15
p2p/conn/connection.go View File

@ -201,12 +201,12 @@ func (c *MConnection) OnStart() error {
// OnStop implements BaseService // OnStop implements BaseService
func (c *MConnection) OnStop() { func (c *MConnection) OnStop() {
c.BaseService.OnStop() c.BaseService.OnStop()
if c.quit != nil {
close(c.quit)
}
c.flushTimer.Stop() c.flushTimer.Stop()
c.pingTimer.Stop() c.pingTimer.Stop()
c.chStatsTimer.Stop() c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit)
}
c.conn.Close() // nolint: errcheck c.conn.Close() // nolint: errcheck
// We can't close pong safely here because // We can't close pong safely here because
@ -339,7 +339,10 @@ FOR_LOOP:
c.sendMonitor.Update(int(n)) c.sendMonitor.Update(int(n))
c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout) c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() { c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
c.pongTimeoutCh <- true
select {
case c.pongTimeoutCh <- true:
default:
}
}) })
c.flush() c.flush()
case timeout := <-c.pongTimeoutCh: case timeout := <-c.pongTimeoutCh:
@ -548,7 +551,6 @@ func (c *MConnection) stopPongTimer() {
if !c.pongTimer.Stop() { if !c.pongTimer.Stop() {
<-c.pongTimer.C <-c.pongTimer.C
} }
drain(c.pongTimeoutCh)
c.pongTimer = nil c.pongTimer = nil
} }
} }
@ -780,13 +782,3 @@ type msgPacket struct {
func (p msgPacket) String() string { func (p msgPacket) String() string {
return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF) return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
} }
func drain(ch <-chan bool) {
for {
select {
case <-ch:
default:
return
}
}
}

+ 0
- 6
p2p/peer.go View File

@ -18,7 +18,6 @@ import (
// Peer is an interface representing a peer connected on a reactor. // Peer is an interface representing a peer connected on a reactor.
type Peer interface { type Peer interface {
cmn.Service cmn.Service
QuitChan() <-chan struct{}
ID() ID // peer's cryptographic ID ID() ID // peer's cryptographic ID
IsOutbound() bool // did we dial the peer IsOutbound() bool // did we dial the peer
@ -332,11 +331,6 @@ func (p *peer) String() string {
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
} }
// QuitChan returns a channel, which will be closed once peer is stopped.
func (p *peer) QuitChan() <-chan struct{} {
return p.Quit
}
//------------------------------------------------------------------ //------------------------------------------------------------------
// helper funcs // helper funcs


+ 1
- 1
p2p/pex/addrbook.go View File

@ -332,7 +332,7 @@ out:
select { select {
case <-saveFileTicker.C: case <-saveFileTicker.C:
a.saveToFile(a.filePath) a.saveToFile(a.filePath)
case <-a.Quit:
case <-a.Quit():
break out break out
} }
} }


+ 2
- 2
p2p/pex/pex_reactor.go View File

@ -274,7 +274,7 @@ func (r *PEXReactor) ensurePeersRoutine() {
select { select {
case <-ticker.C: case <-ticker.C:
r.ensurePeers() r.ensurePeers()
case <-r.Quit:
case <-r.Quit():
ticker.Stop() ticker.Stop()
return return
} }
@ -409,7 +409,7 @@ func (r *PEXReactor) crawlPeersRoutine() {
case <-ticker.C: case <-ticker.C:
r.attemptDisconnects() r.attemptDisconnects()
r.crawlPeers() r.crawlPeers()
case <-r.Quit:
case <-r.Quit():
return return
} }
} }


+ 0
- 1
p2p/pex/pex_reactor_test.go View File

@ -368,4 +368,3 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false }
func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Set(string, interface{}) {}
func (mp mockPeer) Get(string) interface{} { return nil } func (mp mockPeer) Get(string) interface{} { return nil }
func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit }

+ 2
- 2
p2p/trust/metric.go View File

@ -118,7 +118,7 @@ func (tm *TrustMetric) OnStart() error {
} }
// OnStop implements Service // OnStop implements Service
// Nothing to do since the goroutine shuts down by itself via BaseService.Quit
// Nothing to do since the goroutine shuts down by itself via BaseService.Quit()
func (tm *TrustMetric) OnStop() {} func (tm *TrustMetric) OnStop() {}
// Returns a snapshot of the trust metric history data // Returns a snapshot of the trust metric history data
@ -298,7 +298,7 @@ loop:
select { select {
case <-tick: case <-tick:
tm.NextTimeInterval() tm.NextTimeInterval()
case <-tm.Quit:
case <-tm.Quit():
// Stop all further tracking for this metric // Stop all further tracking for this metric
break loop break loop
} }


+ 1
- 1
p2p/trust/store.go View File

@ -200,7 +200,7 @@ loop:
select { select {
case <-t.C: case <-t.C:
tms.SaveToDB() tms.SaveToDB()
case <-tms.Quit:
case <-tms.Quit():
break loop break loop
} }
} }


+ 1
- 1
rpc/client/httpclient.go View File

@ -338,7 +338,7 @@ func (w *WSEvents) eventListener() {
ch <- result.Data ch <- result.Data
} }
w.mtx.RUnlock() w.mtx.RUnlock()
case <-w.Quit:
case <-w.Quit():
return return
} }
} }


+ 3
- 3
rpc/lib/client/ws_client.go View File

@ -335,7 +335,7 @@ func (c *WSClient) reconnectRoutine() {
c.startReadWriteRoutines() c.startReadWriteRoutines()
} }
} }
case <-c.Quit:
case <-c.Quit():
return return
} }
} }
@ -394,7 +394,7 @@ func (c *WSClient) writeRoutine() {
c.Logger.Debug("sent ping") c.Logger.Debug("sent ping")
case <-c.readRoutineQuit: case <-c.readRoutineQuit:
return return
case <-c.Quit:
case <-c.Quit():
if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil { if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
c.Logger.Error("failed to write message", "err", err) c.Logger.Error("failed to write message", "err", err)
} }
@ -455,7 +455,7 @@ func (c *WSClient) readRoutine() {
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
// both readRoutine and writeRoutine // both readRoutine and writeRoutine
select { select {
case <-c.Quit:
case <-c.Quit():
case c.ResponsesCh <- response: case c.ResponsesCh <- response:
} }
} }


+ 2
- 2
rpc/lib/client/ws_client_test.go View File

@ -132,7 +132,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
for { for {
select { select {
case <-c.ResponsesCh: case <-c.ResponsesCh:
case <-c.Quit:
case <-c.Quit():
return return
} }
} }
@ -217,7 +217,7 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
if resp.Result != nil { if resp.Result != nil {
wg.Done() wg.Done()
} }
case <-c.Quit:
case <-c.Quit():
return return
} }
} }


+ 4
- 4
rpc/lib/server/handlers.go View File

@ -484,7 +484,7 @@ func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
// It implements WSRPCConnection. It is Goroutine-safe. // It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
select { select {
case <-wsc.Quit:
case <-wsc.Quit():
return return
case wsc.writeChan <- resp: case wsc.writeChan <- resp:
} }
@ -494,7 +494,7 @@ func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
// It implements WSRPCConnection. It is Goroutine-safe // It implements WSRPCConnection. It is Goroutine-safe
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
select { select {
case <-wsc.Quit:
case <-wsc.Quit():
return false return false
case wsc.writeChan <- resp: case wsc.writeChan <- resp:
return true return true
@ -525,7 +525,7 @@ func (wsc *wsConnection) readRoutine() {
for { for {
select { select {
case <-wsc.Quit:
case <-wsc.Quit():
return return
default: default:
// reset deadline for every type of message (control or data) // reset deadline for every type of message (control or data)
@ -643,7 +643,7 @@ func (wsc *wsConnection) writeRoutine() {
return return
} }
} }
case <-wsc.Quit:
case <-wsc.Quit():
return return
} }
} }


Loading…
Cancel
Save