From 9968f53c1578ae6b24d28431ec27318a7a004e24 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 23 Feb 2022 14:06:03 -0500 Subject: [PATCH] p2p: make mconn transport test less flaky (#7973) The previous implementation of the *test* was flaky, and this irons out some of those problems. The primary assertion that was failing (less than 1% of the time) was an error on close that I think we shouldn't care about. --- internal/p2p/transport_memory.go | 9 +++++++++ internal/p2p/transport_test.go | 23 +++++++++++------------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/internal/p2p/transport_memory.go b/internal/p2p/transport_memory.go index f363c12be..528ce4bb6 100644 --- a/internal/p2p/transport_memory.go +++ b/internal/p2p/transport_memory.go @@ -94,17 +94,23 @@ type MemoryTransport struct { bufferSize int acceptCh chan *MemoryConnection + closeCh chan struct{} + closeFn func() } // newMemoryTransport creates a new MemoryTransport. This is for internal use by // MemoryNetwork, use MemoryNetwork.CreateTransport() instead. func newMemoryTransport(network *MemoryNetwork, nodeID types.NodeID) *MemoryTransport { + once := &sync.Once{} + closeCh := make(chan struct{}) return &MemoryTransport{ logger: network.logger.With("local", nodeID), network: network, nodeID: nodeID, bufferSize: network.bufferSize, acceptCh: make(chan *MemoryConnection), + closeCh: closeCh, + closeFn: func() { once.Do(func() { close(closeCh) }) }, } } @@ -141,6 +147,8 @@ func (t *MemoryTransport) Endpoints() []Endpoint { // Accept implements Transport. func (t *MemoryTransport) Accept(ctx context.Context) (Connection, error) { select { + case <-t.closeCh: + return nil, io.EOF case conn := <-t.acceptCh: t.logger.Info("accepted connection", "remote", conn.RemoteEndpoint().Path) return conn, nil @@ -197,6 +205,7 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti // Close implements Transport. func (t *MemoryTransport) Close() error { t.network.RemoveTransport(t.nodeID) + t.closeFn() return nil } diff --git a/internal/p2p/transport_test.go b/internal/p2p/transport_test.go index 63ce5ad5b..8f4f90483 100644 --- a/internal/p2p/transport_test.go +++ b/internal/p2p/transport_test.go @@ -46,23 +46,22 @@ func TestTransport_AcceptClose(t *testing.T) { withTransports(ctx, t, func(ctx context.Context, t *testing.T, makeTransport transportFactory) { a := makeTransport(t) - opctx, opcancel := context.WithCancel(ctx) - - // In-progress Accept should error on concurrent close. - errCh := make(chan error, 1) - go func() { - time.Sleep(200 * time.Millisecond) - opcancel() - errCh <- a.Close() - }() + opctx, opcancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer opcancel() _, err := a.Accept(opctx) require.Error(t, err) require.Equal(t, io.EOF, err) - require.NoError(t, <-errCh) - // Closed transport should return error immediately. - _, err = a.Accept(opctx) + <-opctx.Done() + _ = a.Close() + + // Closed transport should return error immediately, + // because the transport is closed. We use the base + // context (ctx) rather than the operation context + // (opctx) because using the later would mean this + // could error because the context was canceled. + _, err = a.Accept(ctx) require.Error(t, err) require.Equal(t, io.EOF, err) })