You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

669 lines
17 KiB

p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net"
  9. "net/http"
  10. "net/http/httptest"
  11. "regexp"
  12. "strconv"
  13. "sync"
  14. "testing"
  15. "time"
  16. stdprometheus "github.com/prometheus/client_golang/prometheus"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. "github.com/tendermint/tendermint/config"
  20. "github.com/tendermint/tendermint/crypto/ed25519"
  21. "github.com/tendermint/tendermint/libs/log"
  22. "github.com/tendermint/tendermint/p2p/conn"
  23. )
  24. var (
  25. cfg *config.P2PConfig
  26. )
  27. func init() {
  28. cfg = config.DefaultP2PConfig()
  29. cfg.PexReactor = true
  30. cfg.AllowDuplicateIP = true
  31. }
// PeerMessage is a single message recorded by TestReactor.Receive.
type PeerMessage struct {
	PeerID  ID     // ID of the peer the message was received from
	Bytes   []byte // message bytes exactly as handed to Receive
	Counter int    // value of the reactor's message counter when this message was recorded
}
// TestReactor is a reactor used by the tests in this file. When logMessages is
// set, it records every message passed to Receive, grouped by channel ID.
type TestReactor struct {
	BaseReactor

	mtx          sync.Mutex // held by Receive and getMsgs while touching msgsCounter/msgsReceived
	channels     []*conn.ChannelDescriptor // channels reported by GetChannels
	logMessages  bool                      // when false, Receive discards messages
	msgsCounter  int                       // number of messages recorded so far
	msgsReceived map[byte][]PeerMessage    // recorded messages keyed by channel ID
}
  45. func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
  46. tr := &TestReactor{
  47. channels: channels,
  48. logMessages: logMessages,
  49. msgsReceived: make(map[byte][]PeerMessage),
  50. }
  51. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  52. tr.SetLogger(log.TestingLogger())
  53. return tr
  54. }
// GetChannels returns the channel descriptors the reactor was constructed with.
func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
	return tr.channels
}
// AddPeer is a no-op; the test reactor keeps no per-peer state.
func (tr *TestReactor) AddPeer(peer Peer) {}
// RemovePeer is a no-op; the test reactor keeps no per-peer state.
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
  60. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  61. if tr.logMessages {
  62. tr.mtx.Lock()
  63. defer tr.mtx.Unlock()
  64. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  65. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
  66. tr.msgsCounter++
  67. }
  68. }
