You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

543 lines
13 KiB

  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/http/httptest"
  8. "regexp"
  9. "strconv"
  10. "sync"
  11. "testing"
  12. "time"
  13. stdprometheus "github.com/prometheus/client_golang/prometheus"
  14. "github.com/stretchr/testify/assert"
  15. "github.com/stretchr/testify/require"
  16. "github.com/tendermint/tendermint/config"
  17. "github.com/tendermint/tendermint/crypto/ed25519"
  18. "github.com/tendermint/tendermint/libs/log"
  19. "github.com/tendermint/tendermint/p2p/conn"
  20. )
  21. var (
  22. cfg *config.P2PConfig
  23. )
  24. func init() {
  25. cfg = config.DefaultP2PConfig()
  26. cfg.PexReactor = true
  27. cfg.AllowDuplicateIP = true
  28. }
  29. type PeerMessage struct {
  30. PeerID ID
  31. Bytes []byte
  32. Counter int
  33. }
  34. type TestReactor struct {
  35. BaseReactor
  36. mtx sync.Mutex
  37. channels []*conn.ChannelDescriptor
  38. logMessages bool
  39. msgsCounter int
  40. msgsReceived map[byte][]PeerMessage
  41. }
  42. func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
  43. tr := &TestReactor{
  44. channels: channels,
  45. logMessages: logMessages,
  46. msgsReceived: make(map[byte][]PeerMessage),
  47. }
  48. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  49. tr.SetLogger(log.TestingLogger())
  50. return tr
  51. }
  52. func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
  53. return tr.channels
  54. }
  55. func (tr *TestReactor) AddPeer(peer Peer) {}
  56. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
  57. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  58. if tr.logMessages {
  59. tr.mtx.Lock()
  60. defer tr.mtx.Unlock()
  61. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  62. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
  63. tr.msgsCounter++
  64. }
  65. }
  66. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  67. tr.mtx.Lock()
  68. defer tr.mtx.Unlock()
  69. return tr.msgsReceived[chID]
  70. }
  71. //-----------------------------------------------------------------------------
  72. // convenience method for creating two switches connected to each other.
  73. // XXX: note this uses net.Pipe and not a proper TCP conn
  74. func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  75. // Create two switches that will be interconnected.
  76. switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
  77. return switches[0], switches[1]
  78. }
  79. func initSwitchFunc(i int, sw *Switch) *Switch {
  80. sw.SetAddrBook(&addrBookMock{
  81. addrs: make(map[string]struct{}),
  82. ourAddrs: make(map[string]struct{})})
  83. // Make two reactors of two channels each
  84. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  85. {ID: byte(0x00), Priority: 10},
  86. {ID: byte(0x01), Priority: 10},
  87. }, true))
  88. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  89. {ID: byte(0x02), Priority: 10},
  90. {ID: byte(0x03), Priority: 10},
  91. }, true))
  92. return sw
  93. }
  94. func TestSwitches(t *testing.T) {
  95. s1, s2 := MakeSwitchPair(t, initSwitchFunc)
  96. defer s1.Stop()
  97. defer s2.Stop()
  98. if s1.Peers().Size() != 1 {
  99. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  100. }
  101. if s2.Peers().Size() != 1 {
  102. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  103. }
  104. // Lets send some messages
  105. ch0Msg := []byte("channel zero")
  106. ch1Msg := []byte("channel foo")
  107. ch2Msg := []byte("channel bar")
  108. s1.Broadcast(byte(0x00), ch0Msg)
  109. s1.Broadcast(byte(0x01), ch1Msg)
  110. s1.Broadcast(byte(0x02), ch2Msg)
  111. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  112. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  113. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  114. }
  115. func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  116. ticker := time.NewTicker(checkPeriod)
  117. for {
  118. select {
  119. case <-ticker.C:
  120. msgs := reactor.getMsgs(channel)
  121. if len(msgs) > 0 {
  122. if !bytes.Equal(msgs[0].Bytes, msgBytes) {
  123. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
  124. }
  125. return
  126. }
  127. case <-time.After(timeout):
  128. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  129. }
  130. }
  131. }
  132. func TestSwitchFiltersOutItself(t *testing.T) {
  133. s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
  134. // addr := s1.NodeInfo().NetAddress()
  135. // // add ourselves like we do in node.go#427
  136. // s1.addrBook.AddOurAddress(addr)
  137. // simulate s1 having a public IP by creating a remote peer with the same ID
  138. rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
  139. rp.Start()
  140. // addr should be rejected in addPeer based on the same ID
  141. err := s1.DialPeerWithAddress(rp.Addr(), false)
  142. if assert.Error(t, err) {
  143. if err, ok := err.(ErrRejected); ok {
  144. if !err.IsSelf() {
  145. t.Errorf("expected self to be rejected")
  146. }
  147. } else {
  148. t.Errorf("expected ErrRejected")
  149. }
  150. }
  151. assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
  152. assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
  153. rp.Stop()
  154. assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
  155. }
  156. func TestSwitchPeerFilter(t *testing.T) {
  157. var (
  158. filters = []PeerFilterFunc{
  159. func(_ IPeerSet, _ Peer) error { return nil },
  160. func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied!") },
  161. func(_ IPeerSet, _ Peer) error { return nil },
  162. }
  163. sw = MakeSwitch(
  164. cfg,
  165. 1,
  166. "testing",
  167. "123.123.123",
  168. initSwitchFunc,
  169. SwitchPeerFilters(filters...),
  170. )
  171. )
  172. defer sw.Stop()
  173. // simulate remote peer
  174. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  175. rp.Start()
  176. defer rp.Stop()
  177. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  178. chDescs: sw.chDescs,
  179. onPeerError: sw.StopPeerForError,
  180. reactorsByCh: sw.reactorsByCh,
  181. })
  182. if err != nil {
  183. t.Fatal(err)
  184. }
  185. err = sw.addPeer(p)
  186. if err, ok := err.(ErrRejected); ok {
  187. if !err.IsFiltered() {
  188. t.Errorf("expected peer to be filtered")
  189. }
  190. } else {
  191. t.Errorf("expected ErrRejected")
  192. }
  193. }
  194. func TestSwitchPeerFilterTimeout(t *testing.T) {
  195. var (
  196. filters = []PeerFilterFunc{
  197. func(_ IPeerSet, _ Peer) error {
  198. time.Sleep(10 * time.Millisecond)
  199. return nil
  200. },
  201. }
  202. sw = MakeSwitch(
  203. cfg,
  204. 1,
  205. "testing",
  206. "123.123.123",
  207. initSwitchFunc,
  208. SwitchFilterTimeout(5*time.Millisecond),
  209. SwitchPeerFilters(filters...),
  210. )
  211. )
  212. defer sw.Stop()
  213. // simulate remote peer
  214. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  215. rp.Start()
  216. defer rp.Stop()
  217. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  218. chDescs: sw.chDescs,
  219. onPeerError: sw.StopPeerForError,
  220. reactorsByCh: sw.reactorsByCh,
  221. })
  222. if err != nil {
  223. t.Fatal(err)
  224. }
  225. err = sw.addPeer(p)
  226. if _, ok := err.(ErrFilterTimeout); !ok {
  227. t.Errorf("expected ErrFilterTimeout")
  228. }
  229. }
  230. func TestSwitchPeerFilterDuplicate(t *testing.T) {
  231. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  232. // simulate remote peer
  233. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  234. rp.Start()
  235. defer rp.Stop()
  236. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  237. chDescs: sw.chDescs,
  238. onPeerError: sw.StopPeerForError,
  239. reactorsByCh: sw.reactorsByCh,
  240. })
  241. if err != nil {
  242. t.Fatal(err)
  243. }
  244. if err := sw.addPeer(p); err != nil {
  245. t.Fatal(err)
  246. }
  247. err = sw.addPeer(p)
  248. if err, ok := err.(ErrRejected); ok {
  249. if !err.IsDuplicate() {
  250. t.Errorf("expected peer to be duplicate")
  251. }
  252. } else {
  253. t.Errorf("expected ErrRejected")
  254. }
  255. }
  256. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  257. time.Sleep(timeout)
  258. if sw.Peers().Size() != 0 {
  259. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  260. }
  261. }
  262. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  263. assert, require := assert.New(t), require.New(t)
  264. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  265. err := sw.Start()
  266. if err != nil {
  267. t.Error(err)
  268. }
  269. defer sw.Stop()
  270. // simulate remote peer
  271. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  272. rp.Start()
  273. defer rp.Stop()
  274. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  275. chDescs: sw.chDescs,
  276. onPeerError: sw.StopPeerForError,
  277. reactorsByCh: sw.reactorsByCh,
  278. })
  279. require.Nil(err)
  280. err = sw.addPeer(p)
  281. require.Nil(err)
  282. require.NotNil(sw.Peers().Get(rp.ID()))
  283. // simulate failure by closing connection
  284. p.(*peer).CloseConn()
  285. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  286. assert.False(p.IsRunning())
  287. }
  288. func TestSwitchStopPeerForError(t *testing.T) {
  289. s := httptest.NewServer(stdprometheus.UninstrumentedHandler())
  290. defer s.Close()
  291. scrapeMetrics := func() string {
  292. resp, _ := http.Get(s.URL)
  293. buf, _ := ioutil.ReadAll(resp.Body)
  294. return string(buf)
  295. }
  296. namespace, subsystem, name := config.TestInstrumentationConfig().Namespace, MetricsSubsystem, "peers"
  297. re := regexp.MustCompile(namespace + `_` + subsystem + `_` + name + ` ([0-9\.]+)`)
  298. peersMetricValue := func() float64 {
  299. matches := re.FindStringSubmatch(scrapeMetrics())
  300. f, _ := strconv.ParseFloat(matches[1], 64)
  301. return f
  302. }
  303. p2pMetrics := PrometheusMetrics(namespace)
  304. // make two connected switches
  305. sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
  306. // set metrics on sw1
  307. if i == 0 {
  308. opt := WithMetrics(p2pMetrics)
  309. opt(sw)
  310. }
  311. return initSwitchFunc(i, sw)
  312. })
  313. assert.Equal(t, len(sw1.Peers().List()), 1)
  314. assert.EqualValues(t, 1, peersMetricValue())
  315. // send messages to the peer from sw1
  316. p := sw1.Peers().List()[0]
  317. p.Send(0x1, []byte("here's a message to send"))
  318. // stop sw2. this should cause the p to fail,
  319. // which results in calling StopPeerForError internally
  320. sw2.Stop()
  321. // now call StopPeerForError explicitly, eg. from a reactor
  322. sw1.StopPeerForError(p, fmt.Errorf("some err"))
  323. assert.Equal(t, len(sw1.Peers().List()), 0)
  324. assert.EqualValues(t, 0, peersMetricValue())
  325. }
  326. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  327. assert, require := assert.New(t), require.New(t)
  328. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  329. err := sw.Start()
  330. if err != nil {
  331. t.Error(err)
  332. }
  333. defer sw.Stop()
  334. // simulate remote peer
  335. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  336. rp.Start()
  337. defer rp.Stop()
  338. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  339. chDescs: sw.chDescs,
  340. onPeerError: sw.StopPeerForError,
  341. persistent: true,
  342. reactorsByCh: sw.reactorsByCh,
  343. })
  344. require.Nil(err)
  345. require.Nil(sw.addPeer(p))
  346. require.NotNil(sw.Peers().Get(rp.ID()))
  347. // simulate failure by closing connection
  348. p.(*peer).CloseConn()
  349. // TODO: remove sleep, detect the disconnection, wait for reconnect
  350. npeers := sw.Peers().Size()
  351. for i := 0; i < 20; i++ {
  352. time.Sleep(250 * time.Millisecond)
  353. npeers = sw.Peers().Size()
  354. if npeers > 0 {
  355. break
  356. }
  357. }
  358. assert.NotZero(npeers)
  359. assert.False(p.IsRunning())
  360. // simulate another remote peer
  361. rp = &remotePeer{
  362. PrivKey: ed25519.GenPrivKey(),
  363. Config: cfg,
  364. // Use different interface to prevent duplicate IP filter, this will break
  365. // beyond two peers.
  366. listenAddr: "127.0.0.1:0",
  367. }
  368. rp.Start()
  369. defer rp.Stop()
  370. // simulate first time dial failure
  371. conf := config.DefaultP2PConfig()
  372. conf.TestDialFail = true
  373. err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true)
  374. require.NotNil(err)
  375. // DialPeerWithAddres - sw.peerConfig resets the dialer
  376. // TODO: same as above
  377. for i := 0; i < 20; i++ {
  378. time.Sleep(250 * time.Millisecond)
  379. npeers = sw.Peers().Size()
  380. if npeers > 1 {
  381. break
  382. }
  383. }
  384. assert.EqualValues(2, npeers)
  385. }
  386. func TestSwitchFullConnectivity(t *testing.T) {
  387. switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
  388. defer func() {
  389. for _, sw := range switches {
  390. sw.Stop()
  391. }
  392. }()
  393. for i, sw := range switches {
  394. if sw.Peers().Size() != 2 {
  395. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  396. }
  397. }
  398. }
  399. func BenchmarkSwitchBroadcast(b *testing.B) {
  400. s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
  401. // Make bar reactors of bar channels each
  402. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  403. {ID: byte(0x00), Priority: 10},
  404. {ID: byte(0x01), Priority: 10},
  405. }, false))
  406. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  407. {ID: byte(0x02), Priority: 10},
  408. {ID: byte(0x03), Priority: 10},
  409. }, false))
  410. return sw
  411. })
  412. defer s1.Stop()
  413. defer s2.Stop()
  414. // Allow time for goroutines to boot up
  415. time.Sleep(1 * time.Second)
  416. b.ResetTimer()
  417. numSuccess, numFailure := 0, 0
  418. // Send random message from foo channel to another
  419. for i := 0; i < b.N; i++ {
  420. chID := byte(i % 4)
  421. successChan := s1.Broadcast(chID, []byte("test data"))
  422. for s := range successChan {
  423. if s {
  424. numSuccess++
  425. } else {
  426. numFailure++
  427. }
  428. }
  429. }
  430. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  431. }
  432. type addrBookMock struct {
  433. addrs map[string]struct{}
  434. ourAddrs map[string]struct{}
  435. }
  436. var _ AddrBook = (*addrBookMock)(nil)
  437. func (book *addrBookMock) AddAddress(addr *NetAddress, src *NetAddress) error {
  438. book.addrs[addr.String()] = struct{}{}
  439. return nil
  440. }
  441. func (book *addrBookMock) AddOurAddress(addr *NetAddress) { book.ourAddrs[addr.String()] = struct{}{} }
  442. func (book *addrBookMock) OurAddress(addr *NetAddress) bool {
  443. _, ok := book.ourAddrs[addr.String()]
  444. return ok
  445. }
  446. func (book *addrBookMock) MarkGood(*NetAddress) {}
  447. func (book *addrBookMock) HasAddress(addr *NetAddress) bool {
  448. _, ok := book.addrs[addr.String()]
  449. return ok
  450. }
  451. func (book *addrBookMock) RemoveAddress(addr *NetAddress) {
  452. delete(book.addrs, addr.String())
  453. }
  454. func (book *addrBookMock) Save() {}