You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

487 lines
12 KiB

  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "github.com/tendermint/tendermint/config"
  11. "github.com/tendermint/tendermint/crypto/ed25519"
  12. "github.com/tendermint/tendermint/libs/log"
  13. "github.com/tendermint/tendermint/p2p/conn"
  14. )
  15. var (
  16. cfg *config.P2PConfig
  17. )
  18. func init() {
  19. cfg = config.DefaultP2PConfig()
  20. cfg.PexReactor = true
  21. cfg.AllowDuplicateIP = true
  22. }
  23. type PeerMessage struct {
  24. PeerID ID
  25. Bytes []byte
  26. Counter int
  27. }
  28. type TestReactor struct {
  29. BaseReactor
  30. mtx sync.Mutex
  31. channels []*conn.ChannelDescriptor
  32. logMessages bool
  33. msgsCounter int
  34. msgsReceived map[byte][]PeerMessage
  35. }
  36. func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
  37. tr := &TestReactor{
  38. channels: channels,
  39. logMessages: logMessages,
  40. msgsReceived: make(map[byte][]PeerMessage),
  41. }
  42. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  43. tr.SetLogger(log.TestingLogger())
  44. return tr
  45. }
  46. func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
  47. return tr.channels
  48. }
  49. func (tr *TestReactor) AddPeer(peer Peer) {}
  50. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
  51. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  52. if tr.logMessages {
  53. tr.mtx.Lock()
  54. defer tr.mtx.Unlock()
  55. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  56. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
  57. tr.msgsCounter++
  58. }
  59. }
  60. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  61. tr.mtx.Lock()
  62. defer tr.mtx.Unlock()
  63. return tr.msgsReceived[chID]
  64. }
  65. //-----------------------------------------------------------------------------
  66. // convenience method for creating two switches connected to each other.
  67. // XXX: note this uses net.Pipe and not a proper TCP conn
  68. func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  69. // Create two switches that will be interconnected.
  70. switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
  71. return switches[0], switches[1]
  72. }
  73. func initSwitchFunc(i int, sw *Switch) *Switch {
  74. sw.SetAddrBook(&addrBookMock{
  75. addrs: make(map[string]struct{}),
  76. ourAddrs: make(map[string]struct{})})
  77. // Make two reactors of two channels each
  78. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  79. {ID: byte(0x00), Priority: 10},
  80. {ID: byte(0x01), Priority: 10},
  81. }, true))
  82. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  83. {ID: byte(0x02), Priority: 10},
  84. {ID: byte(0x03), Priority: 10},
  85. }, true))
  86. return sw
  87. }
  88. func TestSwitches(t *testing.T) {
  89. s1, s2 := MakeSwitchPair(t, initSwitchFunc)
  90. defer s1.Stop()
  91. defer s2.Stop()
  92. if s1.Peers().Size() != 1 {
  93. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  94. }
  95. if s2.Peers().Size() != 1 {
  96. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  97. }
  98. // Lets send some messages
  99. ch0Msg := []byte("channel zero")
  100. ch1Msg := []byte("channel foo")
  101. ch2Msg := []byte("channel bar")
  102. s1.Broadcast(byte(0x00), ch0Msg)
  103. s1.Broadcast(byte(0x01), ch1Msg)
  104. s1.Broadcast(byte(0x02), ch2Msg)
  105. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  106. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  107. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  108. }
  109. func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  110. ticker := time.NewTicker(checkPeriod)
  111. for {
  112. select {
  113. case <-ticker.C:
  114. msgs := reactor.getMsgs(channel)
  115. if len(msgs) > 0 {
  116. if !bytes.Equal(msgs[0].Bytes, msgBytes) {
  117. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
  118. }
  119. return
  120. }
  121. case <-time.After(timeout):
  122. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  123. }
  124. }
  125. }
  126. func TestSwitchFiltersOutItself(t *testing.T) {
  127. s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
  128. // addr := s1.NodeInfo().NetAddress()
  129. // // add ourselves like we do in node.go#427
  130. // s1.addrBook.AddOurAddress(addr)
  131. // simulate s1 having a public IP by creating a remote peer with the same ID
  132. rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
  133. rp.Start()
  134. // addr should be rejected in addPeer based on the same ID
  135. err := s1.DialPeerWithAddress(rp.Addr(), false)
  136. if assert.Error(t, err) {
  137. if err, ok := err.(ErrRejected); ok {
  138. if !err.IsSelf() {
  139. t.Errorf("expected self to be rejected")
  140. }
  141. } else {
  142. t.Errorf("expected ErrRejected")
  143. }
  144. }
  145. assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
  146. assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
  147. rp.Stop()
  148. assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
  149. }
  150. func TestSwitchPeerFilter(t *testing.T) {
  151. var (
  152. filters = []PeerFilterFunc{
  153. func(_ IPeerSet, _ Peer) error { return nil },
  154. func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied!") },
  155. func(_ IPeerSet, _ Peer) error { return nil },
  156. }
  157. sw = MakeSwitch(
  158. cfg,
  159. 1,
  160. "testing",
  161. "123.123.123",
  162. initSwitchFunc,
  163. SwitchPeerFilters(filters...),
  164. )
  165. )
  166. defer sw.Stop()
  167. // simulate remote peer
  168. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  169. rp.Start()
  170. defer rp.Stop()
  171. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  172. chDescs: sw.chDescs,
  173. onPeerError: sw.StopPeerForError,
  174. reactorsByCh: sw.reactorsByCh,
  175. })
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. err = sw.addPeer(p)
  180. if err, ok := err.(ErrRejected); ok {
  181. if !err.IsFiltered() {
  182. t.Errorf("expected peer to be filtered")
  183. }
  184. } else {
  185. t.Errorf("expected ErrRejected")
  186. }
  187. }
  188. func TestSwitchPeerFilterTimeout(t *testing.T) {
  189. var (
  190. filters = []PeerFilterFunc{
  191. func(_ IPeerSet, _ Peer) error {
  192. time.Sleep(10 * time.Millisecond)
  193. return nil
  194. },
  195. }
  196. sw = MakeSwitch(
  197. cfg,
  198. 1,
  199. "testing",
  200. "123.123.123",
  201. initSwitchFunc,
  202. SwitchFilterTimeout(5*time.Millisecond),
  203. SwitchPeerFilters(filters...),
  204. )
  205. )
  206. defer sw.Stop()
  207. // simulate remote peer
  208. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  209. rp.Start()
  210. defer rp.Stop()
  211. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  212. chDescs: sw.chDescs,
  213. onPeerError: sw.StopPeerForError,
  214. reactorsByCh: sw.reactorsByCh,
  215. })
  216. if err != nil {
  217. t.Fatal(err)
  218. }
  219. err = sw.addPeer(p)
  220. if _, ok := err.(ErrFilterTimeout); !ok {
  221. t.Errorf("expected ErrFilterTimeout")
  222. }
  223. }
  224. func TestSwitchPeerFilterDuplicate(t *testing.T) {
  225. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  226. // simulate remote peer
  227. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  228. rp.Start()
  229. defer rp.Stop()
  230. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  231. chDescs: sw.chDescs,
  232. onPeerError: sw.StopPeerForError,
  233. reactorsByCh: sw.reactorsByCh,
  234. })
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. if err := sw.addPeer(p); err != nil {
  239. t.Fatal(err)
  240. }
  241. err = sw.addPeer(p)
  242. if err, ok := err.(ErrRejected); ok {
  243. if !err.IsDuplicate() {
  244. t.Errorf("expected peer to be duplicate")
  245. }
  246. } else {
  247. t.Errorf("expected ErrRejected")
  248. }
  249. }
  250. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  251. time.Sleep(timeout)
  252. if sw.Peers().Size() != 0 {
  253. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  254. }
  255. }
  256. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  257. assert, require := assert.New(t), require.New(t)
  258. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  259. err := sw.Start()
  260. if err != nil {
  261. t.Error(err)
  262. }
  263. defer sw.Stop()
  264. // simulate remote peer
  265. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  266. rp.Start()
  267. defer rp.Stop()
  268. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  269. chDescs: sw.chDescs,
  270. onPeerError: sw.StopPeerForError,
  271. reactorsByCh: sw.reactorsByCh,
  272. })
  273. require.Nil(err)
  274. err = sw.addPeer(p)
  275. require.Nil(err)
  276. require.NotNil(sw.Peers().Get(rp.ID()))
  277. // simulate failure by closing connection
  278. p.(*peer).CloseConn()
  279. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  280. assert.False(p.IsRunning())
  281. }
  282. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  283. assert, require := assert.New(t), require.New(t)
  284. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  285. err := sw.Start()
  286. if err != nil {
  287. t.Error(err)
  288. }
  289. defer sw.Stop()
  290. // simulate remote peer
  291. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  292. rp.Start()
  293. defer rp.Stop()
  294. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  295. chDescs: sw.chDescs,
  296. onPeerError: sw.StopPeerForError,
  297. persistent: true,
  298. reactorsByCh: sw.reactorsByCh,
  299. })
  300. require.Nil(err)
  301. require.Nil(sw.addPeer(p))
  302. require.NotNil(sw.Peers().Get(rp.ID()))
  303. // simulate failure by closing connection
  304. p.(*peer).CloseConn()
  305. // TODO: remove sleep, detect the disconnection, wait for reconnect
  306. npeers := sw.Peers().Size()
  307. for i := 0; i < 20; i++ {
  308. time.Sleep(250 * time.Millisecond)
  309. npeers = sw.Peers().Size()
  310. if npeers > 0 {
  311. break
  312. }
  313. }
  314. assert.NotZero(npeers)
  315. assert.False(p.IsRunning())
  316. // simulate another remote peer
  317. rp = &remotePeer{
  318. PrivKey: ed25519.GenPrivKey(),
  319. Config: cfg,
  320. // Use different interface to prevent duplicate IP filter, this will break
  321. // beyond two peers.
  322. listenAddr: "127.0.0.1:0",
  323. }
  324. rp.Start()
  325. defer rp.Stop()
  326. // simulate first time dial failure
  327. conf := config.DefaultP2PConfig()
  328. conf.TestDialFail = true
  329. err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true)
  330. require.NotNil(err)
  331. // DialPeerWithAddres - sw.peerConfig resets the dialer
  332. // TODO: same as above
  333. for i := 0; i < 20; i++ {
  334. time.Sleep(250 * time.Millisecond)
  335. npeers = sw.Peers().Size()
  336. if npeers > 1 {
  337. break
  338. }
  339. }
  340. assert.EqualValues(2, npeers)
  341. }
  342. func TestSwitchFullConnectivity(t *testing.T) {
  343. switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
  344. defer func() {
  345. for _, sw := range switches {
  346. sw.Stop()
  347. }
  348. }()
  349. for i, sw := range switches {
  350. if sw.Peers().Size() != 2 {
  351. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  352. }
  353. }
  354. }
  355. func BenchmarkSwitchBroadcast(b *testing.B) {
  356. s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
  357. // Make bar reactors of bar channels each
  358. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  359. {ID: byte(0x00), Priority: 10},
  360. {ID: byte(0x01), Priority: 10},
  361. }, false))
  362. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  363. {ID: byte(0x02), Priority: 10},
  364. {ID: byte(0x03), Priority: 10},
  365. }, false))
  366. return sw
  367. })
  368. defer s1.Stop()
  369. defer s2.Stop()
  370. // Allow time for goroutines to boot up
  371. time.Sleep(1 * time.Second)
  372. b.ResetTimer()
  373. numSuccess, numFailure := 0, 0
  374. // Send random message from foo channel to another
  375. for i := 0; i < b.N; i++ {
  376. chID := byte(i % 4)
  377. successChan := s1.Broadcast(chID, []byte("test data"))
  378. for s := range successChan {
  379. if s {
  380. numSuccess++
  381. } else {
  382. numFailure++
  383. }
  384. }
  385. }
  386. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  387. }
  388. type addrBookMock struct {
  389. addrs map[string]struct{}
  390. ourAddrs map[string]struct{}
  391. }
  392. var _ AddrBook = (*addrBookMock)(nil)
  393. func (book *addrBookMock) AddAddress(addr *NetAddress, src *NetAddress) error {
  394. book.addrs[addr.String()] = struct{}{}
  395. return nil
  396. }
  397. func (book *addrBookMock) AddOurAddress(addr *NetAddress) { book.ourAddrs[addr.String()] = struct{}{} }
  398. func (book *addrBookMock) OurAddress(addr *NetAddress) bool {
  399. _, ok := book.ourAddrs[addr.String()]
  400. return ok
  401. }
  402. func (book *addrBookMock) MarkGood(*NetAddress) {}
  403. func (book *addrBookMock) HasAddress(addr *NetAddress) bool {
  404. _, ok := book.addrs[addr.String()]
  405. return ok
  406. }
  407. func (book *addrBookMock) RemoveAddress(addr *NetAddress) {
  408. delete(book.addrs, addr.String())
  409. }
  410. func (book *addrBookMock) Save() {}