Browse Source

p2p: file descriptor leaks (#3150)

* close peer's connection to avoid fd leak

Fixes #2967

* rename peer#Addr to RemoteAddr

* fix test

* fixes after Ethan's review

* bring back the check

* changelog entry

* write a test for switch#acceptRoutine

* increase timeouts? :(

* remove extra assertNPeersWithTimeout

* simplify test

* assert number of peers (just to be safe)

* Cleanup in OnStop

* run tests with verbose flag on CircleCI

* spawn a reading routine to prevent connection from closing

* get port from the listener

random port is faster, but often results in

```
panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered]
        panic: listen tcp 127.0.0.1:44068: bind: address already in use

goroutine 79 [running]:
testing.tRunner.func1(0xc0001bd600)
        /usr/local/go/src/testing/testing.go:792 +0x387
panic(0x974d20, 0xc0001b0500)
        /usr/local/go/src/runtime/panic.go:513 +0x1b9
github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...)
        /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28
github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed)
        /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9
github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0)
        /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c
github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600)
        /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58
testing.tRunner(0xc0001bd600, 0xb42038)
        /usr/local/go/src/testing/testing.go:827 +0xbf
created by testing.(*T).Run
        /usr/local/go/src/testing/testing.go:878 +0x353
exit status 2
FAIL    github.com/tendermint/tendermint/p2p    0.350s
```
pull/3195/head
Anton Kaliaev 6 years ago
committed by Ethan Buchman
parent
commit
2449bf7300
12 changed files with 170 additions and 42 deletions
  1. +1
    -1
      .circleci/config.yml
  2. +1
    -0
      CHANGELOG_PENDING.md
  3. +8
    -0
      p2p/conn_set.go
  4. +10
    -0
      p2p/dummy/peer.go
  5. +13
    -5
      p2p/peer.go
  6. +2
    -0
      p2p/peer_set_test.go
  7. +31
    -22
      p2p/peer_test.go
  8. +2
    -0
      p2p/pex/pex_reactor_test.go
  9. +18
    -5
      p2p/switch.go
  10. +54
    -1
      p2p/switch_test.go
  11. +20
    -2
      p2p/test_util.go
  12. +10
    -6
      p2p/transport.go

+ 1
- 1
.circleci/config.yml View File

@ -240,7 +240,7 @@ jobs:
for pkg in $(go list github.com/tendermint/tendermint/... | circleci tests split --split-by=timings); do for pkg in $(go list github.com/tendermint/tendermint/... | circleci tests split --split-by=timings); do
id=$(basename "$pkg") id=$(basename "$pkg")
GOCACHE=off go test -timeout 5m -race -coverprofile=/tmp/workspace/profiles/$id.out -covermode=atomic "$pkg" | tee "/tmp/logs/$id-$RANDOM.log"
GOCACHE=off go test -v -timeout 5m -race -coverprofile=/tmp/workspace/profiles/$id.out -covermode=atomic "$pkg" | tee "/tmp/logs/$id-$RANDOM.log"
done done
- persist_to_workspace: - persist_to_workspace:
root: /tmp/workspace root: /tmp/workspace


+ 1
- 0
CHANGELOG_PENDING.md View File

@ -21,3 +21,4 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS: ### IMPROVEMENTS:
### BUG FIXES: ### BUG FIXES:
- [p2p] \#2967 Fix file descriptor leaks

+ 8
- 0
p2p/conn_set.go View File

@ -11,6 +11,7 @@ type ConnSet interface {
HasIP(net.IP) bool HasIP(net.IP) bool
Set(net.Conn, []net.IP) Set(net.Conn, []net.IP)
Remove(net.Conn) Remove(net.Conn)
RemoveAddr(net.Addr)
} }
type connSetItem struct { type connSetItem struct {
@ -62,6 +63,13 @@ func (cs *connSet) Remove(c net.Conn) {
delete(cs.conns, c.RemoteAddr().String()) delete(cs.conns, c.RemoteAddr().String())
} }
// RemoveAddr deletes the entry keyed by addr.String() from the set. The set's
// lock is held for the duration of the call.
func (cs *connSet) RemoveAddr(addr net.Addr) {
cs.Lock()
defer cs.Unlock()
delete(cs.conns, addr.String())
}
func (cs *connSet) Set(c net.Conn, ips []net.IP) { func (cs *connSet) Set(c net.Conn, ips []net.IP) {
cs.Lock() cs.Lock()
defer cs.Unlock() defer cs.Unlock()


+ 10
- 0
p2p/dummy/peer.go View File

@ -55,6 +55,16 @@ func (p *peer) RemoteIP() net.IP {
return net.ParseIP("127.0.0.1") return net.ParseIP("127.0.0.1")
} }
// RemoteAddr always returns the TCP address 127.0.0.1:8800.
func (p *peer) RemoteAddr() net.Addr {
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800}
}
// CloseConn always returns nil.
func (p *peer) CloseConn() error {
return nil
}
// Status always returns empty connection status. // Status always returns empty connection status.
func (p *peer) Status() tmconn.ConnectionStatus { func (p *peer) Status() tmconn.ConnectionStatus {
return tmconn.ConnectionStatus{} return tmconn.ConnectionStatus{}


+ 13
- 5
p2p/peer.go View File

@ -18,15 +18,18 @@ type Peer interface {
cmn.Service cmn.Service
FlushStop() FlushStop()
ID() ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
ID() ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
RemoteAddr() net.Addr // remote address of the connection
IsOutbound() bool // did we dial the peer IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect IsPersistent() bool // do we redial this peer when we disconnect
CloseConn() error // close original connection
NodeInfo() NodeInfo // peer's info NodeInfo() NodeInfo // peer's info
Status() tmconn.ConnectionStatus Status() tmconn.ConnectionStatus
OriginalAddr() *NetAddress
OriginalAddr() *NetAddress // original address for outbound peers
Send(byte, []byte) bool Send(byte, []byte) bool
TrySend(byte, []byte) bool TrySend(byte, []byte) bool
@ -296,6 +299,11 @@ func (p *peer) hasChannel(chID byte) bool {
return false return false
} }
// CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
func (p *peer) CloseConn() error {
return p.peerConn.conn.Close()
}
//--------------------------------------------------- //---------------------------------------------------
// methods only used for testing // methods only used for testing
// TODO: can we remove these? // TODO: can we remove these?
@ -305,8 +313,8 @@ func (pc *peerConn) CloseConn() {
pc.conn.Close() // nolint: errcheck pc.conn.Close() // nolint: errcheck
} }
// Addr returns peer's remote network address.
func (p *peer) Addr() net.Addr {
// RemoteAddr returns peer's remote network address.
func (p *peer) RemoteAddr() net.Addr {
return p.peerConn.conn.RemoteAddr() return p.peerConn.conn.RemoteAddr()
} }


+ 2
- 0
p2p/peer_set_test.go View File

@ -30,6 +30,8 @@ func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {} func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) OriginalAddr() *NetAddress { return nil } func (mp *mockPeer) OriginalAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
// Returns a mock peer // Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer { func newMockPeer(ip net.IP) *mockPeer {


+ 31
- 22
p2p/peer_test.go View File

@ -39,7 +39,7 @@ func TestPeerBasic(t *testing.T) {
assert.False(p.IsPersistent()) assert.False(p.IsPersistent())
p.persistent = true p.persistent = true
assert.True(p.IsPersistent()) assert.True(p.IsPersistent())
assert.Equal(rp.Addr().DialString(), p.Addr().String())
assert.Equal(rp.Addr().DialString(), p.RemoteAddr().String())
assert.Equal(rp.ID(), p.ID()) assert.Equal(rp.ID(), p.ID())
} }
@ -137,9 +137,9 @@ type remotePeer struct {
PrivKey crypto.PrivKey PrivKey crypto.PrivKey
Config *config.P2PConfig Config *config.P2PConfig
addr *NetAddress addr *NetAddress
quit chan struct{}
channels cmn.HexBytes channels cmn.HexBytes
listenAddr string listenAddr string
listener net.Listener
} }
func (rp *remotePeer) Addr() *NetAddress { func (rp *remotePeer) Addr() *NetAddress {
@ -159,25 +159,45 @@ func (rp *remotePeer) Start() {
if e != nil { if e != nil {
golog.Fatalf("net.Listen tcp :0: %+v", e) golog.Fatalf("net.Listen tcp :0: %+v", e)
} }
rp.listener = l
rp.addr = NewNetAddress(PubKeyToID(rp.PrivKey.PubKey()), l.Addr()) rp.addr = NewNetAddress(PubKeyToID(rp.PrivKey.PubKey()), l.Addr())
rp.quit = make(chan struct{})
if rp.channels == nil { if rp.channels == nil {
rp.channels = []byte{testCh} rp.channels = []byte{testCh}
} }
go rp.accept(l)
go rp.accept()
} }
func (rp *remotePeer) Stop() { func (rp *remotePeer) Stop() {
close(rp.quit)
rp.listener.Close()
} }
func (rp *remotePeer) accept(l net.Listener) {
// Dial opens a connection to addr (1s dial timeout), passes it through
// testInboundPeerConn with rp's config and private key, and then runs
// handshake (1s timeout) using rp's node info.
//
// On success the raw dialed connection is returned and the caller owns it.
// On any failure the dialed connection is closed before the error is
// returned, so a failed Dial does not leak a file descriptor.
func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) {
	conn, err := addr.DialTimeout(1 * time.Second)
	if err != nil {
		return nil, err
	}

	pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
	if err != nil {
		// Best-effort close: the setup error is the one worth reporting.
		_ = conn.Close()
		return nil, err
	}

	if _, err = handshake(pc.conn, time.Second, rp.nodeInfo()); err != nil {
		// Best-effort close: the handshake error is the one worth reporting.
		_ = conn.Close()
		return nil, err
	}

	return conn, nil
}
func (rp *remotePeer) accept() {
conns := []net.Conn{} conns := []net.Conn{}
for { for {
conn, err := l.Accept()
conn, err := rp.listener.Accept()
if err != nil { if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err)
golog.Printf("Failed to accept conn: %+v", err)
for _, conn := range conns {
_ = conn.Close()
}
return
} }
pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
@ -185,31 +205,20 @@ func (rp *remotePeer) accept(l net.Listener) {
golog.Fatalf("Failed to create a peer: %+v", err) golog.Fatalf("Failed to create a peer: %+v", err)
} }
_, err = handshake(pc.conn, time.Second, rp.nodeInfo(l))
_, err = handshake(pc.conn, time.Second, rp.nodeInfo())
if err != nil { if err != nil {
golog.Fatalf("Failed to perform handshake: %+v", err) golog.Fatalf("Failed to perform handshake: %+v", err)
} }
conns = append(conns, conn) conns = append(conns, conn)
select {
case <-rp.quit:
for _, conn := range conns {
if err := conn.Close(); err != nil {
golog.Fatal(err)
}
}
return
default:
}
} }
} }
func (rp *remotePeer) nodeInfo(l net.Listener) NodeInfo {
func (rp *remotePeer) nodeInfo() NodeInfo {
return DefaultNodeInfo{ return DefaultNodeInfo{
ProtocolVersion: defaultProtocolVersion, ProtocolVersion: defaultProtocolVersion,
ID_: rp.Addr().ID, ID_: rp.Addr().ID,
ListenAddr: l.Addr().String(),
ListenAddr: rp.listener.Addr().String(),
Network: "testing", Network: "testing",
Version: "1.2.3-rc0-deadbeef", Version: "1.2.3-rc0-deadbeef",
Channels: rp.channels, Channels: rp.channels,


+ 2
- 0
p2p/pex/pex_reactor_test.go View File

@ -404,6 +404,8 @@ func (mockPeer) TrySend(byte, []byte) bool { return false }
func (mockPeer) Set(string, interface{}) {} func (mockPeer) Set(string, interface{}) {}
func (mockPeer) Get(string) interface{} { return nil } func (mockPeer) Get(string) interface{} { return nil }
func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil } func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil }
func (mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} }
func (mockPeer) CloseConn() error { return nil }
func assertPeersWithTimeout( func assertPeersWithTimeout(
t *testing.T, t *testing.T,


+ 18
- 5
p2p/switch.go View File

@ -210,6 +210,7 @@ func (sw *Switch) OnStart() error {
func (sw *Switch) OnStop() { func (sw *Switch) OnStop() {
// Stop peers // Stop peers
for _, p := range sw.peers.List() { for _, p := range sw.peers.List() {
sw.transport.Cleanup(p)
p.Stop() p.Stop()
if sw.peers.Remove(p) { if sw.peers.Remove(p) {
sw.metrics.Peers.Add(float64(-1)) sw.metrics.Peers.Add(float64(-1))
@ -304,6 +305,7 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
if sw.peers.Remove(peer) { if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1)) sw.metrics.Peers.Add(float64(-1))
} }
sw.transport.Cleanup(peer)
peer.Stop() peer.Stop()
for _, reactor := range sw.reactors { for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason) reactor.RemovePeer(peer, reason)
@ -529,13 +531,16 @@ func (sw *Switch) acceptRoutine() {
"max", sw.config.MaxNumInboundPeers, "max", sw.config.MaxNumInboundPeers,
) )
_ = p.Stop()
sw.transport.Cleanup(p)
continue continue
} }
if err := sw.addPeer(p); err != nil { if err := sw.addPeer(p); err != nil {
_ = p.Stop()
sw.transport.Cleanup(p)
if p.IsRunning() {
_ = p.Stop()
}
sw.Logger.Info( sw.Logger.Info(
"Ignoring inbound connection: error while adding peer", "Ignoring inbound connection: error while adding peer",
"err", err, "err", err,
@ -593,7 +598,10 @@ func (sw *Switch) addOutboundPeerWithConfig(
} }
if err := sw.addPeer(p); err != nil { if err := sw.addPeer(p); err != nil {
_ = p.Stop()
sw.transport.Cleanup(p)
if p.IsRunning() {
_ = p.Stop()
}
return err return err
} }
@ -628,7 +636,8 @@ func (sw *Switch) filterPeer(p Peer) error {
return nil return nil
} }
// addPeer starts up the Peer and adds it to the Switch.
// addPeer starts up the Peer and adds it to the Switch. Error is returned if
// the peer is filtered out or failed to start or can't be added.
func (sw *Switch) addPeer(p Peer) error { func (sw *Switch) addPeer(p Peer) error {
if err := sw.filterPeer(p); err != nil { if err := sw.filterPeer(p); err != nil {
return err return err
@ -636,11 +645,15 @@ func (sw *Switch) addPeer(p Peer) error {
p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress())) p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress()))
// All good. Start peer
// Handle the shut down case where the switch has stopped but we're
// concurrently trying to add a peer.
if sw.IsRunning() { if sw.IsRunning() {
// All good. Start peer
if err := sw.startInitPeer(p); err != nil { if err := sw.startInitPeer(p); err != nil {
return err return err
} }
} else {
sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
} }
// Add the peer to .peers. // Add the peer to .peers.


+ 54
- 1
p2p/switch_test.go View File

@ -3,7 +3,9 @@ package p2p
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"regexp" "regexp"
@ -13,7 +15,6 @@ import (
"time" "time"
stdprometheus "github.com/prometheus/client_golang/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -477,6 +478,58 @@ func TestSwitchFullConnectivity(t *testing.T) {
} }
} }
// TestSwitchAcceptRoutine checks the inbound-peer limit of a running switch:
//  1. MaxNumInboundPeers remote peers can dial in and are all added;
//  2. one more dial succeeds at the connection level, but the switch closes
//     the connection (the dialer reads io.EOF) and the peer count is unchanged.
func TestSwitchAcceptRoutine(t *testing.T) {
	// cfg is not declared in this function, so this assignment changes state
	// that outlives the test; put the previous value back when we are done.
	prevMaxInbound := cfg.MaxNumInboundPeers
	cfg.MaxNumInboundPeers = 5
	defer func() { cfg.MaxNumInboundPeers = prevMaxInbound }()

	// make switch
	sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
	err := sw.Start()
	require.NoError(t, err)
	defer sw.Stop()

	remotePeers := make([]*remotePeer, 0, cfg.MaxNumInboundPeers+1)
	// Stop every started remote peer even if a require below aborts the test;
	// otherwise their listeners stay open. Runs before sw.Stop (LIFO).
	defer func() {
		for _, rp := range remotePeers {
			rp.Stop()
		}
	}()
	assert.Equal(t, 0, sw.Peers().Size())

	// 1. check we connect up to MaxNumInboundPeers
	for i := 0; i < cfg.MaxNumInboundPeers; i++ {
		rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
		rp.Start()
		// Register only after Start so the cleanup never stops an unstarted peer.
		remotePeers = append(remotePeers, rp)
		c, err := rp.Dial(sw.NodeInfo().NetAddress())
		require.NoError(t, err)
		// spawn a reading routine to prevent connection from closing
		go func(c net.Conn) {
			// Release the descriptor once the other side goes away.
			defer c.Close()
			one := make([]byte, 1)
			for {
				if _, err := c.Read(one); err != nil {
					return
				}
			}
		}(c)
	}
	time.Sleep(10 * time.Millisecond)
	assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())

	// 2. check we close new connections if we already have MaxNumInboundPeers peers
	rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
	rp.Start()
	remotePeers = append(remotePeers, rp)
	conn, err := rp.Dial(sw.NodeInfo().NetAddress())
	require.NoError(t, err)
	defer conn.Close()

	// check conn is closed
	one := make([]byte, 1)
	require.NoError(t, conn.SetReadDeadline(time.Now().Add(10*time.Millisecond)))
	_, err = conn.Read(one)
	assert.Equal(t, io.EOF, err)
	assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
}
func BenchmarkSwitchBroadcast(b *testing.B) { func BenchmarkSwitchBroadcast(b *testing.B) {
s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
// Make bar reactors of bar channels each // Make bar reactors of bar channels each


+ 20
- 2
p2p/test_util.go View File

@ -247,17 +247,35 @@ func testNodeInfo(id ID, name string) NodeInfo {
} }
func testNodeInfoWithNetwork(id ID, name, network string) NodeInfo { func testNodeInfoWithNetwork(id ID, name, network string) NodeInfo {
port, err := getFreePort()
if err != nil {
panic(err)
}
return DefaultNodeInfo{ return DefaultNodeInfo{
ProtocolVersion: defaultProtocolVersion, ProtocolVersion: defaultProtocolVersion,
ID_: id, ID_: id,
ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
ListenAddr: fmt.Sprintf("127.0.0.1:%d", port),
Network: network, Network: network,
Version: "1.2.3-rc0-deadbeef", Version: "1.2.3-rc0-deadbeef",
Channels: []byte{testCh}, Channels: []byte{testCh},
Moniker: name, Moniker: name,
Other: DefaultNodeInfoOther{ Other: DefaultNodeInfoOther{
TxIndex: "on", TxIndex: "on",
RPCAddress: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
RPCAddress: fmt.Sprintf("127.0.0.1:%d", port),
}, },
} }
} }
// getFreePort returns a TCP port number chosen by the operating system. It
// binds a listener to localhost port 0, reads back the port that was assigned,
// and closes the listener before returning, so the port is not reserved for
// the caller.
func getFreePort() (int, error) {
	tcpAddr, resolveErr := net.ResolveTCPAddr("tcp", "localhost:0")
	if resolveErr != nil {
		return 0, resolveErr
	}

	listener, listenErr := net.ListenTCP("tcp", tcpAddr)
	if listenErr != nil {
		return 0, listenErr
	}
	defer listener.Close()

	bound := listener.Addr().(*net.TCPAddr)
	return bound.Port, nil
}

+ 10
- 6
p2p/transport.go View File

@ -52,6 +52,9 @@ type Transport interface {
// Dial connects to the Peer for the address. // Dial connects to the Peer for the address.
Dial(NetAddress, peerConfig) (Peer, error) Dial(NetAddress, peerConfig) (Peer, error)
// Cleanup any resources associated with Peer.
Cleanup(Peer)
} }
// transportLifecycle bundles the methods for callers to control start and stop // transportLifecycle bundles the methods for callers to control start and stop
@ -274,6 +277,13 @@ func (mt *MultiplexTransport) acceptPeers() {
} }
} }
// Cleanup removes the peer's remote address from the connections set and
// then closes the peer's connection. Any error returned by CloseConn is
// discarded.
func (mt *MultiplexTransport) Cleanup(peer Peer) {
mt.conns.RemoveAddr(peer.RemoteAddr())
_ = peer.CloseConn()
}
func (mt *MultiplexTransport) cleanup(c net.Conn) error { func (mt *MultiplexTransport) cleanup(c net.Conn) error {
mt.conns.Remove(c) mt.conns.Remove(c)
@ -418,12 +428,6 @@ func (mt *MultiplexTransport) wrapPeer(
PeerMetrics(cfg.metrics), PeerMetrics(cfg.metrics),
) )
// Wait for Peer to Stop so we can cleanup.
go func(c net.Conn) {
<-p.Quit()
_ = mt.cleanup(c)
}(c)
return p return p
} }


Loading…
Cancel
Save