You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

330 lines
8.4 KiB

9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. . "github.com/tendermint/tmlibs/common"
  12. cfg "github.com/tendermint/go-config"
  13. crypto "github.com/tendermint/go-crypto"
  14. wire "github.com/tendermint/go-wire"
  15. )
  16. var (
  17. config cfg.Config
  18. )
  19. func init() {
  20. config = cfg.NewMapConfig(nil)
  21. setConfigDefaults(config)
  22. }
  23. type PeerMessage struct {
  24. PeerKey string
  25. Bytes []byte
  26. Counter int
  27. }
  28. type TestReactor struct {
  29. BaseReactor
  30. mtx sync.Mutex
  31. channels []*ChannelDescriptor
  32. peersAdded []*Peer
  33. peersRemoved []*Peer
  34. logMessages bool
  35. msgsCounter int
  36. msgsReceived map[byte][]PeerMessage
  37. }
  38. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  39. tr := &TestReactor{
  40. channels: channels,
  41. logMessages: logMessages,
  42. msgsReceived: make(map[byte][]PeerMessage),
  43. }
  44. tr.BaseReactor = *NewBaseReactor(log, "TestReactor", tr)
  45. return tr
  46. }
  47. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  48. return tr.channels
  49. }
  50. func (tr *TestReactor) AddPeer(peer *Peer) {
  51. tr.mtx.Lock()
  52. defer tr.mtx.Unlock()
  53. tr.peersAdded = append(tr.peersAdded, peer)
  54. }
  55. func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
  56. tr.mtx.Lock()
  57. defer tr.mtx.Unlock()
  58. tr.peersRemoved = append(tr.peersRemoved, peer)
  59. }
  60. func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
  61. if tr.logMessages {
  62. tr.mtx.Lock()
  63. defer tr.mtx.Unlock()
  64. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  65. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
  66. tr.msgsCounter++
  67. }
  68. }
  69. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  70. tr.mtx.Lock()
  71. defer tr.mtx.Unlock()
  72. return tr.msgsReceived[chID]
  73. }
  74. //-----------------------------------------------------------------------------
  75. // convenience method for creating two switches connected to each other.
  76. // XXX: note this uses net.Pipe and not a proper TCP conn
  77. func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  78. // Create two switches that will be interconnected.
  79. switches := MakeConnectedSwitches(2, initSwitch, Connect2Switches)
  80. return switches[0], switches[1]
  81. }
  82. func initSwitchFunc(i int, sw *Switch) *Switch {
  83. // Make two reactors of two channels each
  84. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  85. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  86. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  87. }, true))
  88. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  89. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  90. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  91. }, true))
  92. return sw
  93. }
  94. func TestSwitches(t *testing.T) {
  95. s1, s2 := makeSwitchPair(t, initSwitchFunc)
  96. defer s1.Stop()
  97. defer s2.Stop()
  98. if s1.Peers().Size() != 1 {
  99. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  100. }
  101. if s2.Peers().Size() != 1 {
  102. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  103. }
  104. // Lets send some messages
  105. ch0Msg := "channel zero"
  106. ch1Msg := "channel foo"
  107. ch2Msg := "channel bar"
  108. s1.Broadcast(byte(0x00), ch0Msg)
  109. s1.Broadcast(byte(0x01), ch1Msg)
  110. s1.Broadcast(byte(0x02), ch2Msg)
  111. // Wait for things to settle...
  112. time.Sleep(5000 * time.Millisecond)
  113. // Check message on ch0
  114. ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00))
  115. if len(ch0Msgs) != 1 {
  116. t.Errorf("Expected to have received 1 message in ch0")
  117. }
  118. if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
  119. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
  120. }
  121. // Check message on ch1
  122. ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01))
  123. if len(ch1Msgs) != 1 {
  124. t.Errorf("Expected to have received 1 message in ch1")
  125. }
  126. if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
  127. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
  128. }
  129. // Check message on ch2
  130. ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02))
  131. if len(ch2Msgs) != 1 {
  132. t.Errorf("Expected to have received 1 message in ch2")
  133. }
  134. if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
  135. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
  136. }
  137. }
  138. func TestConnAddrFilter(t *testing.T) {
  139. s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  140. s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  141. c1, c2 := net.Pipe()
  142. s1.SetAddrFilter(func(addr net.Addr) error {
  143. if addr.String() == c1.RemoteAddr().String() {
  144. return fmt.Errorf("Error: pipe is blacklisted")
  145. }
  146. return nil
  147. })
  148. // connect to good peer
  149. go func() {
  150. s1.addPeerWithConnection(c1)
  151. }()
  152. go func() {
  153. s2.addPeerWithConnection(c2)
  154. }()
  155. // Wait for things to happen, peers to get added...
  156. time.Sleep(100 * time.Millisecond * time.Duration(4))
  157. defer s1.Stop()
  158. defer s2.Stop()
  159. if s1.Peers().Size() != 0 {
  160. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  161. }
  162. if s2.Peers().Size() != 0 {
  163. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  164. }
  165. }
  166. func TestConnPubKeyFilter(t *testing.T) {
  167. s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  168. s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  169. c1, c2 := net.Pipe()
  170. // set pubkey filter
  171. s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  172. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  173. return fmt.Errorf("Error: pipe is blacklisted")
  174. }
  175. return nil
  176. })
  177. // connect to good peer
  178. go func() {
  179. s1.addPeerWithConnection(c1)
  180. }()
  181. go func() {
  182. s2.addPeerWithConnection(c2)
  183. }()
  184. // Wait for things to happen, peers to get added...
  185. time.Sleep(100 * time.Millisecond * time.Duration(4))
  186. defer s1.Stop()
  187. defer s2.Stop()
  188. if s1.Peers().Size() != 0 {
  189. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  190. }
  191. if s2.Peers().Size() != 0 {
  192. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  193. }
  194. }
  195. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  196. assert, require := assert.New(t), require.New(t)
  197. sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  198. sw.Start()
  199. defer sw.Stop()
  200. // simulate remote peer
  201. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  202. rp.Start()
  203. defer rp.Stop()
  204. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
  205. require.Nil(err)
  206. err = sw.AddPeer(peer)
  207. require.Nil(err)
  208. // simulate failure by closing connection
  209. peer.CloseConn()
  210. time.Sleep(100 * time.Millisecond)
  211. assert.Zero(sw.Peers().Size())
  212. assert.False(peer.IsRunning())
  213. }
  214. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  215. assert, require := assert.New(t), require.New(t)
  216. sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  217. sw.Start()
  218. defer sw.Stop()
  219. // simulate remote peer
  220. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  221. rp.Start()
  222. defer rp.Stop()
  223. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
  224. peer.makePersistent()
  225. require.Nil(err)
  226. err = sw.AddPeer(peer)
  227. require.Nil(err)
  228. // simulate failure by closing connection
  229. peer.CloseConn()
  230. time.Sleep(100 * time.Millisecond)
  231. assert.NotZero(sw.Peers().Size())
  232. assert.False(peer.IsRunning())
  233. }
  234. func BenchmarkSwitches(b *testing.B) {
  235. b.StopTimer()
  236. s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
  237. // Make bar reactors of bar channels each
  238. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  239. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  240. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  241. }, false))
  242. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  243. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  244. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  245. }, false))
  246. return sw
  247. })
  248. defer s1.Stop()
  249. defer s2.Stop()
  250. // Allow time for goroutines to boot up
  251. time.Sleep(1000 * time.Millisecond)
  252. b.StartTimer()
  253. numSuccess, numFailure := 0, 0
  254. // Send random message from foo channel to another
  255. for i := 0; i < b.N; i++ {
  256. chID := byte(i % 4)
  257. successChan := s1.Broadcast(chID, "test data")
  258. for s := range successChan {
  259. if s {
  260. numSuccess++
  261. } else {
  262. numFailure++
  263. }
  264. }
  265. }
  266. log.Warn(Fmt("success: %v, failure: %v", numSuccess, numFailure))
  267. // Allow everything to flush before stopping switches & closing connections.
  268. b.StopTimer()
  269. time.Sleep(1000 * time.Millisecond)
  270. }