You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

245 lines
6.3 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "sync"
  5. "testing"
  6. "time"
  7. . "github.com/tendermint/go-common"
  8. cfg "github.com/tendermint/go-config"
  9. "github.com/tendermint/go-crypto"
  10. "github.com/tendermint/go-wire"
  11. )
  12. var (
  13. config cfg.Config
  14. )
  15. func init() {
  16. config = cfg.NewMapConfig(nil)
  17. setConfigDefaults(config)
  18. }
  19. type PeerMessage struct {
  20. PeerKey string
  21. Bytes []byte
  22. Counter int
  23. }
  24. type TestReactor struct {
  25. BaseReactor
  26. mtx sync.Mutex
  27. channels []*ChannelDescriptor
  28. peersAdded []*Peer
  29. peersRemoved []*Peer
  30. logMessages bool
  31. msgsCounter int
  32. msgsReceived map[byte][]PeerMessage
  33. }
  34. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  35. tr := &TestReactor{
  36. channels: channels,
  37. logMessages: logMessages,
  38. msgsReceived: make(map[byte][]PeerMessage),
  39. }
  40. tr.BaseReactor = *NewBaseReactor(log, "TestReactor", tr)
  41. return tr
  42. }
  43. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  44. return tr.channels
  45. }
  46. func (tr *TestReactor) AddPeer(peer *Peer) {
  47. tr.mtx.Lock()
  48. defer tr.mtx.Unlock()
  49. tr.peersAdded = append(tr.peersAdded, peer)
  50. }
  51. func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
  52. tr.mtx.Lock()
  53. defer tr.mtx.Unlock()
  54. tr.peersRemoved = append(tr.peersRemoved, peer)
  55. }
  56. func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
  57. if tr.logMessages {
  58. tr.mtx.Lock()
  59. defer tr.mtx.Unlock()
  60. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  61. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
  62. tr.msgsCounter++
  63. }
  64. }
  65. //-----------------------------------------------------------------------------
  66. // convenience method for creating two switches connected to each other.
  67. func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *Switch) {
  68. s1PrivKey := crypto.GenPrivKeyEd25519()
  69. s2PrivKey := crypto.GenPrivKeyEd25519()
  70. // Create two switches that will be interconnected.
  71. s1 := initSwitch(NewSwitch(config))
  72. s1.SetNodeInfo(&NodeInfo{
  73. PubKey: s1PrivKey.PubKey().(crypto.PubKeyEd25519),
  74. Moniker: "switch1",
  75. Network: "testing",
  76. Version: "123.123.123",
  77. })
  78. s1.SetNodePrivKey(s1PrivKey)
  79. s2 := initSwitch(NewSwitch(config))
  80. s2.SetNodeInfo(&NodeInfo{
  81. PubKey: s2PrivKey.PubKey().(crypto.PubKeyEd25519),
  82. Moniker: "switch2",
  83. Network: "testing",
  84. Version: "123.123.123",
  85. })
  86. s2.SetNodePrivKey(s2PrivKey)
  87. // Start switches and reactors
  88. s1.Start()
  89. s2.Start()
  90. // Create a listener for s1
  91. l := NewDefaultListener("tcp", ":8001", true)
  92. // Dial the listener & add the connection to s2.
  93. lAddr := l.ExternalAddress()
  94. connOut, err := lAddr.Dial()
  95. if err != nil {
  96. t.Fatalf("Could not connect to listener address %v", lAddr)
  97. } else {
  98. t.Logf("Created a connection to listener address %v", lAddr)
  99. }
  100. connIn, ok := <-l.Connections()
  101. if !ok {
  102. t.Fatalf("Could not get inbound connection from listener")
  103. }
  104. go s1.AddPeerWithConnection(connIn, false) // AddPeer is blocking, requires handshake.
  105. s2.AddPeerWithConnection(connOut, true)
  106. // Wait for things to happen, peers to get added...
  107. time.Sleep(100 * time.Millisecond)
  108. // Close the server, no longer needed.
  109. l.Stop()
  110. return s1, s2
  111. }
  112. func TestSwitches(t *testing.T) {
  113. s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch {
  114. // Make two reactors of two channels each
  115. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  116. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  117. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  118. }, true))
  119. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  120. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  121. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  122. }, true))
  123. return sw
  124. })
  125. defer s1.Stop()
  126. defer s2.Stop()
  127. // Lets send a message from s1 to s2.
  128. if s1.Peers().Size() != 1 {
  129. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  130. }
  131. if s2.Peers().Size() != 1 {
  132. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  133. }
  134. ch0Msg := "channel zero"
  135. ch1Msg := "channel foo"
  136. ch2Msg := "channel bar"
  137. s1.Broadcast(byte(0x00), ch0Msg)
  138. s1.Broadcast(byte(0x01), ch1Msg)
  139. s1.Broadcast(byte(0x02), ch2Msg)
  140. // Wait for things to settle...
  141. time.Sleep(5000 * time.Millisecond)
  142. // Check message on ch0
  143. ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)]
  144. if len(ch0Msgs) != 1 {
  145. t.Errorf("Expected to have received 1 message in ch0")
  146. }
  147. if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
  148. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
  149. }
  150. // Check message on ch1
  151. ch1Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x01)]
  152. if len(ch1Msgs) != 1 {
  153. t.Errorf("Expected to have received 1 message in ch1")
  154. }
  155. if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
  156. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
  157. }
  158. // Check message on ch2
  159. ch2Msgs := s2.Reactor("bar").(*TestReactor).msgsReceived[byte(0x02)]
  160. if len(ch2Msgs) != 1 {
  161. t.Errorf("Expected to have received 1 message in ch2")
  162. }
  163. if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
  164. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
  165. }
  166. }
  167. func BenchmarkSwitches(b *testing.B) {
  168. b.StopTimer()
  169. s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch {
  170. // Make bar reactors of bar channels each
  171. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  172. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  173. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  174. }, false))
  175. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  176. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  177. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  178. }, false))
  179. return sw
  180. })
  181. defer s1.Stop()
  182. defer s2.Stop()
  183. // Allow time for goroutines to boot up
  184. time.Sleep(1000 * time.Millisecond)
  185. b.StartTimer()
  186. numSuccess, numFailure := 0, 0
  187. // Send random message from foo channel to another
  188. for i := 0; i < b.N; i++ {
  189. chID := byte(i % 4)
  190. successChan := s1.Broadcast(chID, "test data")
  191. for s := range successChan {
  192. if s {
  193. numSuccess += 1
  194. } else {
  195. numFailure += 1
  196. }
  197. }
  198. }
  199. log.Warn(Fmt("success: %v, failure: %v", numSuccess, numFailure))
  200. // Allow everything to flush before stopping switches & closing connections.
  201. b.StopTimer()
  202. time.Sleep(1000 * time.Millisecond)
  203. }