// getMsgs returns the messages recorded so far for chID. The lookup is done
// under the reactor's mutex, but the returned slice is not copied.
func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
	tr.mtx.Lock()
	defer tr.mtx.Unlock()
	return tr.msgsReceived[chID]
}
  74. //-----------------------------------------------------------------------------
  75. // convenience method for creating two switches connected to each other.
  76. // XXX: note this uses net.Pipe and not a proper TCP conn
  77. func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  78. // Create two switches that will be interconnected.
  79. switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
  80. return switches[0], switches[1]
  81. }
  82. func initSwitchFunc(i int, sw *Switch) *Switch {
  83. sw.SetAddrBook(&addrBookMock{
  84. addrs: make(map[string]struct{}),
  85. ourAddrs: make(map[string]struct{})})
  86. // Make two reactors of two channels each
  87. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  88. {ID: byte(0x00), Priority: 10},
  89. {ID: byte(0x01), Priority: 10},
  90. }, true))
  91. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  92. {ID: byte(0x02), Priority: 10},
  93. {ID: byte(0x03), Priority: 10},
  94. }, true))
  95. return sw
  96. }
  97. func TestSwitches(t *testing.T) {
  98. s1, s2 := MakeSwitchPair(t, initSwitchFunc)
  99. defer s1.Stop()
  100. defer s2.Stop()
  101. if s1.Peers().Size() != 1 {
  102. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  103. }
  104. if s2.Peers().Size() != 1 {
  105. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  106. }
  107. // Lets send some messages
  108. ch0Msg := []byte("channel zero")
  109. ch1Msg := []byte("channel foo")
  110. ch2Msg := []byte("channel bar")
  111. s1.Broadcast(byte(0x00), ch0Msg)
  112. s1.Broadcast(byte(0x01), ch1Msg)
  113. s1.Broadcast(byte(0x02), ch2Msg)
  114. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  115. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  116. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  117. }
  118. func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  119. ticker := time.NewTicker(checkPeriod)
  120. for {
  121. select {
  122. case <-ticker.C:
  123. msgs := reactor.getMsgs(channel)
  124. if len(msgs) > 0 {
  125. if !bytes.Equal(msgs[0].Bytes, msgBytes) {
  126. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
  127. }
  128. return
  129. }
  130. case <-time.After(timeout):
  131. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  132. }
  133. }
  134. }
  135. func TestSwitchFiltersOutItself(t *testing.T) {
  136. s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
  137. // simulate s1 having a public IP by creating a remote peer with the same ID
  138. rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
  139. rp.Start()
  140. // addr should be rejected in addPeer based on the same ID
  141. err := s1.DialPeerWithAddress(rp.Addr())
  142. if assert.Error(t, err) {
  143. if err, ok := err.(ErrRejected); ok {
  144. if !err.IsSelf() {
  145. t.Errorf("expected self to be rejected")
  146. }
  147. } else {
  148. t.Errorf("expected ErrRejected")
  149. }
  150. }
  151. assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
  152. assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
  153. rp.Stop()
  154. assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
  155. }
  156. func TestSwitchPeerFilter(t *testing.T) {
  157. var (
  158. filters = []PeerFilterFunc{
  159. func(_ IPeerSet, _ Peer) error { return nil },
  160. func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied!") },
  161. func(_ IPeerSet, _ Peer) error { return nil },
  162. }
  163. sw = MakeSwitch(
  164. cfg,
  165. 1,
  166. "testing",
  167. "123.123.123",
  168. initSwitchFunc,
  169. SwitchPeerFilters(filters...),
  170. )
  171. )
  172. defer sw.Stop()
  173. // simulate remote peer
  174. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  175. rp.Start()
  176. defer rp.Stop()
  177. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  178. chDescs: sw.chDescs,
  179. onPeerError: sw.StopPeerForError,
  180. isPersistent: sw.isPeerPersistentFn(),
  181. reactorsByCh: sw.reactorsByCh,
  182. })
  183. if err != nil {
  184. t.Fatal(err)
  185. }
  186. err = sw.addPeer(p)
  187. if err, ok := err.(ErrRejected); ok {
  188. if !err.IsFiltered() {
  189. t.Errorf("expected peer to be filtered")
  190. }
  191. } else {
  192. t.Errorf("expected ErrRejected")
  193. }
  194. }
  195. func TestSwitchPeerFilterTimeout(t *testing.T) {
  196. var (
  197. filters = []PeerFilterFunc{
  198. func(_ IPeerSet, _ Peer) error {
  199. time.Sleep(10 * time.Millisecond)
  200. return nil
  201. },
  202. }
  203. sw = MakeSwitch(
  204. cfg,
  205. 1,
  206. "testing",
  207. "123.123.123",
  208. initSwitchFunc,
  209. SwitchFilterTimeout(5*time.Millisecond),
  210. SwitchPeerFilters(filters...),
  211. )
  212. )
  213. defer sw.Stop()
  214. // simulate remote peer
  215. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  216. rp.Start()
  217. defer rp.Stop()
  218. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  219. chDescs: sw.chDescs,
  220. onPeerError: sw.StopPeerForError,
  221. isPersistent: sw.isPeerPersistentFn(),
  222. reactorsByCh: sw.reactorsByCh,
  223. })
  224. if err != nil {
  225. t.Fatal(err)
  226. }
  227. err = sw.addPeer(p)
  228. if _, ok := err.(ErrFilterTimeout); !ok {
  229. t.Errorf("expected ErrFilterTimeout")
  230. }
  231. }
  232. func TestSwitchPeerFilterDuplicate(t *testing.T) {
  233. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  234. sw.Start()
  235. defer sw.Stop()
  236. // simulate remote peer
  237. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  238. rp.Start()
  239. defer rp.Stop()
  240. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  241. chDescs: sw.chDescs,
  242. onPeerError: sw.StopPeerForError,
  243. isPersistent: sw.isPeerPersistentFn(),
  244. reactorsByCh: sw.reactorsByCh,
  245. })
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. if err := sw.addPeer(p); err != nil {
  250. t.Fatal(err)
  251. }
  252. err = sw.addPeer(p)
  253. if errRej, ok := err.(ErrRejected); ok {
  254. if !errRej.IsDuplicate() {
  255. t.Errorf("expected peer to be duplicate. got %v", errRej)
  256. }
  257. } else {
  258. t.Errorf("expected ErrRejected, got %v", err)
  259. }
  260. }
  261. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  262. time.Sleep(timeout)
  263. if sw.Peers().Size() != 0 {
  264. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  265. }
  266. }
  267. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  268. assert, require := assert.New(t), require.New(t)
  269. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  270. err := sw.Start()
  271. if err != nil {
  272. t.Error(err)
  273. }
  274. defer sw.Stop()
  275. // simulate remote peer
  276. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  277. rp.Start()
  278. defer rp.Stop()
  279. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  280. chDescs: sw.chDescs,
  281. onPeerError: sw.StopPeerForError,
  282. isPersistent: sw.isPeerPersistentFn(),
  283. reactorsByCh: sw.reactorsByCh,
  284. })
  285. require.Nil(err)
  286. err = sw.addPeer(p)
  287. require.Nil(err)
  288. require.NotNil(sw.Peers().Get(rp.ID()))
  289. // simulate failure by closing connection
  290. p.(*peer).CloseConn()
  291. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  292. assert.False(p.IsRunning())
  293. }
  294. func TestSwitchStopPeerForError(t *testing.T) {
  295. s := httptest.NewServer(stdprometheus.UninstrumentedHandler())
  296. defer s.Close()
  297. scrapeMetrics := func() string {
  298. resp, _ := http.Get(s.URL)
  299. buf, _ := ioutil.ReadAll(resp.Body)
  300. return string(buf)
  301. }
  302. namespace, subsystem, name := config.TestInstrumentationConfig().Namespace, MetricsSubsystem, "peers"
  303. re := regexp.MustCompile(namespace + `_` + subsystem + `_` + name + ` ([0-9\.]+)`)
  304. peersMetricValue := func() float64 {
  305. matches := re.FindStringSubmatch(scrapeMetrics())
  306. f, _ := strconv.ParseFloat(matches[1], 64)
  307. return f
  308. }
  309. p2pMetrics := PrometheusMetrics(namespace)
  310. // make two connected switches
  311. sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
  312. // set metrics on sw1
  313. if i == 0 {
  314. opt := WithMetrics(p2pMetrics)
  315. opt(sw)
  316. }
  317. return initSwitchFunc(i, sw)
  318. })
  319. assert.Equal(t, len(sw1.Peers().List()), 1)
  320. assert.EqualValues(t, 1, peersMetricValue())
  321. // send messages to the peer from sw1
  322. p := sw1.Peers().List()[0]
  323. p.Send(0x1, []byte("here's a message to send"))
  324. // stop sw2. this should cause the p to fail,
  325. // which results in calling StopPeerForError internally
  326. sw2.Stop()
  327. // now call StopPeerForError explicitly, eg. from a reactor
  328. sw1.StopPeerForError(p, fmt.Errorf("some err"))
  329. assert.Equal(t, len(sw1.Peers().List()), 0)
  330. assert.EqualValues(t, 0, peersMetricValue())
  331. }
