You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

214 lines
5.6 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "io"
  5. "sync"
  6. "testing"
  7. "time"
  8. . "github.com/tendermint/tendermint/binary"
  9. )
  10. type String string
  11. func (s String) WriteTo(w io.Writer) (n int64, err error) {
  12. WriteString(w, string(s), &n, &err)
  13. return
  14. }
  15. //-----------------------------------------------------------------------------
  16. type PeerMessage struct {
  17. PeerKey string
  18. Bytes []byte
  19. Counter int
  20. }
  21. type TestReactor struct {
  22. mtx sync.Mutex
  23. channels []*ChannelDescriptor
  24. peersAdded []*Peer
  25. peersRemoved []*Peer
  26. logMessages bool
  27. msgsCounter int
  28. msgsReceived map[byte][]PeerMessage
  29. }
  30. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  31. return &TestReactor{
  32. channels: channels,
  33. logMessages: logMessages,
  34. msgsReceived: make(map[byte][]PeerMessage),
  35. }
  36. }
  37. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  38. return tr.channels
  39. }
  40. func (tr *TestReactor) AddPeer(peer *Peer) {
  41. tr.mtx.Lock()
  42. defer tr.mtx.Unlock()
  43. tr.peersAdded = append(tr.peersAdded, peer)
  44. }
  45. func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
  46. tr.mtx.Lock()
  47. defer tr.mtx.Unlock()
  48. tr.peersRemoved = append(tr.peersRemoved, peer)
  49. }
  50. func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {
  51. if tr.logMessages {
  52. tr.mtx.Lock()
  53. defer tr.mtx.Unlock()
  54. //fmt.Printf("Received: %X, %X\n", chId, msgBytes)
  55. tr.msgsReceived[chId] = append(tr.msgsReceived[chId], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
  56. tr.msgsCounter++
  57. }
  58. }
  59. //-----------------------------------------------------------------------------
  60. // convenience method for creating two switches connected to each other.
  61. func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, *Switch) {
  62. // Create two switches that will be interconnected.
  63. s1 := NewSwitch(reactorsGenerator())
  64. s2 := NewSwitch(reactorsGenerator())
  65. // Create a listener for s1
  66. l := NewDefaultListener("tcp", ":8001")
  67. // Dial the listener & add the connection to s2.
  68. lAddr := l.ExternalAddress()
  69. connOut, err := lAddr.Dial()
  70. if err != nil {
  71. t.Fatalf("Could not connect to listener address %v", lAddr)
  72. } else {
  73. t.Logf("Created a connection to listener address %v", lAddr)
  74. }
  75. connIn, ok := <-l.Connections()
  76. if !ok {
  77. t.Fatalf("Could not get inbound connection from listener")
  78. }
  79. s1.AddPeerWithConnection(connIn, false)
  80. s2.AddPeerWithConnection(connOut, true)
  81. // Wait for things to happen, peers to get added...
  82. time.Sleep(100 * time.Millisecond)
  83. // Close the server, no longer needed.
  84. l.Stop()
  85. return s1, s2
  86. }
  87. func TestSwitches(t *testing.T) {
  88. s1, s2 := makeSwitchPair(t, func() []Reactor {
  89. // Make two reactors of two channels each
  90. reactors := make([]Reactor, 2)
  91. reactors[0] = NewTestReactor([]*ChannelDescriptor{
  92. &ChannelDescriptor{Id: byte(0x00), Priority: 10},
  93. &ChannelDescriptor{Id: byte(0x01), Priority: 10},
  94. }, true)
  95. reactors[1] = NewTestReactor([]*ChannelDescriptor{
  96. &ChannelDescriptor{Id: byte(0x02), Priority: 10},
  97. &ChannelDescriptor{Id: byte(0x03), Priority: 10},
  98. }, true)
  99. return reactors
  100. })
  101. defer s1.Stop()
  102. defer s2.Stop()
  103. // Lets send a message from s1 to s2.
  104. if s1.Peers().Size() != 1 {
  105. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  106. }
  107. if s2.Peers().Size() != 1 {
  108. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  109. }
  110. ch0Msg := String("channel zero")
  111. ch1Msg := String("channel one")
  112. ch2Msg := String("channel two")
  113. s1.Broadcast(byte(0x00), ch0Msg)
  114. s1.Broadcast(byte(0x01), ch1Msg)
  115. s1.Broadcast(byte(0x02), ch2Msg)
  116. // Wait for things to settle...
  117. time.Sleep(100 * time.Millisecond)
  118. // Check message on ch0
  119. ch0Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x00)]
  120. if len(ch0Msgs) != 1 {
  121. t.Errorf("Expected to have received 1 message in ch0")
  122. }
  123. if !bytes.Equal(ch0Msgs[0].Bytes, BinaryBytes(ch0Msg)) {
  124. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
  125. }
  126. // Check message on ch1
  127. ch1Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x01)]
  128. if len(ch1Msgs) != 1 {
  129. t.Errorf("Expected to have received 1 message in ch1")
  130. }
  131. if !bytes.Equal(ch1Msgs[0].Bytes, BinaryBytes(ch1Msg)) {
  132. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
  133. }
  134. // Check message on ch2
  135. ch2Msgs := s2.Reactors()[1].(*TestReactor).msgsReceived[byte(0x02)]
  136. if len(ch2Msgs) != 1 {
  137. t.Errorf("Expected to have received 1 message in ch2")
  138. }
  139. if !bytes.Equal(ch2Msgs[0].Bytes, BinaryBytes(ch2Msg)) {
  140. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
  141. }
  142. }
  143. func BenchmarkSwitches(b *testing.B) {
  144. b.StopTimer()
  145. s1, s2 := makeSwitchPair(b, func() []Reactor {
  146. // Make two reactors of two channels each
  147. reactors := make([]Reactor, 2)
  148. reactors[0] = NewTestReactor([]*ChannelDescriptor{
  149. &ChannelDescriptor{Id: byte(0x00), Priority: 10},
  150. &ChannelDescriptor{Id: byte(0x01), Priority: 10},
  151. }, false)
  152. reactors[1] = NewTestReactor([]*ChannelDescriptor{
  153. &ChannelDescriptor{Id: byte(0x02), Priority: 10},
  154. &ChannelDescriptor{Id: byte(0x03), Priority: 10},
  155. }, false)
  156. return reactors
  157. })
  158. defer s1.Stop()
  159. defer s2.Stop()
  160. // Allow time for goroutines to boot up
  161. time.Sleep(1000 * time.Millisecond)
  162. b.StartTimer()
  163. numSuccess, numFailure := 0, 0
  164. // Send random message from one channel to another
  165. for i := 0; i < b.N; i++ {
  166. chId := byte(i % 4)
  167. nS, nF := s1.Broadcast(chId, String("test data"))
  168. numSuccess += nS
  169. numFailure += nF
  170. }
  171. log.Warning("success: %v, failure: %v", numSuccess, numFailure)
  172. // Allow everything to flush before stopping switches & closing connections.
  173. b.StopTimer()
  174. time.Sleep(1000 * time.Millisecond)
  175. }