diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 9acaf6175..e4ae4df80 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -7,7 +7,6 @@ import ( "io" "math" "net" - "runtime/debug" "sync/atomic" "time" @@ -230,8 +229,7 @@ func (c *MConnection) flush() { // Catch panics, usually caused by remote disconnects. func (c *MConnection) _recover() { if r := recover(); r != nil { - stack := debug.Stack() - err := cmn.StackError{r, stack} + err := cmn.ErrorWrap(r, "recovered from panic") c.stopForError(err) } } diff --git a/p2p/conn/secret_connection.go b/p2p/conn/secret_connection.go index aa6db05bb..3495853e0 100644 --- a/p2p/conn/secret_connection.go +++ b/p2p/conn/secret_connection.go @@ -20,8 +20,8 @@ import ( "golang.org/x/crypto/nacl/secretbox" "golang.org/x/crypto/ripemd160" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-wire" + crypto "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" ) @@ -200,26 +200,36 @@ func genEphKeys() (ephPub, ephPriv *[32]byte) { } func shareEphPubKey(conn io.ReadWriteCloser, locEphPub *[32]byte) (remEphPub *[32]byte, err error) { - var err1, err2 error - - cmn.Parallel( - func() { - _, err1 = conn.Write(locEphPub[:]) + // Send our pubkey and receive theirs in tandem. + var trs, _ = cmn.Parallel( + func(_ int) (val interface{}, err error, abort bool) { + var _, err1 = conn.Write(locEphPub[:]) + if err1 != nil { + return nil, err1, true // abort + } else { + return nil, nil, false + } }, - func() { - remEphPub = new([32]byte) - _, err2 = io.ReadFull(conn, remEphPub[:]) + func(_ int) (val interface{}, err error, abort bool) { + var _remEphPub [32]byte + var _, err2 = io.ReadFull(conn, _remEphPub[:]) + if err2 != nil { + return nil, err2, true // abort + } else { + return _remEphPub, nil, false + } }, ) - if err1 != nil { - return nil, err1 - } - if err2 != nil { - return nil, err2 + // If error: + if trs.FirstError() != nil { + err = trs.FirstError() + return } - return remEphPub, nil + // Otherwise: + var _remEphPub = trs.FirstValue().([32]byte) + return &_remEphPub, nil } func computeSharedSecret(remPubKey, locPrivKey *[32]byte) (shrSecret *[32]byte) { @@ -268,33 +278,42 @@ type authSigMessage struct { Sig crypto.Signature } -func shareAuthSignature(sc *SecretConnection, pubKey crypto.PubKey, signature crypto.Signature) (*authSigMessage, error) { - var recvMsg authSigMessage - var err1, err2 error - - cmn.Parallel( - func() { +func shareAuthSignature(sc *SecretConnection, pubKey crypto.PubKey, signature crypto.Signature) (recvMsg *authSigMessage, err error) { + // Send our info and receive theirs in tandem. + var trs, _ = cmn.Parallel( + func(_ int) (val interface{}, err error, abort bool) { msgBytes := wire.BinaryBytes(authSigMessage{pubKey.Wrap(), signature.Wrap()}) - _, err1 = sc.Write(msgBytes) + var _, err1 = sc.Write(msgBytes) + if err1 != nil { + return nil, err1, true // abort + } else { + return nil, nil, false + } }, - func() { + func(_ int) (val interface{}, err error, abort bool) { readBuffer := make([]byte, authSigMsgSize) - _, err2 = io.ReadFull(sc, readBuffer) + var _, err2 = io.ReadFull(sc, readBuffer) if err2 != nil { - return + return nil, err2, true // abort } n := int(0) // not used. - recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), authSigMsgSize, &n, &err2).(authSigMessage) - }) + var _recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), authSigMsgSize, &n, &err2).(authSigMessage) + if err2 != nil { + return nil, err2, true // abort + } else { + return _recvMsg, nil, false + } + }, + ) - if err1 != nil { - return nil, err1 - } - if err2 != nil { - return nil, err2 + // If error: + if trs.FirstError() != nil { + err = trs.FirstError() + return } - return &recvMsg, nil + var _recvMsg = trs.FirstValue().(authSigMessage) + return &_recvMsg, nil } //-------------------------------------------------------------------------------- diff --git a/p2p/conn/secret_connection_test.go b/p2p/conn/secret_connection_test.go index c81b3b285..54c453a7c 100644 --- a/p2p/conn/secret_connection_test.go +++ b/p2p/conn/secret_connection_test.go @@ -1,9 +1,12 @@ package conn import ( + "fmt" "io" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" ) @@ -36,33 +39,41 @@ func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection barPrvKey := crypto.GenPrivKeyEd25519().Wrap() barPubKey := barPrvKey.PubKey() - cmn.Parallel( - func() { - var err error + var trs, ok = cmn.Parallel( + func(_ int) (val interface{}, err error, abort bool) { fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey) if err != nil { tb.Errorf("Failed to establish SecretConnection for foo: %v", err) - return + return nil, err, true } remotePubBytes := fooSecConn.RemotePubKey() if !remotePubBytes.Equals(barPubKey) { - tb.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v", + err = fmt.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v", barPubKey, fooSecConn.RemotePubKey()) + tb.Error(err) + return nil, err, false } + return nil, nil, false }, - func() { - var err error + func(_ int) (val interface{}, err error, abort bool) { barSecConn, err = MakeSecretConnection(barConn, barPrvKey) if barSecConn == nil { tb.Errorf("Failed to establish SecretConnection for bar: %v", err) - return + return nil, err, true } remotePubBytes := barSecConn.RemotePubKey() if !remotePubBytes.Equals(fooPubKey) { - tb.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v", + err = fmt.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v", fooPubKey, barSecConn.RemotePubKey()) + tb.Error(err) + return nil, nil, false } - }) + return nil, nil, false + }, + ) + + require.Nil(tb, trs.FirstError()) + require.True(tb, ok, "Unexpected task abortion") return } @@ -89,59 +100,76 @@ func TestSecretConnectionReadWrite(t *testing.T) { } // A helper that will run with (fooConn, fooWrites, fooReads) and vice versa - genNodeRunner := func(nodeConn kvstoreConn, nodeWrites []string, nodeReads *[]string) func() { - return func() { + genNodeRunner := func(nodeConn kvstoreConn, nodeWrites []string, nodeReads *[]string) cmn.Task { + return func(_ int) (interface{}, error, bool) { // Node handskae nodePrvKey := crypto.GenPrivKeyEd25519().Wrap() nodeSecretConn, err := MakeSecretConnection(nodeConn, nodePrvKey) if err != nil { t.Errorf("Failed to establish SecretConnection for node: %v", err) - return + return nil, err, true } - // In parallel, handle reads and writes - cmn.Parallel( - func() { + // In parallel, handle some reads and writes. + var trs, ok = cmn.Parallel( + func(_ int) (interface{}, error, bool) { // Node writes for _, nodeWrite := range nodeWrites { n, err := nodeSecretConn.Write([]byte(nodeWrite)) if err != nil { t.Errorf("Failed to write to nodeSecretConn: %v", err) - return + return nil, err, true } if n != len(nodeWrite) { - t.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n) - return + err = fmt.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n) + t.Error(err) + return nil, err, true } } if err := nodeConn.PipeWriter.Close(); err != nil { t.Error(err) + return nil, err, true } + return nil, nil, false }, - func() { + func(_ int) (interface{}, error, bool) { // Node reads readBuffer := make([]byte, dataMaxSize) for { n, err := nodeSecretConn.Read(readBuffer) if err == io.EOF { - return + return nil, nil, false } else if err != nil { t.Errorf("Failed to read from nodeSecretConn: %v", err) - return + return nil, err, true } *nodeReads = append(*nodeReads, string(readBuffer[:n])) } if err := nodeConn.PipeReader.Close(); err != nil { t.Error(err) + return nil, err, true } - }) + return nil, nil, false + }, + ) + assert.True(t, ok, "Unexpected task abortion") + + // If error: + if trs.FirstError() != nil { + return nil, trs.FirstError(), true + } + + // Otherwise: + return nil, nil, false } } // Run foo & bar in parallel - cmn.Parallel( + var trs, ok = cmn.Parallel( genNodeRunner(fooConn, fooWrites, &fooReads), genNodeRunner(barConn, barWrites, &barReads), ) + require.Nil(t, trs.FirstError()) + require.True(t, ok, "unexpected task abortion") // A helper to ensure that the writes and reads match. // Additionally, small writes (<= dataMaxSize) must be atomically read. diff --git a/p2p/peer.go b/p2p/peer.go index 4af6eeaae..0fa7ca034 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -293,22 +293,20 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration return peerNodeInfo, errors.Wrap(err, "Error setting deadline") } - var err1 error - var err2 error - cmn.Parallel( - func() { + var trs, _ = cmn.Parallel( + func(_ int) (val interface{}, err error, abort bool) { var n int - wire.WriteBinary(&ourNodeInfo, pc.conn, &n, &err1) + wire.WriteBinary(&ourNodeInfo, pc.conn, &n, &err) + return }, - func() { + func(_ int) (val interface{}, err error, abort bool) { var n int - wire.ReadBinary(&peerNodeInfo, pc.conn, MaxNodeInfoSize(), &n, &err2) - }) - if err1 != nil { - return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write") - } - if err2 != nil { - return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read") + wire.ReadBinary(&peerNodeInfo, pc.conn, MaxNodeInfoSize(), &n, &err) + return + }, + ) + if err := trs.FirstError(); err != nil { + return peerNodeInfo, errors.Wrap(err, "Error during handshake") } // Remove deadline