diff --git a/.circleci/config.yml b/.circleci/config.yml index 5669384c6..ecc7c0ac7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -240,7 +240,7 @@ jobs: for pkg in $(go list github.com/tendermint/tendermint/... | circleci tests split --split-by=timings); do id=$(basename "$pkg") - GOCACHE=off go test -timeout 5m -race -coverprofile=/tmp/workspace/profiles/$id.out -covermode=atomic "$pkg" | tee "/tmp/logs/$id-$RANDOM.log" + GOCACHE=off go test -v -timeout 5m -race -coverprofile=/tmp/workspace/profiles/$id.out -covermode=atomic "$pkg" | tee "/tmp/logs/$id-$RANDOM.log" done - persist_to_workspace: root: /tmp/workspace diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 06b2ec52c..183b9d60b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -21,3 +21,4 @@ Special thanks to external contributors on this release: ### IMPROVEMENTS: ### BUG FIXES: +- [p2p] \#2967 Fix file descriptor leaks diff --git a/p2p/conn_set.go b/p2p/conn_set.go index f960c0e88..d64622783 100644 --- a/p2p/conn_set.go +++ b/p2p/conn_set.go @@ -11,6 +11,7 @@ type ConnSet interface { HasIP(net.IP) bool Set(net.Conn, []net.IP) Remove(net.Conn) + RemoveAddr(net.Addr) } type connSetItem struct { @@ -62,6 +63,13 @@ func (cs *connSet) Remove(c net.Conn) { delete(cs.conns, c.RemoteAddr().String()) } +func (cs *connSet) RemoveAddr(addr net.Addr) { + cs.Lock() + defer cs.Unlock() + + delete(cs.conns, addr.String()) +} + func (cs *connSet) Set(c net.Conn, ips []net.IP) { cs.Lock() defer cs.Unlock() diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go index 71def27e0..57edafc67 100644 --- a/p2p/dummy/peer.go +++ b/p2p/dummy/peer.go @@ -55,6 +55,16 @@ func (p *peer) RemoteIP() net.IP { return net.ParseIP("127.0.0.1") } +// RemoteAddr always returns the TCP address 127.0.0.1:8800. +func (p *peer) RemoteAddr() net.Addr { + return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} +} + +// CloseConn always returns nil. 
+func (p *peer) CloseConn() error { + return nil +} + // Status always returns empry connection status. func (p *peer) Status() tmconn.ConnectionStatus { return tmconn.ConnectionStatus{} diff --git a/p2p/peer.go b/p2p/peer.go index da301d497..73332a2aa 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -18,15 +18,18 @@ type Peer interface { cmn.Service FlushStop() - ID() ID // peer's cryptographic ID - RemoteIP() net.IP // remote IP of the connection + ID() ID // peer's cryptographic ID + RemoteIP() net.IP // remote IP of the connection + RemoteAddr() net.Addr // remote address of the connection IsOutbound() bool // did we dial the peer IsPersistent() bool // do we redial this peer when we disconnect + CloseConn() error // close original connection + NodeInfo() NodeInfo // peer's info Status() tmconn.ConnectionStatus - OriginalAddr() *NetAddress + OriginalAddr() *NetAddress // original address for outbound peers Send(byte, []byte) bool TrySend(byte, []byte) bool @@ -296,6 +299,11 @@ func (p *peer) hasChannel(chID byte) bool { return false } +// CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all. +func (p *peer) CloseConn() error { + return p.peerConn.conn.Close() +} + //--------------------------------------------------- // methods only used for testing // TODO: can we remove these? @@ -305,8 +313,8 @@ func (pc *peerConn) CloseConn() { pc.conn.Close() // nolint: errcheck } -// Addr returns peer's remote network address. -func (p *peer) Addr() net.Addr { +// RemoteAddr returns peer's remote network address. 
+func (p *peer) RemoteAddr() net.Addr { return p.peerConn.conn.RemoteAddr() } diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 3eb5357d3..1d2372fb0 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -30,6 +30,8 @@ func (mp *mockPeer) Get(s string) interface{} { return s } func (mp *mockPeer) Set(string, interface{}) {} func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } func (mp *mockPeer) OriginalAddr() *NetAddress { return nil } +func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } +func (mp *mockPeer) CloseConn() error { return nil } // Returns a mock peer func newMockPeer(ip net.IP) *mockPeer { diff --git a/p2p/peer_test.go b/p2p/peer_test.go index e53d6013b..90be31131 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -39,7 +39,7 @@ func TestPeerBasic(t *testing.T) { assert.False(p.IsPersistent()) p.persistent = true assert.True(p.IsPersistent()) - assert.Equal(rp.Addr().DialString(), p.Addr().String()) + assert.Equal(rp.Addr().DialString(), p.RemoteAddr().String()) assert.Equal(rp.ID(), p.ID()) } @@ -137,9 +137,9 @@ type remotePeer struct { PrivKey crypto.PrivKey Config *config.P2PConfig addr *NetAddress - quit chan struct{} channels cmn.HexBytes listenAddr string + listener net.Listener } func (rp *remotePeer) Addr() *NetAddress { @@ -159,25 +159,45 @@ func (rp *remotePeer) Start() { if e != nil { golog.Fatalf("net.Listen tcp :0: %+v", e) } + rp.listener = l rp.addr = NewNetAddress(PubKeyToID(rp.PrivKey.PubKey()), l.Addr()) - rp.quit = make(chan struct{}) if rp.channels == nil { rp.channels = []byte{testCh} } - go rp.accept(l) + go rp.accept() } func (rp *remotePeer) Stop() { - close(rp.quit) + rp.listener.Close() } -func (rp *remotePeer) accept(l net.Listener) { +func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { + conn, err := addr.DialTimeout(1 * time.Second) + if err != nil { + return nil, err + } + pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) + if err != 
nil { + return nil, err + } + _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) + if err != nil { + return nil, err + } + return conn, err +} + +func (rp *remotePeer) accept() { conns := []net.Conn{} for { - conn, err := l.Accept() + conn, err := rp.listener.Accept() if err != nil { - golog.Fatalf("Failed to accept conn: %+v", err) + golog.Printf("Failed to accept conn: %+v", err) + for _, conn := range conns { + _ = conn.Close() + } + return } pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) @@ -185,31 +205,20 @@ func (rp *remotePeer) accept(l net.Listener) { golog.Fatalf("Failed to create a peer: %+v", err) } - _, err = handshake(pc.conn, time.Second, rp.nodeInfo(l)) + _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) if err != nil { golog.Fatalf("Failed to perform handshake: %+v", err) } conns = append(conns, conn) - - select { - case <-rp.quit: - for _, conn := range conns { - if err := conn.Close(); err != nil { - golog.Fatal(err) - } - } - return - default: - } } } -func (rp *remotePeer) nodeInfo(l net.Listener) NodeInfo { +func (rp *remotePeer) nodeInfo() NodeInfo { return DefaultNodeInfo{ ProtocolVersion: defaultProtocolVersion, ID_: rp.Addr().ID, - ListenAddr: l.Addr().String(), + ListenAddr: rp.listener.Addr().String(), Network: "testing", Version: "1.2.3-rc0-deadbeef", Channels: rp.channels, diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 2e2f3f249..f5125c603 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -404,6 +404,8 @@ func (mockPeer) TrySend(byte, []byte) bool { return false } func (mockPeer) Set(string, interface{}) {} func (mockPeer) Get(string) interface{} { return nil } func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil } +func (mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} } +func (mockPeer) CloseConn() error { return nil } func assertPeersWithTimeout( t *testing.T, diff --git a/p2p/switch.go 
b/p2p/switch.go index 0490eebb8..dbd9c2a60 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -210,6 +210,7 @@ func (sw *Switch) OnStart() error { func (sw *Switch) OnStop() { // Stop peers for _, p := range sw.peers.List() { + sw.transport.Cleanup(p) p.Stop() if sw.peers.Remove(p) { sw.metrics.Peers.Add(float64(-1)) @@ -304,6 +305,7 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { if sw.peers.Remove(peer) { sw.metrics.Peers.Add(float64(-1)) } + sw.transport.Cleanup(peer) peer.Stop() for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) @@ -529,13 +531,16 @@ func (sw *Switch) acceptRoutine() { "max", sw.config.MaxNumInboundPeers, ) - _ = p.Stop() + sw.transport.Cleanup(p) continue } if err := sw.addPeer(p); err != nil { - _ = p.Stop() + sw.transport.Cleanup(p) + if p.IsRunning() { + _ = p.Stop() + } sw.Logger.Info( "Ignoring inbound connection: error while adding peer", "err", err, @@ -593,7 +598,10 @@ func (sw *Switch) addOutboundPeerWithConfig( } if err := sw.addPeer(p); err != nil { - _ = p.Stop() + sw.transport.Cleanup(p) + if p.IsRunning() { + _ = p.Stop() + } return err } @@ -628,7 +636,8 @@ func (sw *Switch) filterPeer(p Peer) error { return nil } -// addPeer starts up the Peer and adds it to the Switch. +// addPeer starts up the Peer and adds it to the Switch. Error is returned if +// the peer is filtered out or failed to start or can't be added. func (sw *Switch) addPeer(p Peer) error { if err := sw.filterPeer(p); err != nil { return err @@ -636,11 +645,15 @@ func (sw *Switch) addPeer(p Peer) error { p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress())) - // All good. Start peer + // Handle the shut down case where the switch has stopped but we're + // concurrently trying to add a peer. if sw.IsRunning() { + // All good. Start peer if err := sw.startInitPeer(p); err != nil { return err } + } else { + sw.Logger.Error("Won't start a peer - switch is not running", "peer", p) } // Add the peer to .peers. 
diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 6c515be02..358661616 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -3,7 +3,9 @@ package p2p import ( "bytes" "fmt" + "io" "io/ioutil" + "net" "net/http" "net/http/httptest" "regexp" @@ -13,7 +15,6 @@ import ( "time" stdprometheus "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -477,6 +478,58 @@ func TestSwitchFullConnectivity(t *testing.T) { } } +func TestSwitchAcceptRoutine(t *testing.T) { + cfg.MaxNumInboundPeers = 5 + + // make switch + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + err := sw.Start() + require.NoError(t, err) + defer sw.Stop() + + remotePeers := make([]*remotePeer, 0) + assert.Equal(t, 0, sw.Peers().Size()) + + // 1. check we connect up to MaxNumInboundPeers + for i := 0; i < cfg.MaxNumInboundPeers; i++ { + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + remotePeers = append(remotePeers, rp) + rp.Start() + c, err := rp.Dial(sw.NodeInfo().NetAddress()) + require.NoError(t, err) + // spawn a reading routine to prevent connection from closing + go func(c net.Conn) { + for { + one := make([]byte, 1) + _, err := c.Read(one) + if err != nil { + return + } + } + }(c) + } + time.Sleep(10 * time.Millisecond) + assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size()) + + // 2. 
check we close new connections if we already have MaxNumInboundPeers peers + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + conn, err := rp.Dial(sw.NodeInfo().NetAddress()) + require.NoError(t, err) + // check conn is closed + one := make([]byte, 1) + conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)) + _, err = conn.Read(one) + assert.Equal(t, io.EOF, err) + assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size()) + rp.Stop() + + // stop remote peers + for _, rp := range remotePeers { + rp.Stop() + } +} + func BenchmarkSwitchBroadcast(b *testing.B) { s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { // Make bar reactors of bar channels each diff --git a/p2p/test_util.go b/p2p/test_util.go index ea788b79f..04629fcae 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -247,17 +247,35 @@ func testNodeInfo(id ID, name string) NodeInfo { } func testNodeInfoWithNetwork(id ID, name, network string) NodeInfo { + port, err := getFreePort() + if err != nil { + panic(err) + } return DefaultNodeInfo{ ProtocolVersion: defaultProtocolVersion, ID_: id, - ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023), + ListenAddr: fmt.Sprintf("127.0.0.1:%d", port), Network: network, Version: "1.2.3-rc0-deadbeef", Channels: []byte{testCh}, Moniker: name, Other: DefaultNodeInfoOther{ TxIndex: "on", - RPCAddress: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023), + RPCAddress: fmt.Sprintf("127.0.0.1:%d", port), }, } } + +func getFreePort() (int, error) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return 0, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return 0, err + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil +} diff --git a/p2p/transport.go b/p2p/transport.go index 69fab312c..2d4420a11 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -52,6 +52,9 @@ type Transport interface { // Dial connects to the Peer for the address. 
Dial(NetAddress, peerConfig) (Peer, error) + + // Cleanup any resources associated with Peer. + Cleanup(Peer) } // transportLifecycle bundles the methods for callers to control start and stop @@ -274,6 +277,13 @@ func (mt *MultiplexTransport) acceptPeers() { } } +// Cleanup removes the given address from the connections set and +// closes the connection. +func (mt *MultiplexTransport) Cleanup(peer Peer) { + mt.conns.RemoveAddr(peer.RemoteAddr()) + _ = peer.CloseConn() +} + func (mt *MultiplexTransport) cleanup(c net.Conn) error { mt.conns.Remove(c) @@ -418,12 +428,6 @@ func (mt *MultiplexTransport) wrapPeer( PeerMetrics(cfg.metrics), ) - // Wait for Peer to Stop so we can cleanup. - go func(c net.Conn) { - <-p.Quit() - _ = mt.cleanup(c) - }(c) - return p }