// TestSwitchReconnectsToOutboundPersistentPeer exercises two reconnect
// scenarios on a running switch:
//  1. a persistent peer that was dialed successfully has its connection
//     closed, after which the switch is expected to hold one peer again;
//  2. a dial to a second remote peer is made with TestDialFail set, so
//     the call returns an error, after which the switch is expected to end up
//     with two peers.
func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) {
	sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
	err := sw.Start()
	require.NoError(t, err)
	defer sw.Stop()

	// 1. simulate failure by closing connection
	rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
	rp.Start()
	// The receiver is evaluated when the defer statement runs, so this stops
	// the first remote peer even though rp is reassigned further down.
	defer rp.Stop()

	err = sw.AddPersistentPeers([]string{rp.Addr().String()})
	require.NoError(t, err)

	err = sw.DialPeerWithAddress(rp.Addr())
	require.Nil(t, err)
	require.NotNil(t, sw.Peers().Get(rp.ID()))

	p := sw.Peers().List()[0]
	p.(*peer).CloseConn()

	// Polls the peer count; note the helper does not fail the test by itself,
	// the assertions below do.
	waitUntilSwitchHasAtLeastNPeers(sw, 1)
	assert.False(t, p.IsRunning())        // old peer instance
	assert.Equal(t, 1, sw.Peers().Size()) // new peer instance

	// 2. simulate first time dial failure
	rp = &remotePeer{
		PrivKey: ed25519.GenPrivKey(),
		Config:  cfg,
		// Use different interface to prevent duplicate IP filter, this will break
		// beyond two peers.
		listenAddr: "127.0.0.1:0",
	}
	rp.Start()
	defer rp.Stop()

	conf := config.DefaultP2PConfig()
	conf.TestDialFail = true // will trigger a reconnect
	err = sw.addOutboundPeerWithConfig(rp.Addr(), conf)
	require.NotNil(t, err)
	// DialPeerWithAddress - sw.peerConfig resets the dialer
	waitUntilSwitchHasAtLeastNPeers(sw, 2)
	assert.Equal(t, 2, sw.Peers().Size())
}
  369. func TestSwitchReconnectsToInboundPersistentPeer(t *testing.T) {
  370. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  371. err := sw.Start()
  372. require.NoError(t, err)
  373. defer sw.Stop()
  374. // 1. simulate failure by closing the connection
  375. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  376. rp.Start()
  377. defer rp.Stop()
  378. err = sw.AddPersistentPeers([]string{rp.Addr().String()})
  379. require.NoError(t, err)
  380. conn, err := rp.Dial(sw.NetAddress())
  381. require.NoError(t, err)
  382. time.Sleep(50 * time.Millisecond)
  383. require.NotNil(t, sw.Peers().Get(rp.ID()))
  384. conn.Close()
  385. waitUntilSwitchHasAtLeastNPeers(sw, 1)
  386. assert.Equal(t, 1, sw.Peers().Size())
  387. }
  388. func TestSwitchDialPeersAsync(t *testing.T) {
  389. if testing.Short() {
  390. return
  391. }
  392. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  393. err := sw.Start()
  394. require.NoError(t, err)
  395. defer sw.Stop()
  396. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  397. rp.Start()
  398. defer rp.Stop()
  399. err = sw.DialPeersAsync([]string{rp.Addr().String()})
  400. require.NoError(t, err)
  401. time.Sleep(dialRandomizerIntervalMilliseconds * time.Millisecond)
  402. require.NotNil(t, sw.Peers().Get(rp.ID()))
  403. }
  404. func waitUntilSwitchHasAtLeastNPeers(sw *Switch, n int) {
  405. for i := 0; i < 20; i++ {
  406. time.Sleep(250 * time.Millisecond)
  407. has := sw.Peers().Size()
  408. if has >= n {
  409. break
  410. }
  411. }
  412. }
  413. func TestSwitchFullConnectivity(t *testing.T) {
  414. switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
  415. defer func() {
  416. for _, sw := range switches {
  417. sw.Stop()
  418. }
  419. }()
  420. for i, sw := range switches {
  421. if sw.Peers().Size() != 2 {
  422. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  423. }
  424. }
  425. }
  426. func TestSwitchAcceptRoutine(t *testing.T) {
  427. cfg.MaxNumInboundPeers = 5
  428. // make switch
  429. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  430. err := sw.Start()
  431. require.NoError(t, err)
  432. defer sw.Stop()
  433. remotePeers := make([]*remotePeer, 0)
  434. assert.Equal(t, 0, sw.Peers().Size())
  435. // 1. check we connect up to MaxNumInboundPeers
  436. for i := 0; i < cfg.MaxNumInboundPeers; i++ {
  437. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  438. remotePeers = append(remotePeers, rp)
  439. rp.Start()
  440. c, err := rp.Dial(sw.NetAddress())
  441. require.NoError(t, err)
  442. // spawn a reading routine to prevent connection from closing
  443. go func(c net.Conn) {
  444. for {
  445. one := make([]byte, 1)
  446. _, err := c.Read(one)
  447. if err != nil {
  448. return
  449. }
  450. }
  451. }(c)
  452. }
  453. time.Sleep(10 * time.Millisecond)
  454. assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
  455. // 2. check we close new connections if we already have MaxNumInboundPeers peers
  456. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  457. rp.Start()
  458. conn, err := rp.Dial(sw.NetAddress())
  459. require.NoError(t, err)
  460. // check conn is closed
  461. one := make([]byte, 1)
  462. conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
  463. _, err = conn.Read(one)
  464. assert.Equal(t, io.EOF, err)
  465. assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
  466. rp.Stop()
  467. // stop remote peers
  468. for _, rp := range remotePeers {
  469. rp.Stop()
  470. }
  471. }
