You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

381 lines
9.9 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. golog "log"
  6. "net"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. . "github.com/tendermint/go-common"
  13. cmn "github.com/tendermint/go-common"
  14. cfg "github.com/tendermint/go-config"
  15. crypto "github.com/tendermint/go-crypto"
  16. wire "github.com/tendermint/go-wire"
  17. )
  18. var (
  19. config cfg.Config
  20. )
  21. func init() {
  22. config = cfg.NewMapConfig(nil)
  23. setConfigDefaults(config)
  24. }
  25. type PeerMessage struct {
  26. PeerKey string
  27. Bytes []byte
  28. Counter int
  29. }
  30. type TestReactor struct {
  31. BaseReactor
  32. mtx sync.Mutex
  33. channels []*ChannelDescriptor
  34. peersAdded []*Peer
  35. peersRemoved []*Peer
  36. logMessages bool
  37. msgsCounter int
  38. msgsReceived map[byte][]PeerMessage
  39. }
  40. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  41. tr := &TestReactor{
  42. channels: channels,
  43. logMessages: logMessages,
  44. msgsReceived: make(map[byte][]PeerMessage),
  45. }
  46. tr.BaseReactor = *NewBaseReactor(log, "TestReactor", tr)
  47. return tr
  48. }
  49. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  50. return tr.channels
  51. }
  52. func (tr *TestReactor) AddPeer(peer *Peer) {
  53. tr.mtx.Lock()
  54. defer tr.mtx.Unlock()
  55. tr.peersAdded = append(tr.peersAdded, peer)
  56. }
  57. func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) {
  58. tr.mtx.Lock()
  59. defer tr.mtx.Unlock()
  60. tr.peersRemoved = append(tr.peersRemoved, peer)
  61. }
  62. func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
  63. if tr.logMessages {
  64. tr.mtx.Lock()
  65. defer tr.mtx.Unlock()
  66. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  67. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter})
  68. tr.msgsCounter++
  69. }
  70. }
  71. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  72. tr.mtx.Lock()
  73. defer tr.mtx.Unlock()
  74. return tr.msgsReceived[chID]
  75. }
  76. //-----------------------------------------------------------------------------
  77. // convenience method for creating two switches connected to each other.
  78. // XXX: note this uses net.Pipe and not a proper TCP conn
  79. func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  80. // Create two switches that will be interconnected.
  81. switches := MakeConnectedSwitches(2, initSwitch, Connect2Switches)
  82. return switches[0], switches[1]
  83. }
  84. func initSwitchFunc(i int, sw *Switch) *Switch {
  85. // Make two reactors of two channels each
  86. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  87. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  88. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  89. }, true))
  90. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  91. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  92. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  93. }, true))
  94. return sw
  95. }
  96. func TestSwitches(t *testing.T) {
  97. s1, s2 := makeSwitchPair(t, initSwitchFunc)
  98. defer s1.Stop()
  99. defer s2.Stop()
  100. if s1.Peers().Size() != 1 {
  101. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  102. }
  103. if s2.Peers().Size() != 1 {
  104. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  105. }
  106. // Lets send some messages
  107. ch0Msg := "channel zero"
  108. ch1Msg := "channel foo"
  109. ch2Msg := "channel bar"
  110. s1.Broadcast(byte(0x00), ch0Msg)
  111. s1.Broadcast(byte(0x01), ch1Msg)
  112. s1.Broadcast(byte(0x02), ch2Msg)
  113. // Wait for things to settle...
  114. time.Sleep(5000 * time.Millisecond)
  115. // Check message on ch0
  116. ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00))
  117. if len(ch0Msgs) != 1 {
  118. t.Errorf("Expected to have received 1 message in ch0")
  119. }
  120. if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
  121. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
  122. }
  123. // Check message on ch1
  124. ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01))
  125. if len(ch1Msgs) != 1 {
  126. t.Errorf("Expected to have received 1 message in ch1")
  127. }
  128. if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
  129. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
  130. }
  131. // Check message on ch2
  132. ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02))
  133. if len(ch2Msgs) != 1 {
  134. t.Errorf("Expected to have received 1 message in ch2")
  135. }
  136. if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
  137. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
  138. }
  139. }
  140. func TestConnAddrFilter(t *testing.T) {
  141. s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  142. s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  143. c1, c2 := net.Pipe()
  144. s1.SetAddrFilter(func(addr net.Addr) error {
  145. if addr.String() == c1.RemoteAddr().String() {
  146. return fmt.Errorf("Error: pipe is blacklisted")
  147. }
  148. return nil
  149. })
  150. // connect to good peer
  151. go func() {
  152. s1.AddPeerWithConnection(c1, false, s1.reactorsByCh, s1.chDescs, s1.StopPeerForError, s1.config, s1.nodePrivKey)
  153. }()
  154. go func() {
  155. s2.AddPeerWithConnection(c2, true, s2.reactorsByCh, s2.chDescs, s2.StopPeerForError, s2.config, s2.nodePrivKey)
  156. }()
  157. // Wait for things to happen, peers to get added...
  158. time.Sleep(100 * time.Millisecond * time.Duration(4))
  159. defer s1.Stop()
  160. defer s2.Stop()
  161. if s1.Peers().Size() != 0 {
  162. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  163. }
  164. if s2.Peers().Size() != 0 {
  165. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  166. }
  167. }
  168. func TestConnPubKeyFilter(t *testing.T) {
  169. s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  170. s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  171. c1, c2 := net.Pipe()
  172. // set pubkey filter
  173. s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  174. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  175. return fmt.Errorf("Error: pipe is blacklisted")
  176. }
  177. return nil
  178. })
  179. // connect to good peer
  180. go func() {
  181. s1.AddPeerWithConnection(c1, false, s1.reactorsByCh, s1.chDescs, s1.StopPeerForError, s1.config, s1.nodePrivKey)
  182. }()
  183. go func() {
  184. s2.AddPeerWithConnection(c2, true, s2.reactorsByCh, s2.chDescs, s2.StopPeerForError, s2.config, s2.nodePrivKey)
  185. }()
  186. // Wait for things to happen, peers to get added...
  187. time.Sleep(100 * time.Millisecond * time.Duration(4))
  188. defer s1.Stop()
  189. defer s2.Stop()
  190. if s1.Peers().Size() != 0 {
  191. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  192. }
  193. if s2.Peers().Size() != 0 {
  194. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  195. }
  196. }
  197. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  198. assert, require := assert.New(t), require.New(t)
  199. sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  200. sw.Start()
  201. defer sw.Stop()
  202. sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
  203. defer sw2.Stop()
  204. l, serverAddr := listenTCP()
  205. done := make(chan struct{})
  206. go accept(l, done, sw2)
  207. defer close(done)
  208. peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
  209. require.Nil(err)
  210. err = sw.AddPeer(peer)
  211. require.Nil(err)
  212. // simulate failure by closing connection
  213. peer.CloseConn()
  214. time.Sleep(100 * time.Millisecond)
  215. assert.Zero(sw.Peers().Size())
  216. assert.False(peer.IsRunning())
  217. }
  218. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  219. assert, require := assert.New(t), require.New(t)
  220. sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
  221. sw.Start()
  222. defer sw.Stop()
  223. sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
  224. defer sw2.Stop()
  225. l, serverAddr := listenTCP()
  226. done := make(chan struct{})
  227. go accept(l, done, sw2)
  228. defer close(done)
  229. peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
  230. peer.MakePersistent()
  231. require.Nil(err)
  232. err = sw.AddPeer(peer)
  233. require.Nil(err)
  234. // simulate failure by closing connection
  235. peer.CloseConn()
  236. time.Sleep(100 * time.Millisecond)
  237. assert.NotZero(sw.Peers().Size())
  238. assert.False(peer.IsRunning())
  239. }
  240. func BenchmarkSwitches(b *testing.B) {
  241. b.StopTimer()
  242. s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
  243. // Make bar reactors of bar channels each
  244. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  245. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  246. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  247. }, false))
  248. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  249. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  250. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  251. }, false))
  252. return sw
  253. })
  254. defer s1.Stop()
  255. defer s2.Stop()
  256. // Allow time for goroutines to boot up
  257. time.Sleep(1000 * time.Millisecond)
  258. b.StartTimer()
  259. numSuccess, numFailure := 0, 0
  260. // Send random message from foo channel to another
  261. for i := 0; i < b.N; i++ {
  262. chID := byte(i % 4)
  263. successChan := s1.Broadcast(chID, "test data")
  264. for s := range successChan {
  265. if s {
  266. numSuccess++
  267. } else {
  268. numFailure++
  269. }
  270. }
  271. }
  272. log.Warn(Fmt("success: %v, failure: %v", numSuccess, numFailure))
  273. // Allow everything to flush before stopping switches & closing connections.
  274. b.StopTimer()
  275. time.Sleep(1000 * time.Millisecond)
  276. }
  277. func listenTCP() (net.Listener, net.Addr) {
  278. l, e := net.Listen("tcp", "127.0.0.1:0") // any available address
  279. if e != nil {
  280. golog.Fatalf("net.Listen tcp :0: %+v", e)
  281. }
  282. return l, l.Addr()
  283. }
  284. // simulate remote peer
  285. func accept(l net.Listener, done <-chan struct{}, sw *Switch) {
  286. for {
  287. conn, err := l.Accept()
  288. if err != nil {
  289. golog.Fatalf("Failed to accept conn: %+v", err)
  290. }
  291. conn, err = MakeSecretConnection(conn, sw.nodePrivKey)
  292. if err != nil {
  293. golog.Fatalf("Failed to make secret conn: %+v", err)
  294. }
  295. var err1, err2 error
  296. nodeInfo := new(NodeInfo)
  297. cmn.Parallel(
  298. func() {
  299. var n int
  300. wire.WriteBinary(sw.nodeInfo, conn, &n, &err1)
  301. },
  302. func() {
  303. var n int
  304. wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2)
  305. })
  306. if err1 != nil {
  307. golog.Fatalf("Failed to do handshake: %+v", err1)
  308. }
  309. if err2 != nil {
  310. golog.Fatalf("Failed to do handshake: %+v", err2)
  311. }
  312. select {
  313. case <-done:
  314. conn.Close()
  315. return
  316. default:
  317. }
  318. }
  319. }