You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

235 lines
6.2 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "sync"
  5. "testing"
  6. "time"
  7. acm "github.com/tendermint/tendermint/account"
  8. "github.com/tendermint/tendermint/wire"
  9. . "github.com/tendermint/tendermint/common"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. type PeerMessage struct {
  13. PeerKey string
  14. Bytes []byte
  15. Counter int
  16. }
  17. type TestReactor struct {
  18. BaseReactor
  19. mtx sync.Mutex
  20. channels []*ChannelDescriptor
  21. peersAdded []*Peer
  22. peersRemoved []*Peer
  23. logMessages bool
  24. msgsCounter int
  25. msgsReceived map[byte][]PeerMessage
  26. }
  27. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  28. tr := &TestReactor{
  29. channels: channels,
  30. logMessages: logMessages,
  31. msgsReceived: make(map[byte][]PeerMessage),
  32. }
  33. tr.BaseReactor = *NewBaseReactor(log, "TestReactor", tr)
  34. return tr
  35. }
  36. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  37. return tr.channels
  38. }
  39. func (tr *TestReactor) AddPeer(peer *Peer) {
  40. tr.mtx.Lock()
  41. defer tr.mtx.Unlock()
  42. tr.peersAdded = append(tr.peersAdded, peer)
  43. }
  44. func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
  45. tr.mtx.Lock()
  46. defer tr.mtx.Unlock()
  47. tr.peersRemoved = append(tr.peersRemoved, peer)
  48. }
  49. func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {
  50. if tr.logMessages {
  51. tr.mtx.Lock()
  52. defer tr.mtx.Unlock()
  53. //fmt.Printf("Received: %X, %X\n", chId, msgBytes)
  54. tr.msgsReceived[chId] = append(tr.msgsReceived[chId], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
  55. tr.msgsCounter++
  56. }
  57. }
  58. //-----------------------------------------------------------------------------
  59. // convenience method for creating two switches connected to each other.
  60. func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *Switch) {
  61. s1PrivKey := acm.GenPrivKeyEd25519()
  62. s2PrivKey := acm.GenPrivKeyEd25519()
  63. // Create two switches that will be interconnected.
  64. s1 := initSwitch(NewSwitch())
  65. s1.SetNodeInfo(&types.NodeInfo{
  66. PubKey: s1PrivKey.PubKey().(acm.PubKeyEd25519),
  67. Moniker: "switch1",
  68. ChainID: "testing",
  69. Version: "123.123.123",
  70. })
  71. s1.SetNodePrivKey(s1PrivKey)
  72. s2 := initSwitch(NewSwitch())
  73. s2.SetNodeInfo(&types.NodeInfo{
  74. PubKey: s2PrivKey.PubKey().(acm.PubKeyEd25519),
  75. Moniker: "switch2",
  76. ChainID: "testing",
  77. Version: "123.123.123",
  78. })
  79. s2.SetNodePrivKey(s2PrivKey)
  80. // Start switches and reactors
  81. s1.Start()
  82. s2.Start()
  83. // Create a listener for s1
  84. l := NewDefaultListener("tcp", ":8001", true)
  85. // Dial the listener & add the connection to s2.
  86. lAddr := l.ExternalAddress()
  87. connOut, err := lAddr.Dial()
  88. if err != nil {
  89. t.Fatalf("Could not connect to listener address %v", lAddr)
  90. } else {
  91. t.Logf("Created a connection to listener address %v", lAddr)
  92. }
  93. connIn, ok := <-l.Connections()
  94. if !ok {
  95. t.Fatalf("Could not get inbound connection from listener")
  96. }
  97. go s1.AddPeerWithConnection(connIn, false) // AddPeer is blocking, requires handshake.
  98. s2.AddPeerWithConnection(connOut, true)
  99. // Wait for things to happen, peers to get added...
  100. time.Sleep(100 * time.Millisecond)
  101. // Close the server, no longer needed.
  102. l.Stop()
  103. return s1, s2
  104. }
  105. func TestSwitches(t *testing.T) {
  106. s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch {
  107. // Make two reactors of two channels each
  108. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  109. &ChannelDescriptor{Id: byte(0x00), Priority: 10},
  110. &ChannelDescriptor{Id: byte(0x01), Priority: 10},
  111. }, true))
  112. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  113. &ChannelDescriptor{Id: byte(0x02), Priority: 10},
  114. &ChannelDescriptor{Id: byte(0x03), Priority: 10},
  115. }, true))
  116. return sw
  117. })
  118. defer s1.Stop()
  119. defer s2.Stop()
  120. // Lets send a message from s1 to s2.
  121. if s1.Peers().Size() != 1 {
  122. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  123. }
  124. if s2.Peers().Size() != 1 {
  125. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  126. }
  127. ch0Msg := "channel zero"
  128. ch1Msg := "channel foo"
  129. ch2Msg := "channel bar"
  130. s1.Broadcast(byte(0x00), ch0Msg)
  131. s1.Broadcast(byte(0x01), ch1Msg)
  132. s1.Broadcast(byte(0x02), ch2Msg)
  133. // Wait for things to settle...
  134. time.Sleep(5000 * time.Millisecond)
  135. // Check message on ch0
  136. ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)]
  137. if len(ch0Msgs) != 1 {
  138. t.Errorf("Expected to have received 1 message in ch0")
  139. }
  140. if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
  141. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
  142. }
  143. // Check message on ch1
  144. ch1Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x01)]
  145. if len(ch1Msgs) != 1 {
  146. t.Errorf("Expected to have received 1 message in ch1")
  147. }
  148. if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
  149. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
  150. }
  151. // Check message on ch2
  152. ch2Msgs := s2.Reactor("bar").(*TestReactor).msgsReceived[byte(0x02)]
  153. if len(ch2Msgs) != 1 {
  154. t.Errorf("Expected to have received 1 message in ch2")
  155. }
  156. if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
  157. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
  158. }
  159. }
  160. func BenchmarkSwitches(b *testing.B) {
  161. b.StopTimer()
  162. s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch {
  163. // Make bar reactors of bar channels each
  164. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  165. &ChannelDescriptor{Id: byte(0x00), Priority: 10},
  166. &ChannelDescriptor{Id: byte(0x01), Priority: 10},
  167. }, false))
  168. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  169. &ChannelDescriptor{Id: byte(0x02), Priority: 10},
  170. &ChannelDescriptor{Id: byte(0x03), Priority: 10},
  171. }, false))
  172. return sw
  173. })
  174. defer s1.Stop()
  175. defer s2.Stop()
  176. // Allow time for goroutines to boot up
  177. time.Sleep(1000 * time.Millisecond)
  178. b.StartTimer()
  179. numSuccess, numFailure := 0, 0
  180. // Send random message from foo channel to another
  181. for i := 0; i < b.N; i++ {
  182. chId := byte(i % 4)
  183. successChan := s1.Broadcast(chId, "test data")
  184. for s := range successChan {
  185. if s {
  186. numSuccess += 1
  187. } else {
  188. numFailure += 1
  189. }
  190. }
  191. }
  192. log.Warn(Fmt("success: %v, failure: %v", numSuccess, numFailure))
  193. // Allow everything to flush before stopping switches & closing connections.
  194. b.StopTimer()
  195. time.Sleep(1000 * time.Millisecond)
  196. }