You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
7.1 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. . "github.com/tendermint/go-common"
  10. cfg "github.com/tendermint/go-config"
  11. "github.com/tendermint/go-crypto"
  12. "github.com/tendermint/go-wire"
  13. )
  14. var (
  15. config cfg.Config
  16. )
  17. func init() {
  18. config = cfg.NewMapConfig(nil)
  19. setConfigDefaults(config)
  20. }
  21. type PeerMessage struct {
  22. PeerKey string
  23. Bytes []byte
  24. Counter int
  25. }
  26. type TestReactor struct {
  27. BaseReactor
  28. mtx sync.Mutex
  29. channels []*ChannelDescriptor
  30. peersAdded []*Peer
  31. peersRemoved []*Peer
  32. logMessages bool
  33. msgsCounter int
  34. msgsReceived map[byte][]PeerMessage
  35. }
  36. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  37. tr := &TestReactor{
  38. channels: channels,
  39. logMessages: logMessages,
  40. msgsReceived: make(map[byte][]PeerMessage),
  41. }
  42. tr.BaseReactor = *NewBaseReactor(log, "TestReactor", tr)
  43. return tr
  44. }
  45. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  46. return tr.channels
  47. }
  48. func (tr *TestReactor) AddPeer(peer *Peer) {
  49. tr.mtx.Lock()
  50. defer tr.mtx.Unlock()
  51. tr.peersAdded = append(tr.peersAdded, peer)
  52. }
  53. func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
  54. tr.mtx.Lock()
  55. defer tr.mtx.Unlock()
  56. tr.peersRemoved = append(tr.peersRemoved, peer)
  57. }
  58. func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
  59. if tr.logMessages {
  60. tr.mtx.Lock()
  61. defer tr.mtx.Unlock()
  62. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  63. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
  64. tr.msgsCounter++
  65. }
  66. }
  67. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  68. tr.mtx.Lock()
  69. defer tr.mtx.Unlock()
  70. return tr.msgsReceived[chID]
  71. }
  72. //-----------------------------------------------------------------------------
  73. // convenience method for creating two switches connected to each other.
  74. // XXX: note this uses net.Pipe and not a proper TCP conn
  75. func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  76. // Create two switches that will be interconnected.
  77. switches := MakeConnectedSwitches(2, initSwitch, Connect2Switches)
  78. return switches[0], switches[1]
  79. }
  80. func initSwitchFunc(i int, sw *Switch) *Switch {
  81. // Make two reactors of two channels each
  82. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  83. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  84. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  85. }, true))
  86. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  87. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  88. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  89. }, true))
  90. return sw
  91. }
  92. func TestSwitches(t *testing.T) {
  93. s1, s2 := makeSwitchPair(t, initSwitchFunc)
  94. defer s1.Stop()
  95. defer s2.Stop()
  96. if s1.Peers().Size() != 1 {
  97. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  98. }
  99. if s2.Peers().Size() != 1 {
  100. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  101. }
  102. // Lets send some messages
  103. ch0Msg := "channel zero"
  104. ch1Msg := "channel foo"
  105. ch2Msg := "channel bar"
  106. s1.Broadcast(byte(0x00), ch0Msg)
  107. s1.Broadcast(byte(0x01), ch1Msg)
  108. s1.Broadcast(byte(0x02), ch2Msg)
  109. // Wait for things to settle...
  110. time.Sleep(5000 * time.Millisecond)
  111. // Check message on ch0
  112. ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00))
  113. if len(ch0Msgs) != 1 {
  114. t.Errorf("Expected to have received 1 message in ch0")
  115. }
  116. if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
  117. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
  118. }
  119. // Check message on ch1
  120. ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01))
  121. if len(ch1Msgs) != 1 {
  122. t.Errorf("Expected to have received 1 message in ch1")
  123. }
  124. if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
  125. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
  126. }
  127. // Check message on ch2
  128. ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02))
  129. if len(ch2Msgs) != 1 {
  130. t.Errorf("Expected to have received 1 message in ch2")
  131. }
  132. if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
  133. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
  134. }
  135. }
  136. func TestConnAddrFilter(t *testing.T) {
  137. s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  138. s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  139. c1, c2 := net.Pipe()
  140. s1.SetAddrFilter(func(addr net.Addr) error {
  141. if addr.String() == c1.RemoteAddr().String() {
  142. return fmt.Errorf("Error: pipe is blacklisted")
  143. }
  144. return nil
  145. })
  146. // connect to good peer
  147. go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
  148. go s2.AddPeerWithConnection(c2, true)
  149. // Wait for things to happen, peers to get added...
  150. time.Sleep(100 * time.Millisecond * time.Duration(4))
  151. defer s1.Stop()
  152. defer s2.Stop()
  153. if s1.Peers().Size() != 0 {
  154. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  155. }
  156. if s2.Peers().Size() != 0 {
  157. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  158. }
  159. }
  160. func TestConnPubKeyFilter(t *testing.T) {
  161. s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  162. s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  163. c1, c2 := net.Pipe()
  164. // set pubkey filter
  165. s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  166. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  167. return fmt.Errorf("Error: pipe is blacklisted")
  168. }
  169. return nil
  170. })
  171. // connect to good peer
  172. go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
  173. go s2.AddPeerWithConnection(c2, true)
  174. // Wait for things to happen, peers to get added...
  175. time.Sleep(100 * time.Millisecond * time.Duration(4))
  176. defer s1.Stop()
  177. defer s2.Stop()
  178. if s1.Peers().Size() != 0 {
  179. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  180. }
  181. if s2.Peers().Size() != 0 {
  182. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  183. }
  184. }
  185. func BenchmarkSwitches(b *testing.B) {
  186. b.StopTimer()
  187. s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
  188. // Make bar reactors of bar channels each
  189. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  190. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  191. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  192. }, false))
  193. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  194. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  195. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  196. }, false))
  197. return sw
  198. })
  199. defer s1.Stop()
  200. defer s2.Stop()
  201. // Allow time for goroutines to boot up
  202. time.Sleep(1000 * time.Millisecond)
  203. b.StartTimer()
  204. numSuccess, numFailure := 0, 0
  205. // Send random message from foo channel to another
  206. for i := 0; i < b.N; i++ {
  207. chID := byte(i % 4)
  208. successChan := s1.Broadcast(chID, "test data")
  209. for s := range successChan {
  210. if s {
  211. numSuccess += 1
  212. } else {
  213. numFailure += 1
  214. }
  215. }
  216. }
  217. log.Warn(Fmt("success: %v, failure: %v", numSuccess, numFailure))
  218. // Allow everything to flush before stopping switches & closing connections.
  219. b.StopTimer()
  220. time.Sleep(1000 * time.Millisecond)
  221. }