Browse Source

p2p: make mconn transport test less flaky (#7973)

The previous implementation of the *test* was flaky, and this irons
out some of those problems. The primary assertion that was failing
(less than 1% of the time) was an error on close that I think we
shouldn't care about.
pull/7980/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
9968f53c15
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 12 deletions
  1. +9
    -0
      internal/p2p/transport_memory.go
  2. +11
    -12
      internal/p2p/transport_test.go

+ 9
- 0
internal/p2p/transport_memory.go View File

@ -94,17 +94,23 @@ type MemoryTransport struct {
bufferSize int bufferSize int
acceptCh chan *MemoryConnection acceptCh chan *MemoryConnection
closeCh chan struct{}
closeFn func()
} }
// newMemoryTransport creates a new MemoryTransport. This is for internal use by // newMemoryTransport creates a new MemoryTransport. This is for internal use by
// MemoryNetwork, use MemoryNetwork.CreateTransport() instead. // MemoryNetwork, use MemoryNetwork.CreateTransport() instead.
func newMemoryTransport(network *MemoryNetwork, nodeID types.NodeID) *MemoryTransport { func newMemoryTransport(network *MemoryNetwork, nodeID types.NodeID) *MemoryTransport {
once := &sync.Once{}
closeCh := make(chan struct{})
return &MemoryTransport{ return &MemoryTransport{
logger: network.logger.With("local", nodeID), logger: network.logger.With("local", nodeID),
network: network, network: network,
nodeID: nodeID, nodeID: nodeID,
bufferSize: network.bufferSize, bufferSize: network.bufferSize,
acceptCh: make(chan *MemoryConnection), acceptCh: make(chan *MemoryConnection),
closeCh: closeCh,
closeFn: func() { once.Do(func() { close(closeCh) }) },
} }
} }
@ -141,6 +147,8 @@ func (t *MemoryTransport) Endpoints() []Endpoint {
// Accept implements Transport. // Accept implements Transport.
func (t *MemoryTransport) Accept(ctx context.Context) (Connection, error) { func (t *MemoryTransport) Accept(ctx context.Context) (Connection, error) {
select { select {
case <-t.closeCh:
return nil, io.EOF
case conn := <-t.acceptCh: case conn := <-t.acceptCh:
t.logger.Info("accepted connection", "remote", conn.RemoteEndpoint().Path) t.logger.Info("accepted connection", "remote", conn.RemoteEndpoint().Path)
return conn, nil return conn, nil
@ -197,6 +205,7 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti
// Close implements Transport. // Close implements Transport.
func (t *MemoryTransport) Close() error { func (t *MemoryTransport) Close() error {
t.network.RemoveTransport(t.nodeID) t.network.RemoveTransport(t.nodeID)
t.closeFn()
return nil return nil
} }


+ 11
- 12
internal/p2p/transport_test.go View File

@ -46,23 +46,22 @@ func TestTransport_AcceptClose(t *testing.T) {
withTransports(ctx, t, func(ctx context.Context, t *testing.T, makeTransport transportFactory) { withTransports(ctx, t, func(ctx context.Context, t *testing.T, makeTransport transportFactory) {
a := makeTransport(t) a := makeTransport(t)
opctx, opcancel := context.WithCancel(ctx)
// In-progress Accept should error on concurrent close.
errCh := make(chan error, 1)
go func() {
time.Sleep(200 * time.Millisecond)
opcancel()
errCh <- a.Close()
}()
opctx, opcancel := context.WithTimeout(ctx, 200*time.Millisecond)
defer opcancel()
_, err := a.Accept(opctx) _, err := a.Accept(opctx)
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
require.NoError(t, <-errCh)
// Closed transport should return error immediately.
_, err = a.Accept(opctx)
<-opctx.Done()
_ = a.Close()
// Closed transport should return error immediately,
// because the transport is closed. We use the base
// context (ctx) rather than the operation context
// (opctx) because using the later would mean this
// could error because the context was canceled.
_, err = a.Accept(ctx)
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
}) })


Loading…
Cancel
Save