// errorTransport is a transport stub for tests: Accept always fails with
// acceptErr and every other method panics with "not implemented".
type errorTransport struct {
	acceptErr error // error returned by every Accept call
}

// NetAddress is not implemented and always panics.
func (et errorTransport) NetAddress() NetAddress {
	panic("not implemented")
}

// Accept ignores c and returns a nil peer together with the configured error.
func (et errorTransport) Accept(c peerConfig) (Peer, error) {
	return nil, et.acceptErr
}

// Dial is not implemented and always panics.
func (errorTransport) Dial(NetAddress, peerConfig) (Peer, error) {
	panic("not implemented")
}

// Cleanup is not implemented and always panics.
func (errorTransport) Cleanup(Peer) {
	panic("not implemented")
}
  487. func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
  488. sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
  489. assert.NotPanics(t, func() {
  490. err := sw.Start()
  491. assert.NoError(t, err)
  492. sw.Stop()
  493. })
  494. sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
  495. assert.NotPanics(t, func() {
  496. err := sw.Start()
  497. assert.NoError(t, err)
  498. sw.Stop()
  499. })
  500. // TODO(melekes) check we remove our address from addrBook
  501. sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
  502. assert.NotPanics(t, func() {
  503. err := sw.Start()
  504. assert.NoError(t, err)
  505. sw.Stop()
  506. })
  507. }
  508. func BenchmarkSwitchBroadcast(b *testing.B) {
  509. s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
  510. // Make bar reactors of bar channels each
  511. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  512. {ID: byte(0x00), Priority: 10},
  513. {ID: byte(0x01), Priority: 10},
  514. }, false))
  515. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  516. {ID: byte(0x02), Priority: 10},
  517. {ID: byte(0x03), Priority: 10},
  518. }, false))
  519. return sw
  520. })
  521. defer s1.Stop()
  522. defer s2.Stop()
  523. // Allow time for goroutines to boot up
  524. time.Sleep(1 * time.Second)
  525. b.ResetTimer()
  526. numSuccess, numFailure := 0, 0
  527. // Send random message from foo channel to another
  528. for i := 0; i < b.N; i++ {
  529. chID := byte(i % 4)
  530. successChan := s1.Broadcast(chID, []byte("test data"))
  531. for s := range successChan {
  532. if s {
  533. numSuccess++
  534. } else {
  535. numFailure++
  536. }
  537. }
  538. }
  539. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  540. }
