Browse Source

p2p/conn: FlushStop. Use in pex. Closes #2092 (#2802)

* p2p/conn: FlushStop. Use in pex. Closes #2092

In seed mode, we call StopPeer immediately after Send.
Since flushing msgs to the peer happens in the background,
the peer connection is often closed before the messages are
actually sent out. The new FlushStop method allows all msgs
to first be written and flushed out on the conn before it is closed.

* fix dummy peer

* typo

* fixes from review

* more comments

* ensure pex doesn't call FlushStop more than once

FlushStop is not safe to call more than once,
but we call it from Receive in a go-routine so Receive
doesn't block.

To ensure we only call it once, we use the lastReceivedRequests
map - if an entry already exists, then FlushStop should already have
been called and we can return.
pull/2869/head
Ethan Buchman 6 years ago
committed by GitHub
parent
commit
0d5e0d2f13
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 134 additions and 16 deletions
  1. +1
    -0
      blockchain/reactor_test.go
  2. +56
    -6
      p2p/conn/connection.go
  3. +37
    -0
      p2p/conn/connection_test.go
  4. +5
    -0
      p2p/dummy/peer.go
  5. +10
    -0
      p2p/peer.go
  6. +1
    -0
      p2p/peer_set_test.go
  7. +23
    -10
      p2p/pex/pex_reactor.go
  8. +1
    -0
      p2p/pex/pex_reactor_test.go

+ 1
- 0
blockchain/reactor_test.go View File

@ -197,6 +197,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool {
return true
}
func (tp *bcrTestPeer) FlushStop() {}
func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) }
func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} }
func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} }


+ 56
- 6
p2p/conn/connection.go View File

@ -84,7 +84,11 @@ type MConnection struct {
errored uint32
config MConnConfig
quit chan struct{}
// Closing quitSendRoutine will cause
// doneSendRoutine to close.
quitSendRoutine chan struct{}
doneSendRoutine chan struct{}
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *cmn.RepeatTimer // send pings periodically
@ -190,7 +194,8 @@ func (c *MConnection) OnStart() error {
if err := c.BaseService.OnStart(); err != nil {
return err
}
c.quit = make(chan struct{})
c.quitSendRoutine = make(chan struct{})
c.doneSendRoutine = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
c.pongTimeoutCh = make(chan bool, 1)
@ -200,15 +205,59 @@ func (c *MConnection) OnStart() error {
return nil
}
// FlushStop replicates the logic of OnStop.
// It additionally ensures that all successful
// .Send() calls will get flushed before closing
// the connection.
// NOTE: it is not safe to call this method more than once.
func (c *MConnection) FlushStop() {
c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
c.chStatsTimer.Stop()
if c.quitSendRoutine != nil {
close(c.quitSendRoutine)
// wait until the sendRoutine exits
// so we dont race on calling sendSomePacketMsgs
<-c.doneSendRoutine
}
// Send and flush all pending msgs.
// By now, IsRunning == false,
// so any concurrent attempts to send will fail.
// Since sendRoutine has exited, we can call this
// safely
eof := c.sendSomePacketMsgs()
for !eof {
eof = c.sendSomePacketMsgs()
}
c.flush()
// Now we can close the connection
c.conn.Close() // nolint: errcheck
// We can't close pong safely here because
// recvRoutine may write to it after we've stopped.
// Though it doesn't need to get closed at all,
// we close it @ recvRoutine.
// c.Stop()
}
// OnStop implements BaseService
func (c *MConnection) OnStop() {
select {
case <-c.quitSendRoutine:
// already quit via FlushStop
return
default:
}
c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit)
}
close(c.quitSendRoutine)
c.conn.Close() // nolint: errcheck
// We can't close pong safely here because
@ -365,7 +414,8 @@ FOR_LOOP:
}
c.sendMonitor.Update(int(_n))
c.flush()
case <-c.quit:
case <-c.quitSendRoutine:
close(c.doneSendRoutine)
break FOR_LOOP
case <-c.send:
// Send some PacketMsgs


