diff --git a/p2p/peer.go b/p2p/peer.go index f0d3a6758..6724ba4d6 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -27,14 +27,16 @@ func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo, var peerNodeInfo = new(types.NodeInfo) var err1 error var err2 error - Parallel(func() { - var n int64 - binary.WriteBinary(ourNodeInfo, conn, &n, &err1) - }, func() { - var n int64 - binary.ReadBinary(peerNodeInfo, conn, &n, &err2) - log.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) - }) + Parallel( + func() { + var n int64 + binary.WriteBinary(ourNodeInfo, conn, &n, &err1) + }, + func() { + var n int64 + binary.ReadBinary(peerNodeInfo, conn, &n, &err2) + log.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) + }) if err1 != nil { return nil, err1 } diff --git a/p2p/secret_connection.go b/p2p/secret_connection.go index 8d1b8e44f..2cb524cf8 100644 --- a/p2p/secret_connection.go +++ b/p2p/secret_connection.go @@ -7,6 +7,8 @@ import ( "crypto/sha256" "encoding/binary" "errors" + "net" + "time" //"fmt" "io" "sync" @@ -17,6 +19,7 @@ import ( acm "github.com/tendermint/tendermint/account" bm "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" ) // 2 + 1024 == 1026 total frame size @@ -25,6 +28,7 @@ const dataMaxSize = 1024 const totalFrameSize = dataMaxSize + dataLenSize const sealedFrameSize = totalFrameSize + secretbox.Overhead +// Implements net.Conn type SecretConnection struct { conn io.ReadWriteCloser recvBuffer []byte @@ -157,8 +161,16 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) { return } -func (sc *SecretConnection) Close() error { - return sc.conn.Close() +// Implements net.Conn +func (sc *SecretConnection) Close() error { return sc.conn.Close() } +func (sc *SecretConnection) LocalAddr() net.Addr { return sc.conn.(net.Conn).LocalAddr() } +func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.conn.(net.Conn).RemoteAddr() } +func (sc *SecretConnection) SetDeadline(t time.Time) error { return sc.conn.(net.Conn).SetDeadline(t) } +func (sc *SecretConnection) SetReadDeadline(t time.Time) error { + return sc.conn.(net.Conn).SetReadDeadline(t) +} +func (sc *SecretConnection) SetWriteDeadline(t time.Time) error { + return sc.conn.(net.Conn).SetWriteDeadline(t) } func genEphKeys() (ephPub, ephPriv *[32]byte) { @@ -246,28 +258,23 @@ type authSigMessage struct { func shareAuthSignature(sc *SecretConnection, pubKey acm.PubKeyEd25519, signature acm.SignatureEd25519) (acm.PubKeyEd25519, acm.SignatureEd25519, error) { var recvMsg authSigMessage var err1, err2 error - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - msgBytes := bm.BinaryBytes(authSigMessage{pubKey, signature}) - _, err1 = sc.Write(msgBytes) - }() + Parallel( + func() { + msgBytes := bm.BinaryBytes(authSigMessage{pubKey, signature}) + _, err1 = sc.Write(msgBytes) + }, + func() { + // NOTE relies on atomicity of small data. + readBuffer := make([]byte, dataMaxSize) + _, err2 = sc.Read(readBuffer) + if err2 != nil { + return + } + n := int64(0) // not used. + recvMsg = bm.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), &n, &err2).(authSigMessage) + }) - go func() { - defer wg.Done() - // NOTE relies on atomicity of small data. - readBuffer := make([]byte, dataMaxSize) - _, err2 = sc.Read(readBuffer) - if err2 != nil { - return - } - n := int64(0) // not used. - recvMsg = bm.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), &n, &err2).(authSigMessage) - }() - - wg.Wait() if err1 != nil { return nil, nil, err1 } diff --git a/p2p/secret_connection_test.go b/p2p/secret_connection_test.go index dbf289c26..d90e54285 100644 --- a/p2p/secret_connection_test.go +++ b/p2p/secret_connection_test.go @@ -37,29 +37,31 @@ func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection barPrvKey := acm.PrivKeyEd25519(CRandBytes(32)) barPubKey := barPrvKey.PubKey().(acm.PubKeyEd25519) - Parallel(func() { - var err error - fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey) - if err != nil { - tb.Errorf("Failed to establish SecretConnection for foo: %v", err) - return - } - if !bytes.Equal(fooSecConn.RemotePubKey(), barPubKey) { - tb.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v", - barPubKey, fooSecConn.RemotePubKey()) - } - }, func() { - var err error - barSecConn, err = MakeSecretConnection(barConn, barPrvKey) - if barSecConn == nil { - tb.Errorf("Failed to establish SecretConnection for bar: %v", err) - return - } - if !bytes.Equal(barSecConn.RemotePubKey(), fooPubKey) { - tb.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v", - fooPubKey, barSecConn.RemotePubKey()) - } - }) + Parallel( + func() { + var err error + fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey) + if err != nil { + tb.Errorf("Failed to establish SecretConnection for foo: %v", err) + return + } + if !bytes.Equal(fooSecConn.RemotePubKey(), barPubKey) { + tb.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v", + barPubKey, fooSecConn.RemotePubKey()) + } + }, + func() { + var err error + barSecConn, err = MakeSecretConnection(barConn, barPrvKey) + if barSecConn == nil { + tb.Errorf("Failed to establish SecretConnection for bar: %v", err) + return + } + if !bytes.Equal(barSecConn.RemotePubKey(), fooPubKey) { + tb.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v", + fooPubKey, barSecConn.RemotePubKey()) + } + }) return } @@ -92,35 +94,37 @@ func TestSecretConnectionReadWrite(t *testing.T) { return } // In parallel, handle reads and writes - Parallel(func() { - // Node writes - for _, nodeWrite := range nodeWrites { - n, err := nodeSecretConn.Write([]byte(nodeWrite)) - if err != nil { - t.Errorf("Failed to write to nodeSecretConn: %v", err) - return - } - if n != len(nodeWrite) { - t.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n) - return + Parallel( + func() { + // Node writes + for _, nodeWrite := range nodeWrites { + n, err := nodeSecretConn.Write([]byte(nodeWrite)) + if err != nil { + t.Errorf("Failed to write to nodeSecretConn: %v", err) + return + } + if n != len(nodeWrite) { + t.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n) + return + } } - } - nodeConn.PipeWriter.Close() - }, func() { - // Node reads - readBuffer := make([]byte, dataMaxSize) - for { - n, err := nodeSecretConn.Read(readBuffer) - if err == io.EOF { - return - } else if err != nil { - t.Errorf("Failed to read from nodeSecretConn: %v", err) - return + nodeConn.PipeWriter.Close() + }, + func() { + // Node reads + readBuffer := make([]byte, dataMaxSize) + for { + n, err := nodeSecretConn.Read(readBuffer) + if err == io.EOF { + return + } else if err != nil { + t.Errorf("Failed to read from nodeSecretConn: %v", err) + return + } + *nodeReads = append(*nodeReads, string(readBuffer[:n])) } - *nodeReads = append(*nodeReads, string(readBuffer[:n])) - } - nodeConn.PipeReader.Close() - }) + nodeConn.PipeReader.Close() + }) } }