// addrBookMock is an in-memory AddrBook for tests. Addresses are stored as
// set members keyed by their String() form. The maps are not protected by
// any lock in this type.
type addrBookMock struct {
	addrs    map[string]struct{} // addresses added via AddAddress
	ourAddrs map[string]struct{} // addresses added via AddOurAddress
}

// Compile-time check that addrBookMock satisfies AddrBook.
var _ AddrBook = (*addrBookMock)(nil)

// AddAddress records addr in the book; src is ignored. It always returns nil.
func (book *addrBookMock) AddAddress(addr *NetAddress, src *NetAddress) error {
	book.addrs[addr.String()] = struct{}{}
	return nil
}

// AddOurAddress records addr as one of our own addresses.
func (book *addrBookMock) AddOurAddress(addr *NetAddress) { book.ourAddrs[addr.String()] = struct{}{} }

// OurAddress reports whether addr was recorded via AddOurAddress.
func (book *addrBookMock) OurAddress(addr *NetAddress) bool {
	_, ok := book.ourAddrs[addr.String()]
	return ok
}

// MarkGood is a no-op.
func (book *addrBookMock) MarkGood(ID) {}

// HasAddress reports whether addr is currently in the book.
func (book *addrBookMock) HasAddress(addr *NetAddress) bool {
	_, ok := book.addrs[addr.String()]
	return ok
}

// RemoveAddress deletes addr from the book; removing an unknown address is harmless.
func (book *addrBookMock) RemoveAddress(addr *NetAddress) {
	delete(book.addrs, addr.String())
}

// Save is a no-op; the mock keeps nothing on disk.
func (book *addrBookMock) Save() {}
  563. func (book *addrBookMock) Save() {}