You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

739 lines
19 KiB

p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net"
  9. "net/http"
  10. "net/http/httptest"
  11. "regexp"
  12. "strconv"
  13. "sync"
  14. "sync/atomic"
  15. "testing"
  16. "time"
  17. "github.com/prometheus/client_golang/prometheus/promhttp"
  18. "github.com/stretchr/testify/assert"
  19. "github.com/stretchr/testify/require"
  20. "github.com/tendermint/tendermint/config"
  21. "github.com/tendermint/tendermint/crypto/ed25519"
  22. "github.com/tendermint/tendermint/libs/log"
  23. "github.com/tendermint/tendermint/p2p/conn"
  24. )
  25. var (
  26. cfg *config.P2PConfig
  27. )
  28. func init() {
  29. cfg = config.DefaultP2PConfig()
  30. cfg.PexReactor = true
  31. cfg.AllowDuplicateIP = true
  32. }
  33. type PeerMessage struct {
  34. PeerID ID
  35. Bytes []byte
  36. Counter int
  37. }
  38. type TestReactor struct {
  39. BaseReactor
  40. mtx sync.Mutex
  41. channels []*conn.ChannelDescriptor
  42. logMessages bool
  43. msgsCounter int
  44. msgsReceived map[byte][]PeerMessage
  45. }
  46. func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
  47. tr := &TestReactor{
  48. channels: channels,
  49. logMessages: logMessages,
  50. msgsReceived: make(map[byte][]PeerMessage),
  51. }
  52. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  53. tr.SetLogger(log.TestingLogger())
  54. return tr
  55. }
  56. func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
  57. return tr.channels
  58. }
  59. func (tr *TestReactor) AddPeer(peer Peer) {}
  60. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
  61. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  62. if tr.logMessages {
  63. tr.mtx.Lock()
  64. defer tr.mtx.Unlock()
  65. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  66. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
  67. tr.msgsCounter++
  68. }
  69. }
  70. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  71. tr.mtx.Lock()
  72. defer tr.mtx.Unlock()
  73. return tr.msgsReceived[chID]
  74. }
  75. //-----------------------------------------------------------------------------
  76. // convenience method for creating two switches connected to each other.
  77. // XXX: note this uses net.Pipe and not a proper TCP conn
  78. func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  79. // Create two switches that will be interconnected.
  80. switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
  81. return switches[0], switches[1]
  82. }
  83. func initSwitchFunc(i int, sw *Switch) *Switch {
  84. sw.SetAddrBook(&addrBookMock{
  85. addrs: make(map[string]struct{}),
  86. ourAddrs: make(map[string]struct{})})
  87. // Make two reactors of two channels each
  88. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  89. {ID: byte(0x00), Priority: 10},
  90. {ID: byte(0x01), Priority: 10},
  91. }, true))
  92. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  93. {ID: byte(0x02), Priority: 10},
  94. {ID: byte(0x03), Priority: 10},
  95. }, true))
  96. return sw
  97. }
  98. func TestSwitches(t *testing.T) {
  99. s1, s2 := MakeSwitchPair(t, initSwitchFunc)
  100. defer s1.Stop()
  101. defer s2.Stop()
  102. if s1.Peers().Size() != 1 {
  103. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  104. }
  105. if s2.Peers().Size() != 1 {
  106. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  107. }
  108. // Lets send some messages
  109. ch0Msg := []byte("channel zero")
  110. ch1Msg := []byte("channel foo")
  111. ch2Msg := []byte("channel bar")
  112. s1.Broadcast(byte(0x00), ch0Msg)
  113. s1.Broadcast(byte(0x01), ch1Msg)
  114. s1.Broadcast(byte(0x02), ch2Msg)
  115. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  116. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  117. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  118. }
  119. func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  120. ticker := time.NewTicker(checkPeriod)
  121. for {
  122. select {
  123. case <-ticker.C:
  124. msgs := reactor.getMsgs(channel)
  125. if len(msgs) > 0 {
  126. if !bytes.Equal(msgs[0].Bytes, msgBytes) {
  127. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
  128. }
  129. return
  130. }
  131. case <-time.After(timeout):
  132. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  133. }
  134. }
  135. }
  136. func TestSwitchFiltersOutItself(t *testing.T) {
  137. s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
  138. // simulate s1 having a public IP by creating a remote peer with the same ID
  139. rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
  140. rp.Start()
  141. // addr should be rejected in addPeer based on the same ID
  142. err := s1.DialPeerWithAddress(rp.Addr())
  143. if assert.Error(t, err) {
  144. if err, ok := err.(ErrRejected); ok {
  145. if !err.IsSelf() {
  146. t.Errorf("expected self to be rejected")
  147. }
  148. } else {
  149. t.Errorf("expected ErrRejected")
  150. }
  151. }
  152. assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
  153. assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
  154. rp.Stop()
  155. assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
  156. }
  157. func TestSwitchPeerFilter(t *testing.T) {
  158. var (
  159. filters = []PeerFilterFunc{
  160. func(_ IPeerSet, _ Peer) error { return nil },
  161. func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied!") },
  162. func(_ IPeerSet, _ Peer) error { return nil },
  163. }
  164. sw = MakeSwitch(
  165. cfg,
  166. 1,
  167. "testing",
  168. "123.123.123",
  169. initSwitchFunc,
  170. SwitchPeerFilters(filters...),
  171. )
  172. )
  173. defer sw.Stop()
  174. // simulate remote peer
  175. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  176. rp.Start()
  177. defer rp.Stop()
  178. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  179. chDescs: sw.chDescs,
  180. onPeerError: sw.StopPeerForError,
  181. isPersistent: sw.isPeerPersistentFn(),
  182. reactorsByCh: sw.reactorsByCh,
  183. })
  184. if err != nil {
  185. t.Fatal(err)
  186. }
  187. err = sw.addPeer(p)
  188. if err, ok := err.(ErrRejected); ok {
  189. if !err.IsFiltered() {
  190. t.Errorf("expected peer to be filtered")
  191. }
  192. } else {
  193. t.Errorf("expected ErrRejected")
  194. }
  195. }
  196. func TestSwitchPeerFilterTimeout(t *testing.T) {
  197. var (
  198. filters = []PeerFilterFunc{
  199. func(_ IPeerSet, _ Peer) error {
  200. time.Sleep(10 * time.Millisecond)
  201. return nil
  202. },
  203. }
  204. sw = MakeSwitch(
  205. cfg,
  206. 1,
  207. "testing",
  208. "123.123.123",
  209. initSwitchFunc,
  210. SwitchFilterTimeout(5*time.Millisecond),
  211. SwitchPeerFilters(filters...),
  212. )
  213. )
  214. defer sw.Stop()
  215. // simulate remote peer
  216. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  217. rp.Start()
  218. defer rp.Stop()
  219. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  220. chDescs: sw.chDescs,
  221. onPeerError: sw.StopPeerForError,
  222. isPersistent: sw.isPeerPersistentFn(),
  223. reactorsByCh: sw.reactorsByCh,
  224. })
  225. if err != nil {
  226. t.Fatal(err)
  227. }
  228. err = sw.addPeer(p)
  229. if _, ok := err.(ErrFilterTimeout); !ok {
  230. t.Errorf("expected ErrFilterTimeout")
  231. }
  232. }
  233. func TestSwitchPeerFilterDuplicate(t *testing.T) {
  234. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  235. sw.Start()
  236. defer sw.Stop()
  237. // simulate remote peer
  238. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  239. rp.Start()
  240. defer rp.Stop()
  241. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  242. chDescs: sw.chDescs,
  243. onPeerError: sw.StopPeerForError,
  244. isPersistent: sw.isPeerPersistentFn(),
  245. reactorsByCh: sw.reactorsByCh,
  246. })
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. if err := sw.addPeer(p); err != nil {
  251. t.Fatal(err)
  252. }
  253. err = sw.addPeer(p)
  254. if errRej, ok := err.(ErrRejected); ok {
  255. if !errRej.IsDuplicate() {
  256. t.Errorf("expected peer to be duplicate. got %v", errRej)
  257. }
  258. } else {
  259. t.Errorf("expected ErrRejected, got %v", err)
  260. }
  261. }
  262. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  263. time.Sleep(timeout)
  264. if sw.Peers().Size() != 0 {
  265. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  266. }
  267. }
  268. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  269. assert, require := assert.New(t), require.New(t)
  270. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  271. err := sw.Start()
  272. if err != nil {
  273. t.Error(err)
  274. }
  275. defer sw.Stop()
  276. // simulate remote peer
  277. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  278. rp.Start()
  279. defer rp.Stop()
  280. p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
  281. chDescs: sw.chDescs,
  282. onPeerError: sw.StopPeerForError,
  283. isPersistent: sw.isPeerPersistentFn(),
  284. reactorsByCh: sw.reactorsByCh,
  285. })
  286. require.Nil(err)
  287. err = sw.addPeer(p)
  288. require.Nil(err)
  289. require.NotNil(sw.Peers().Get(rp.ID()))
  290. // simulate failure by closing connection
  291. p.(*peer).CloseConn()
  292. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  293. assert.False(p.IsRunning())
  294. }
  295. func TestSwitchStopPeerForError(t *testing.T) {
  296. s := httptest.NewServer(promhttp.Handler())
  297. defer s.Close()
  298. scrapeMetrics := func() string {
  299. resp, err := http.Get(s.URL)
  300. if err != nil {
  301. t.Errorf("error on GET request: %v", err)
  302. }
  303. defer resp.Body.Close()
  304. buf, _ := ioutil.ReadAll(resp.Body)
  305. return string(buf)
  306. }
  307. namespace, subsystem, name := config.TestInstrumentationConfig().Namespace, MetricsSubsystem, "peers"
  308. re := regexp.MustCompile(namespace + `_` + subsystem + `_` + name + ` ([0-9\.]+)`)
  309. peersMetricValue := func() float64 {
  310. matches := re.FindStringSubmatch(scrapeMetrics())
  311. f, _ := strconv.ParseFloat(matches[1], 64)
  312. return f
  313. }
  314. p2pMetrics := PrometheusMetrics(namespace)
  315. // make two connected switches
  316. sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
  317. // set metrics on sw1
  318. if i == 0 {
  319. opt := WithMetrics(p2pMetrics)
  320. opt(sw)
  321. }
  322. return initSwitchFunc(i, sw)
  323. })
  324. assert.Equal(t, len(sw1.Peers().List()), 1)
  325. assert.EqualValues(t, 1, peersMetricValue())
  326. // send messages to the peer from sw1
  327. p := sw1.Peers().List()[0]
  328. p.Send(0x1, []byte("here's a message to send"))
  329. // stop sw2. this should cause the p to fail,
  330. // which results in calling StopPeerForError internally
  331. sw2.Stop()
  332. // now call StopPeerForError explicitly, eg. from a reactor
  333. sw1.StopPeerForError(p, fmt.Errorf("some err"))
  334. assert.Equal(t, len(sw1.Peers().List()), 0)
  335. assert.EqualValues(t, 0, peersMetricValue())
  336. }
  337. func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) {
  338. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  339. err := sw.Start()
  340. require.NoError(t, err)
  341. defer sw.Stop()
  342. // 1. simulate failure by closing connection
  343. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  344. rp.Start()
  345. defer rp.Stop()
  346. err = sw.AddPersistentPeers([]string{rp.Addr().String()})
  347. require.NoError(t, err)
  348. err = sw.DialPeerWithAddress(rp.Addr())
  349. require.Nil(t, err)
  350. require.NotNil(t, sw.Peers().Get(rp.ID()))
  351. p := sw.Peers().List()[0]
  352. p.(*peer).CloseConn()
  353. waitUntilSwitchHasAtLeastNPeers(sw, 1)
  354. assert.False(t, p.IsRunning()) // old peer instance
  355. assert.Equal(t, 1, sw.Peers().Size()) // new peer instance
  356. // 2. simulate first time dial failure
  357. rp = &remotePeer{
  358. PrivKey: ed25519.GenPrivKey(),
  359. Config: cfg,
  360. // Use different interface to prevent duplicate IP filter, this will break
  361. // beyond two peers.
  362. listenAddr: "127.0.0.1:0",
  363. }
  364. rp.Start()
  365. defer rp.Stop()
  366. conf := config.DefaultP2PConfig()
  367. conf.TestDialFail = true // will trigger a reconnect
  368. err = sw.addOutboundPeerWithConfig(rp.Addr(), conf)
  369. require.NotNil(t, err)
  370. // DialPeerWithAddres - sw.peerConfig resets the dialer
  371. waitUntilSwitchHasAtLeastNPeers(sw, 2)
  372. assert.Equal(t, 2, sw.Peers().Size())
  373. }
  374. func TestSwitchReconnectsToInboundPersistentPeer(t *testing.T) {
  375. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  376. err := sw.Start()
  377. require.NoError(t, err)
  378. defer sw.Stop()
  379. // 1. simulate failure by closing the connection
  380. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  381. rp.Start()
  382. defer rp.Stop()
  383. err = sw.AddPersistentPeers([]string{rp.Addr().String()})
  384. require.NoError(t, err)
  385. conn, err := rp.Dial(sw.NetAddress())
  386. require.NoError(t, err)
  387. time.Sleep(50 * time.Millisecond)
  388. require.NotNil(t, sw.Peers().Get(rp.ID()))
  389. conn.Close()
  390. waitUntilSwitchHasAtLeastNPeers(sw, 1)
  391. assert.Equal(t, 1, sw.Peers().Size())
  392. }
  393. func TestSwitchDialPeersAsync(t *testing.T) {
  394. if testing.Short() {
  395. return
  396. }
  397. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  398. err := sw.Start()
  399. require.NoError(t, err)
  400. defer sw.Stop()
  401. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  402. rp.Start()
  403. defer rp.Stop()
  404. err = sw.DialPeersAsync([]string{rp.Addr().String()})
  405. require.NoError(t, err)
  406. time.Sleep(dialRandomizerIntervalMilliseconds * time.Millisecond)
  407. require.NotNil(t, sw.Peers().Get(rp.ID()))
  408. }
  409. func waitUntilSwitchHasAtLeastNPeers(sw *Switch, n int) {
  410. for i := 0; i < 20; i++ {
  411. time.Sleep(250 * time.Millisecond)
  412. has := sw.Peers().Size()
  413. if has >= n {
  414. break
  415. }
  416. }
  417. }
  418. func TestSwitchFullConnectivity(t *testing.T) {
  419. switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
  420. defer func() {
  421. for _, sw := range switches {
  422. sw.Stop()
  423. }
  424. }()
  425. for i, sw := range switches {
  426. if sw.Peers().Size() != 2 {
  427. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  428. }
  429. }
  430. }
  431. func TestSwitchAcceptRoutine(t *testing.T) {
  432. cfg.MaxNumInboundPeers = 5
  433. // make switch
  434. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  435. err := sw.Start()
  436. require.NoError(t, err)
  437. defer sw.Stop()
  438. remotePeers := make([]*remotePeer, 0)
  439. assert.Equal(t, 0, sw.Peers().Size())
  440. // 1. check we connect up to MaxNumInboundPeers
  441. for i := 0; i < cfg.MaxNumInboundPeers; i++ {
  442. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  443. remotePeers = append(remotePeers, rp)
  444. rp.Start()
  445. c, err := rp.Dial(sw.NetAddress())
  446. require.NoError(t, err)
  447. // spawn a reading routine to prevent connection from closing
  448. go func(c net.Conn) {
  449. for {
  450. one := make([]byte, 1)
  451. _, err := c.Read(one)
  452. if err != nil {
  453. return
  454. }
  455. }
  456. }(c)
  457. }
  458. time.Sleep(10 * time.Millisecond)
  459. assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
  460. // 2. check we close new connections if we already have MaxNumInboundPeers peers
  461. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  462. rp.Start()
  463. conn, err := rp.Dial(sw.NetAddress())
  464. require.NoError(t, err)
  465. // check conn is closed
  466. one := make([]byte, 1)
  467. conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
  468. _, err = conn.Read(one)
  469. assert.Equal(t, io.EOF, err)
  470. assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
  471. rp.Stop()
  472. // stop remote peers
  473. for _, rp := range remotePeers {
  474. rp.Stop()
  475. }
  476. }
  477. type errorTransport struct {
  478. acceptErr error
  479. }
  480. func (et errorTransport) NetAddress() NetAddress {
  481. panic("not implemented")
  482. }
  483. func (et errorTransport) Accept(c peerConfig) (Peer, error) {
  484. return nil, et.acceptErr
  485. }
  486. func (errorTransport) Dial(NetAddress, peerConfig) (Peer, error) {
  487. panic("not implemented")
  488. }
  489. func (errorTransport) Cleanup(Peer) {
  490. panic("not implemented")
  491. }
  492. func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
  493. sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
  494. assert.NotPanics(t, func() {
  495. err := sw.Start()
  496. assert.NoError(t, err)
  497. sw.Stop()
  498. })
  499. sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
  500. assert.NotPanics(t, func() {
  501. err := sw.Start()
  502. assert.NoError(t, err)
  503. sw.Stop()
  504. })
  505. // TODO(melekes) check we remove our address from addrBook
  506. sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
  507. assert.NotPanics(t, func() {
  508. err := sw.Start()
  509. assert.NoError(t, err)
  510. sw.Stop()
  511. })
  512. }
  513. // mockReactor checks that InitPeer never called before RemovePeer. If that's
  514. // not true, InitCalledBeforeRemoveFinished will return true.
  515. type mockReactor struct {
  516. *BaseReactor
  517. // atomic
  518. removePeerInProgress uint32
  519. initCalledBeforeRemoveFinished uint32
  520. }
  521. func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) {
  522. atomic.StoreUint32(&r.removePeerInProgress, 1)
  523. defer atomic.StoreUint32(&r.removePeerInProgress, 0)
  524. time.Sleep(100 * time.Millisecond)
  525. }
  526. func (r *mockReactor) InitPeer(peer Peer) Peer {
  527. if atomic.LoadUint32(&r.removePeerInProgress) == 1 {
  528. atomic.StoreUint32(&r.initCalledBeforeRemoveFinished, 1)
  529. }
  530. return peer
  531. }
  532. func (r *mockReactor) InitCalledBeforeRemoveFinished() bool {
  533. return atomic.LoadUint32(&r.initCalledBeforeRemoveFinished) == 1
  534. }
  535. // see stopAndRemovePeer
  536. func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) {
  537. // make reactor
  538. reactor := &mockReactor{}
  539. reactor.BaseReactor = NewBaseReactor("mockReactor", reactor)
  540. // make switch
  541. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch) *Switch {
  542. sw.AddReactor("mock", reactor)
  543. return sw
  544. })
  545. err := sw.Start()
  546. require.NoError(t, err)
  547. defer sw.Stop()
  548. // add peer
  549. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  550. rp.Start()
  551. defer rp.Stop()
  552. _, err = rp.Dial(sw.NetAddress())
  553. require.NoError(t, err)
  554. // wait till the switch adds rp to the peer set
  555. time.Sleep(50 * time.Millisecond)
  556. // stop peer asynchronously
  557. go sw.StopPeerForError(sw.Peers().Get(rp.ID()), "test")
  558. // simulate peer reconnecting to us
  559. _, err = rp.Dial(sw.NetAddress())
  560. require.NoError(t, err)
  561. // wait till the switch adds rp to the peer set
  562. time.Sleep(50 * time.Millisecond)
  563. // make sure reactor.RemovePeer is finished before InitPeer is called
  564. assert.False(t, reactor.InitCalledBeforeRemoveFinished())
  565. }
  566. func BenchmarkSwitchBroadcast(b *testing.B) {
  567. s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
  568. // Make bar reactors of bar channels each
  569. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  570. {ID: byte(0x00), Priority: 10},
  571. {ID: byte(0x01), Priority: 10},
  572. }, false))
  573. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  574. {ID: byte(0x02), Priority: 10},
  575. {ID: byte(0x03), Priority: 10},
  576. }, false))
  577. return sw
  578. })
  579. defer s1.Stop()
  580. defer s2.Stop()
  581. // Allow time for goroutines to boot up
  582. time.Sleep(1 * time.Second)
  583. b.ResetTimer()
  584. numSuccess, numFailure := 0, 0
  585. // Send random message from foo channel to another
  586. for i := 0; i < b.N; i++ {
  587. chID := byte(i % 4)
  588. successChan := s1.Broadcast(chID, []byte("test data"))
  589. for s := range successChan {
  590. if s {
  591. numSuccess++
  592. } else {
  593. numFailure++
  594. }
  595. }
  596. }
  597. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  598. }
  599. type addrBookMock struct {
  600. addrs map[string]struct{}
  601. ourAddrs map[string]struct{}
  602. }
  603. var _ AddrBook = (*addrBookMock)(nil)
  604. func (book *addrBookMock) AddAddress(addr *NetAddress, src *NetAddress) error {
  605. book.addrs[addr.String()] = struct{}{}
  606. return nil
  607. }
  608. func (book *addrBookMock) AddOurAddress(addr *NetAddress) { book.ourAddrs[addr.String()] = struct{}{} }
  609. func (book *addrBookMock) OurAddress(addr *NetAddress) bool {
  610. _, ok := book.ourAddrs[addr.String()]
  611. return ok
  612. }
  613. func (book *addrBookMock) MarkGood(ID) {}
  614. func (book *addrBookMock) HasAddress(addr *NetAddress) bool {
  615. _, ok := book.addrs[addr.String()]
  616. return ok
  617. }
  618. func (book *addrBookMock) RemoveAddress(addr *NetAddress) {
  619. delete(book.addrs, addr.String())
  620. }
  621. func (book *addrBookMock) Save() {}