|
|
- package conn
-
- import (
- "fmt"
- "io"
- "testing"
-
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- crypto "github.com/tendermint/go-crypto"
- cmn "github.com/tendermint/tmlibs/common"
- )
-
- type kvstoreConn struct {
- *io.PipeReader
- *io.PipeWriter
- }
-
- func (drw kvstoreConn) Close() (err error) {
- err2 := drw.PipeWriter.CloseWithError(io.EOF)
- err1 := drw.PipeReader.Close()
- if err2 != nil {
- return err
- }
- return err1
- }
-
- // Each returned ReadWriteCloser is akin to a net.Connection
- func makeKVStoreConnPair() (fooConn, barConn kvstoreConn) {
- barReader, fooWriter := io.Pipe()
- fooReader, barWriter := io.Pipe()
- return kvstoreConn{fooReader, fooWriter}, kvstoreConn{barReader, barWriter}
- }
-
- func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection) {
- fooConn, barConn := makeKVStoreConnPair()
- fooPrvKey := crypto.GenPrivKeyEd25519().Wrap()
- fooPubKey := fooPrvKey.PubKey()
- barPrvKey := crypto.GenPrivKeyEd25519().Wrap()
- barPubKey := barPrvKey.PubKey()
-
- var trs, ok = cmn.Parallel(
- func(_ int) (val interface{}, err error, abort bool) {
- fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey)
- if err != nil {
- tb.Errorf("Failed to establish SecretConnection for foo: %v", err)
- return nil, err, true
- }
- remotePubBytes := fooSecConn.RemotePubKey()
- if !remotePubBytes.Equals(barPubKey) {
- err = fmt.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v",
- barPubKey, fooSecConn.RemotePubKey())
- tb.Error(err)
- return nil, err, false
- }
- return nil, nil, false
- },
- func(_ int) (val interface{}, err error, abort bool) {
- barSecConn, err = MakeSecretConnection(barConn, barPrvKey)
- if barSecConn == nil {
- tb.Errorf("Failed to establish SecretConnection for bar: %v", err)
- return nil, err, true
- }
- remotePubBytes := barSecConn.RemotePubKey()
- if !remotePubBytes.Equals(fooPubKey) {
- err = fmt.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v",
- fooPubKey, barSecConn.RemotePubKey())
- tb.Error(err)
- return nil, nil, false
- }
- return nil, nil, false
- },
- )
-
- require.Nil(tb, trs.FirstError())
- require.True(tb, ok, "Unexpected task abortion")
-
- return
- }
-
- func TestSecretConnectionHandshake(t *testing.T) {
- fooSecConn, barSecConn := makeSecretConnPair(t)
- if err := fooSecConn.Close(); err != nil {
- t.Error(err)
- }
- if err := barSecConn.Close(); err != nil {
- t.Error(err)
- }
- }
-
- func TestSecretConnectionReadWrite(t *testing.T) {
- fooConn, barConn := makeKVStoreConnPair()
- fooWrites, barWrites := []string{}, []string{}
- fooReads, barReads := []string{}, []string{}
-
- // Pre-generate the things to write (for foo & bar)
- for i := 0; i < 100; i++ {
- fooWrites = append(fooWrites, cmn.RandStr((cmn.RandInt()%(dataMaxSize*5))+1))
- barWrites = append(barWrites, cmn.RandStr((cmn.RandInt()%(dataMaxSize*5))+1))
- }
-
- // A helper that will run with (fooConn, fooWrites, fooReads) and vice versa
- genNodeRunner := func(nodeConn kvstoreConn, nodeWrites []string, nodeReads *[]string) cmn.Task {
- return func(_ int) (interface{}, error, bool) {
- // Node handskae
- nodePrvKey := crypto.GenPrivKeyEd25519().Wrap()
- nodeSecretConn, err := MakeSecretConnection(nodeConn, nodePrvKey)
- if err != nil {
- t.Errorf("Failed to establish SecretConnection for node: %v", err)
- return nil, err, true
- }
- // In parallel, handle some reads and writes.
- var trs, ok = cmn.Parallel(
- func(_ int) (interface{}, error, bool) {
- // Node writes
- for _, nodeWrite := range nodeWrites {
- n, err := nodeSecretConn.Write([]byte(nodeWrite))
- if err != nil {
- t.Errorf("Failed to write to nodeSecretConn: %v", err)
- return nil, err, true
- }
- if n != len(nodeWrite) {
- err = fmt.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n)
- t.Error(err)
- return nil, err, true
- }
- }
- if err := nodeConn.PipeWriter.Close(); err != nil {
- t.Error(err)
- return nil, err, true
- }
- return nil, nil, false
- },
- func(_ int) (interface{}, error, bool) {
- // Node reads
- readBuffer := make([]byte, dataMaxSize)
- for {
- n, err := nodeSecretConn.Read(readBuffer)
- if err == io.EOF {
- return nil, nil, false
- } else if err != nil {
- t.Errorf("Failed to read from nodeSecretConn: %v", err)
- return nil, err, true
- }
- *nodeReads = append(*nodeReads, string(readBuffer[:n]))
- }
- if err := nodeConn.PipeReader.Close(); err != nil {
- t.Error(err)
- return nil, err, true
- }
- return nil, nil, false
- },
- )
- assert.True(t, ok, "Unexpected task abortion")
-
- // If error:
- if trs.FirstError() != nil {
- return nil, trs.FirstError(), true
- }
-
- // Otherwise:
- return nil, nil, false
- }
- }
-
- // Run foo & bar in parallel
- var trs, ok = cmn.Parallel(
- genNodeRunner(fooConn, fooWrites, &fooReads),
- genNodeRunner(barConn, barWrites, &barReads),
- )
- require.Nil(t, trs.FirstError())
- require.True(t, ok, "unexpected task abortion")
-
- // A helper to ensure that the writes and reads match.
- // Additionally, small writes (<= dataMaxSize) must be atomically read.
- compareWritesReads := func(writes []string, reads []string) {
- for {
- // Pop next write & corresponding reads
- var read, write string = "", writes[0]
- var readCount = 0
- for _, readChunk := range reads {
- read += readChunk
- readCount++
- if len(write) <= len(read) {
- break
- }
- if len(write) <= dataMaxSize {
- break // atomicity of small writes
- }
- }
- // Compare
- if write != read {
- t.Errorf("Expected to read %X, got %X", write, read)
- }
- // Iterate
- writes = writes[1:]
- reads = reads[readCount:]
- if len(writes) == 0 {
- break
- }
- }
- }
-
- compareWritesReads(fooWrites, barReads)
- compareWritesReads(barWrites, fooReads)
-
- }
-
- func BenchmarkSecretConnection(b *testing.B) {
- b.StopTimer()
- fooSecConn, barSecConn := makeSecretConnPair(b)
- fooWriteText := cmn.RandStr(dataMaxSize)
- // Consume reads from bar's reader
- go func() {
- readBuffer := make([]byte, dataMaxSize)
- for {
- _, err := barSecConn.Read(readBuffer)
- if err == io.EOF {
- return
- } else if err != nil {
- b.Fatalf("Failed to read from barSecConn: %v", err)
- }
- }
- }()
-
- b.StartTimer()
- for i := 0; i < b.N; i++ {
- _, err := fooSecConn.Write([]byte(fooWriteText))
- if err != nil {
- b.Fatalf("Failed to write to fooSecConn: %v", err)
- }
- }
- b.StopTimer()
-
- if err := fooSecConn.Close(); err != nil {
- b.Error(err)
- }
- //barSecConn.Close() race condition
- }
|