+ 37
- 0
p2p/conn/connection_test.go View File

@ -36,6 +36,43 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg
return c
}
func TestMConnectionSendFlushStop(t *testing.T) {
server, client := NetPipe()
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
clientConn := createTestMConnection(client)
err := clientConn.Start()
require.Nil(t, err)
defer clientConn.Stop()
msg := []byte("abc")
assert.True(t, clientConn.Send(0x01, msg))
aminoMsgLength := 14
// start the reader in a new routine, so we can flush
errCh := make(chan error)
go func() {
msgB := make([]byte, aminoMsgLength)
_, err := server.Read(msgB)
if err != nil {
t.Fatal(err)
}
errCh <- err
}()
// stop the conn - it should flush all conns
clientConn.FlushStop()
timer := time.NewTimer(3 * time.Second)
select {
case <-errCh:
case <-timer.C:
t.Error("timed out waiting for msgs to be read")
}
}
func TestMConnectionSend(t *testing.T) {
server, client := NetPipe()
defer server.Close() // nolint: errcheck


+ 5
- 0
p2p/dummy/peer.go View File

@ -25,6 +25,11 @@ func NewPeer() *peer {
return p
}
// FlushStop just calls Stop.
func (p *peer) FlushStop() {
p.Stop()
}
// ID always returns dummy.
func (p *peer) ID() p2p.ID {
return p2p.ID("dummy")


+ 10
- 0
p2p/peer.go View File

@ -17,6 +17,7 @@ const metricsTickerDuration = 10 * time.Second
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
cmn.Service
FlushStop()
ID() ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
@ -184,6 +185,15 @@ func (p *peer) OnStart() error {
return nil
}
// FlushStop mimics OnStop but additionally ensures that all successful
// .Send() calls will get flushed before closing the connection.
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
p.mconn.FlushStop() // stop everything and close the conn
}
// OnStop implements BaseService.
func (p *peer) OnStop() {
p.metricsTicker.Stop()


+ 1
- 0
p2p/peer_set_test.go View File

@ -18,6 +18,7 @@ type mockPeer struct {
id ID
}
func (mp *mockPeer) FlushStop() { mp.Stop() }
func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }


+ 23
- 10
p2p/pex/pex_reactor.go View File

@ -208,25 +208,38 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *pexRequestMessage:
// Check we're not receiving too many requests
if err := r.receiveRequest(src); err != nil {
r.Switch.StopPeerForError(src, err)
return
}
// Seeds disconnect after sending a batch of addrs
// NOTE: this is a prime candidate for amplification attacks
// NOTE: this is a prime candidate for amplification attacks,
// so it's important we
// 1) restrict how frequently peers can request
// 2) limit the output size
if r.config.SeedMode {
// If we're a seed and this is an inbound peer,
// respond once and disconnect.
if r.config.SeedMode && !src.IsOutbound() {
id := string(src.ID())
v := r.lastReceivedRequests.Get(id)
if v != nil {
// FlushStop/StopPeer are already
// running in a go-routine.
return
}
r.lastReceivedRequests.Set(id, time.Now())
// Send addrs and disconnect
r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
go func() {
// TODO Fix properly #2092
time.Sleep(time.Second * 5)
// In a go-routine so it doesn't block .Receive.
src.FlushStop()
r.Switch.StopPeerGracefully(src)
}()
} else {
// Check we're not receiving requests too frequently.
if err := r.receiveRequest(src); err != nil {
r.Switch.StopPeerForError(src, err)
return
}
r.SendAddrs(src, r.book.GetSelection())
}


+ 1
- 0
p2p/pex/pex_reactor_test.go View File

@ -387,6 +387,7 @@ func newMockPeer() mockPeer {
return mp
}
func (mp mockPeer) FlushStop() { mp.Stop() }
func (mp mockPeer) ID() p2p.ID { return mp.addr.ID }
func (mp mockPeer) IsOutbound() bool { return mp.outbound }
func (mp mockPeer) IsPersistent() bool { return mp.persistent }


Loading…
Cancel
Save