You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

239 lines
6.4 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package conn
  2. import (
  3. "fmt"
  4. "io"
  5. "testing"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/stretchr/testify/require"
  8. crypto "github.com/tendermint/go-crypto"
  9. cmn "github.com/tendermint/tmlibs/common"
  10. )
  11. type kvstoreConn struct {
  12. *io.PipeReader
  13. *io.PipeWriter
  14. }
  15. func (drw kvstoreConn) Close() (err error) {
  16. err2 := drw.PipeWriter.CloseWithError(io.EOF)
  17. err1 := drw.PipeReader.Close()
  18. if err2 != nil {
  19. return err
  20. }
  21. return err1
  22. }
  23. // Each returned ReadWriteCloser is akin to a net.Connection
  24. func makeKVStoreConnPair() (fooConn, barConn kvstoreConn) {
  25. barReader, fooWriter := io.Pipe()
  26. fooReader, barWriter := io.Pipe()
  27. return kvstoreConn{fooReader, fooWriter}, kvstoreConn{barReader, barWriter}
  28. }
  29. func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection) {
  30. fooConn, barConn := makeKVStoreConnPair()
  31. fooPrvKey := crypto.GenPrivKeyEd25519().Wrap()
  32. fooPubKey := fooPrvKey.PubKey()
  33. barPrvKey := crypto.GenPrivKeyEd25519().Wrap()
  34. barPubKey := barPrvKey.PubKey()
  35. var trs, ok = cmn.Parallel(
  36. func(_ int) (val interface{}, err error, abort bool) {
  37. fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey)
  38. if err != nil {
  39. tb.Errorf("Failed to establish SecretConnection for foo: %v", err)
  40. return nil, err, true
  41. }
  42. remotePubBytes := fooSecConn.RemotePubKey()
  43. if !remotePubBytes.Equals(barPubKey) {
  44. err = fmt.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v",
  45. barPubKey, fooSecConn.RemotePubKey())
  46. tb.Error(err)
  47. return nil, err, false
  48. }
  49. return nil, nil, false
  50. },
  51. func(_ int) (val interface{}, err error, abort bool) {
  52. barSecConn, err = MakeSecretConnection(barConn, barPrvKey)
  53. if barSecConn == nil {
  54. tb.Errorf("Failed to establish SecretConnection for bar: %v", err)
  55. return nil, err, true
  56. }
  57. remotePubBytes := barSecConn.RemotePubKey()
  58. if !remotePubBytes.Equals(fooPubKey) {
  59. err = fmt.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v",
  60. fooPubKey, barSecConn.RemotePubKey())
  61. tb.Error(err)
  62. return nil, nil, false
  63. }
  64. return nil, nil, false
  65. },
  66. )
  67. require.Nil(tb, trs.FirstError())
  68. require.True(tb, ok, "Unexpected task abortion")
  69. return
  70. }
  71. func TestSecretConnectionHandshake(t *testing.T) {
  72. fooSecConn, barSecConn := makeSecretConnPair(t)
  73. if err := fooSecConn.Close(); err != nil {
  74. t.Error(err)
  75. }
  76. if err := barSecConn.Close(); err != nil {
  77. t.Error(err)
  78. }
  79. }
  80. func TestSecretConnectionReadWrite(t *testing.T) {
  81. fooConn, barConn := makeKVStoreConnPair()
  82. fooWrites, barWrites := []string{}, []string{}
  83. fooReads, barReads := []string{}, []string{}
  84. // Pre-generate the things to write (for foo & bar)
  85. for i := 0; i < 100; i++ {
  86. fooWrites = append(fooWrites, cmn.RandStr((cmn.RandInt()%(dataMaxSize*5))+1))
  87. barWrites = append(barWrites, cmn.RandStr((cmn.RandInt()%(dataMaxSize*5))+1))
  88. }
  89. // A helper that will run with (fooConn, fooWrites, fooReads) and vice versa
  90. genNodeRunner := func(nodeConn kvstoreConn, nodeWrites []string, nodeReads *[]string) cmn.Task {
  91. return func(_ int) (interface{}, error, bool) {
  92. // Node handskae
  93. nodePrvKey := crypto.GenPrivKeyEd25519().Wrap()
  94. nodeSecretConn, err := MakeSecretConnection(nodeConn, nodePrvKey)
  95. if err != nil {
  96. t.Errorf("Failed to establish SecretConnection for node: %v", err)
  97. return nil, err, true
  98. }
  99. // In parallel, handle some reads and writes.
  100. var trs, ok = cmn.Parallel(
  101. func(_ int) (interface{}, error, bool) {
  102. // Node writes
  103. for _, nodeWrite := range nodeWrites {
  104. n, err := nodeSecretConn.Write([]byte(nodeWrite))
  105. if err != nil {
  106. t.Errorf("Failed to write to nodeSecretConn: %v", err)
  107. return nil, err, true
  108. }
  109. if n != len(nodeWrite) {
  110. err = fmt.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n)
  111. t.Error(err)
  112. return nil, err, true
  113. }
  114. }
  115. if err := nodeConn.PipeWriter.Close(); err != nil {
  116. t.Error(err)
  117. return nil, err, true
  118. }
  119. return nil, nil, false
  120. },
  121. func(_ int) (interface{}, error, bool) {
  122. // Node reads
  123. readBuffer := make([]byte, dataMaxSize)
  124. for {
  125. n, err := nodeSecretConn.Read(readBuffer)
  126. if err == io.EOF {
  127. return nil, nil, false
  128. } else if err != nil {
  129. t.Errorf("Failed to read from nodeSecretConn: %v", err)
  130. return nil, err, true
  131. }
  132. *nodeReads = append(*nodeReads, string(readBuffer[:n]))
  133. }
  134. if err := nodeConn.PipeReader.Close(); err != nil {
  135. t.Error(err)
  136. return nil, err, true
  137. }
  138. return nil, nil, false
  139. },
  140. )
  141. assert.True(t, ok, "Unexpected task abortion")
  142. // If error:
  143. if trs.FirstError() != nil {
  144. return nil, trs.FirstError(), true
  145. }
  146. // Otherwise:
  147. return nil, nil, false
  148. }
  149. }
  150. // Run foo & bar in parallel
  151. var trs, ok = cmn.Parallel(
  152. genNodeRunner(fooConn, fooWrites, &fooReads),
  153. genNodeRunner(barConn, barWrites, &barReads),
  154. )
  155. require.Nil(t, trs.FirstError())
  156. require.True(t, ok, "unexpected task abortion")
  157. // A helper to ensure that the writes and reads match.
  158. // Additionally, small writes (<= dataMaxSize) must be atomically read.
  159. compareWritesReads := func(writes []string, reads []string) {
  160. for {
  161. // Pop next write & corresponding reads
  162. var read, write string = "", writes[0]
  163. var readCount = 0
  164. for _, readChunk := range reads {
  165. read += readChunk
  166. readCount++
  167. if len(write) <= len(read) {
  168. break
  169. }
  170. if len(write) <= dataMaxSize {
  171. break // atomicity of small writes
  172. }
  173. }
  174. // Compare
  175. if write != read {
  176. t.Errorf("Expected to read %X, got %X", write, read)
  177. }
  178. // Iterate
  179. writes = writes[1:]
  180. reads = reads[readCount:]
  181. if len(writes) == 0 {
  182. break
  183. }
  184. }
  185. }
  186. compareWritesReads(fooWrites, barReads)
  187. compareWritesReads(barWrites, fooReads)
  188. }
  189. func BenchmarkSecretConnection(b *testing.B) {
  190. b.StopTimer()
  191. fooSecConn, barSecConn := makeSecretConnPair(b)
  192. fooWriteText := cmn.RandStr(dataMaxSize)
  193. // Consume reads from bar's reader
  194. go func() {
  195. readBuffer := make([]byte, dataMaxSize)
  196. for {
  197. _, err := barSecConn.Read(readBuffer)
  198. if err == io.EOF {
  199. return
  200. } else if err != nil {
  201. b.Fatalf("Failed to read from barSecConn: %v", err)
  202. }
  203. }
  204. }()
  205. b.StartTimer()
  206. for i := 0; i < b.N; i++ {
  207. _, err := fooSecConn.Write([]byte(fooWriteText))
  208. if err != nil {
  209. b.Fatalf("Failed to write to fooSecConn: %v", err)
  210. }
  211. }
  212. b.StopTimer()
  213. if err := fooSecConn.Close(); err != nil {
  214. b.Error(err)
  215. }
  216. //barSecConn.Close() race condition
  217. }