diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 105288bdc..dc5bacc39 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -421,7 +421,6 @@ func (c *MConnection) CanSend(chID byte) bool { // sendRoutine polls for packets to send from channels. func (c *MConnection) sendRoutine() { defer c._recover() - protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter) FOR_LOOP: diff --git a/internal/p2p/transport_mconn_test.go b/internal/p2p/transport_mconn_test.go index 541f4767e..06cd93c0a 100644 --- a/internal/p2p/transport_mconn_test.go +++ b/internal/p2p/transport_mconn_test.go @@ -151,9 +151,6 @@ func TestMConnTransport_Listen(t *testing.T) { []*p2p.ChannelDescriptor{{ID: byte(chID), Priority: 1}}, p2p.MConnTransportOptions{}, ) - t.Cleanup(func() { - _ = transport.Close() - }) // Transport should not listen on any endpoints yet. require.Empty(t, transport.Endpoints()) @@ -166,19 +163,6 @@ func TestMConnTransport_Listen(t *testing.T) { } require.NoError(t, err) - // Start a goroutine to just accept any connections. - go func() { - for { - conn, err := transport.Accept() - if err != nil { - return - } - defer func() { - _ = conn.Close() - }() - } - }() - // Check the endpoint. endpoints := transport.Endpoints() require.Len(t, endpoints, 1) @@ -195,14 +179,40 @@ func TestMConnTransport_Listen(t *testing.T) { require.NotZero(t, endpoint.Port) require.Empty(t, endpoint.Path) - // Dialing the endpoint should work. - conn, err := transport.Dial(ctx, endpoint) + dialedChan := make(chan struct{}) + + var peerConn p2p.Connection + go func() { + // Dialing the endpoint should work. + var err error + peerConn, err = transport.Dial(ctx, endpoint) + require.NoError(t, err) + close(dialedChan) + }() + + conn, err := transport.Accept() require.NoError(t, err) - require.NoError(t, conn.Close()) + _ = conn.Close() + <-dialedChan + + time.Sleep(time.Minute) + // closing the connection should not error + require.NoError(t, peerConn.Close()) + + // try to read from the connection should error + _, _, err = peerConn.ReceiveMessage() + require.Error(t, err) // Trying to listen again should error. err = transport.Listen(tc.endpoint) require.Error(t, err) + + // close the transport + _ = transport.Close() + + // Dialing the closed endpoint should error + _, err = transport.Dial(ctx, endpoint) + require.Error(t, err) }) } } diff --git a/internal/p2p/transport_test.go b/internal/p2p/transport_test.go index 2d9d9f9f3..1b8ab77f5 100644 --- a/internal/p2p/transport_test.go +++ b/internal/p2p/transport_test.go @@ -403,6 +403,9 @@ func TestConnection_SendReceive(t *testing.T) { _, _, err = ab.ReceiveMessage() require.Error(t, err) require.Equal(t, io.EOF, err) + _, err = ab.TrySendMessage(chID, []byte("closed try")) + require.Error(t, err) + require.Equal(t, io.EOF, err) _, err = ab.SendMessage(chID, []byte("closed")) require.Error(t, err) require.Equal(t, io.EOF, err) @@ -410,6 +413,9 @@ func TestConnection_SendReceive(t *testing.T) { _, _, err = ba.ReceiveMessage() require.Error(t, err) require.Equal(t, io.EOF, err) + _, err = ba.TrySendMessage(chID, []byte("closed try")) + require.Error(t, err) + require.Equal(t, io.EOF, err) _, err = ba.SendMessage(chID, []byte("closed")) require.Error(t, err) require.Equal(t, io.EOF, err)