You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1653 lines
56 KiB

  1. package p2p_test
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "testing"
  7. "time"
  8. "github.com/fortytw2/leaktest"
  9. "github.com/stretchr/testify/require"
  10. dbm "github.com/tendermint/tm-db"
  11. "github.com/tendermint/tendermint/p2p"
  12. )
  13. // FIXME: We should probably have some randomized property-based tests for the
  14. // PeerManager too, which run a bunch of random operations with random peers
  15. // and ensure certain invariants always hold. The logic can be complex, with
  16. // many interactions, and it's hard to cover all scenarios with handwritten
  17. // tests.
  18. func TestPeerManagerOptions_Validate(t *testing.T) {
  19. nodeID := p2p.NodeID("00112233445566778899aabbccddeeff00112233")
  20. testcases := map[string]struct {
  21. options p2p.PeerManagerOptions
  22. ok bool
  23. }{
  24. "zero options is valid": {p2p.PeerManagerOptions{}, true},
  25. // PersistentPeers
  26. "valid PersistentPeers NodeID": {p2p.PeerManagerOptions{
  27. PersistentPeers: []p2p.NodeID{"00112233445566778899aabbccddeeff00112233"},
  28. }, true},
  29. "invalid PersistentPeers NodeID": {p2p.PeerManagerOptions{
  30. PersistentPeers: []p2p.NodeID{"foo"},
  31. }, false},
  32. "uppercase PersistentPeers NodeID": {p2p.PeerManagerOptions{
  33. PersistentPeers: []p2p.NodeID{"00112233445566778899AABBCCDDEEFF00112233"},
  34. }, false},
  35. "PersistentPeers at MaxConnected": {p2p.PeerManagerOptions{
  36. PersistentPeers: []p2p.NodeID{nodeID, nodeID, nodeID},
  37. MaxConnected: 3,
  38. }, true},
  39. "PersistentPeers above MaxConnected": {p2p.PeerManagerOptions{
  40. PersistentPeers: []p2p.NodeID{nodeID, nodeID, nodeID},
  41. MaxConnected: 2,
  42. }, false},
  43. "PersistentPeers above MaxConnected below MaxConnectedUpgrade": {p2p.PeerManagerOptions{
  44. PersistentPeers: []p2p.NodeID{nodeID, nodeID, nodeID},
  45. MaxConnected: 2,
  46. MaxConnectedUpgrade: 2,
  47. }, false},
  48. // MaxPeers
  49. "MaxPeers without MaxConnected": {p2p.PeerManagerOptions{
  50. MaxPeers: 3,
  51. }, false},
  52. "MaxPeers below MaxConnected+MaxConnectedUpgrade": {p2p.PeerManagerOptions{
  53. MaxPeers: 2,
  54. MaxConnected: 2,
  55. MaxConnectedUpgrade: 1,
  56. }, false},
  57. "MaxPeers at MaxConnected+MaxConnectedUpgrade": {p2p.PeerManagerOptions{
  58. MaxPeers: 3,
  59. MaxConnected: 2,
  60. MaxConnectedUpgrade: 1,
  61. }, true},
  62. // MaxRetryTime
  63. "MaxRetryTime below MinRetryTime": {p2p.PeerManagerOptions{
  64. MinRetryTime: 7 * time.Second,
  65. MaxRetryTime: 5 * time.Second,
  66. }, false},
  67. "MaxRetryTime at MinRetryTime": {p2p.PeerManagerOptions{
  68. MinRetryTime: 5 * time.Second,
  69. MaxRetryTime: 5 * time.Second,
  70. }, true},
  71. "MaxRetryTime without MinRetryTime": {p2p.PeerManagerOptions{
  72. MaxRetryTime: 5 * time.Second,
  73. }, false},
  74. // MaxRetryTimePersistent
  75. "MaxRetryTimePersistent below MinRetryTime": {p2p.PeerManagerOptions{
  76. MinRetryTime: 7 * time.Second,
  77. MaxRetryTimePersistent: 5 * time.Second,
  78. }, false},
  79. "MaxRetryTimePersistent at MinRetryTime": {p2p.PeerManagerOptions{
  80. MinRetryTime: 5 * time.Second,
  81. MaxRetryTimePersistent: 5 * time.Second,
  82. }, true},
  83. "MaxRetryTimePersistent without MinRetryTime": {p2p.PeerManagerOptions{
  84. MaxRetryTimePersistent: 5 * time.Second,
  85. }, false},
  86. }
  87. for name, tc := range testcases {
  88. tc := tc
  89. t.Run(name, func(t *testing.T) {
  90. err := tc.options.Validate()
  91. if tc.ok {
  92. require.NoError(t, err)
  93. } else {
  94. require.Error(t, err)
  95. }
  96. })
  97. }
  98. }
  99. func TestNewPeerManager(t *testing.T) {
  100. // Zero options should be valid.
  101. _, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  102. require.NoError(t, err)
  103. // Invalid options should error.
  104. _, err = p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  105. PersistentPeers: []p2p.NodeID{"foo"},
  106. })
  107. require.Error(t, err)
  108. // Invalid database should error.
  109. _, err = p2p.NewPeerManager(selfID, nil, p2p.PeerManagerOptions{})
  110. require.Error(t, err)
  111. // Empty self ID should error.
  112. _, err = p2p.NewPeerManager("", nil, p2p.PeerManagerOptions{})
  113. require.Error(t, err)
  114. }
  115. func TestNewPeerManager_Persistence(t *testing.T) {
  116. aID := p2p.NodeID(strings.Repeat("a", 40))
  117. aAddresses := []p2p.NodeAddress{
  118. {Protocol: "tcp", NodeID: aID, Hostname: "127.0.0.1", Port: 26657, Path: "/path"},
  119. {Protocol: "memory", NodeID: aID},
  120. }
  121. bID := p2p.NodeID(strings.Repeat("b", 40))
  122. bAddresses := []p2p.NodeAddress{
  123. {Protocol: "tcp", NodeID: bID, Hostname: "b10c::1", Port: 26657, Path: "/path"},
  124. {Protocol: "memory", NodeID: bID},
  125. }
  126. cID := p2p.NodeID(strings.Repeat("c", 40))
  127. cAddresses := []p2p.NodeAddress{
  128. {Protocol: "tcp", NodeID: cID, Hostname: "host.domain", Port: 80},
  129. {Protocol: "memory", NodeID: cID},
  130. }
  131. // Create an initial peer manager and add the peers.
  132. db := dbm.NewMemDB()
  133. peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{
  134. PersistentPeers: []p2p.NodeID{aID},
  135. PeerScores: map[p2p.NodeID]p2p.PeerScore{bID: 1},
  136. })
  137. require.NoError(t, err)
  138. defer peerManager.Close()
  139. for _, addr := range append(append(aAddresses, bAddresses...), cAddresses...) {
  140. require.NoError(t, peerManager.Add(addr))
  141. }
  142. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  143. require.ElementsMatch(t, bAddresses, peerManager.Addresses(bID))
  144. require.ElementsMatch(t, cAddresses, peerManager.Addresses(cID))
  145. require.Equal(t, map[p2p.NodeID]p2p.PeerScore{
  146. aID: p2p.PeerScorePersistent,
  147. bID: 1,
  148. cID: 0,
  149. }, peerManager.Scores())
  150. peerManager.Close()
  151. // Creating a new peer manager with the same database should retain the
  152. // peers, but they should have updated scores from the new PersistentPeers
  153. // configuration.
  154. peerManager, err = p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{
  155. PersistentPeers: []p2p.NodeID{bID},
  156. PeerScores: map[p2p.NodeID]p2p.PeerScore{cID: 1},
  157. })
  158. require.NoError(t, err)
  159. defer peerManager.Close()
  160. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  161. require.ElementsMatch(t, bAddresses, peerManager.Addresses(bID))
  162. require.ElementsMatch(t, cAddresses, peerManager.Addresses(cID))
  163. require.Equal(t, map[p2p.NodeID]p2p.PeerScore{
  164. aID: 0,
  165. bID: p2p.PeerScorePersistent,
  166. cID: 1,
  167. }, peerManager.Scores())
  168. }
  169. func TestNewPeerManager_SelfIDChange(t *testing.T) {
  170. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  171. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  172. db := dbm.NewMemDB()
  173. peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
  174. require.NoError(t, err)
  175. require.NoError(t, peerManager.Add(a))
  176. require.NoError(t, peerManager.Add(b))
  177. require.ElementsMatch(t, []p2p.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  178. peerManager.Close()
  179. // If we change our selfID to one of the peers in the peer store, it
  180. // should be removed from the store.
  181. peerManager, err = p2p.NewPeerManager(a.NodeID, db, p2p.PeerManagerOptions{})
  182. require.NoError(t, err)
  183. require.Equal(t, []p2p.NodeID{b.NodeID}, peerManager.Peers())
  184. }
  185. func TestPeerManager_Add(t *testing.T) {
  186. aID := p2p.NodeID(strings.Repeat("a", 40))
  187. bID := p2p.NodeID(strings.Repeat("b", 40))
  188. cID := p2p.NodeID(strings.Repeat("c", 40))
  189. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  190. PersistentPeers: []p2p.NodeID{aID, cID},
  191. MaxPeers: 2,
  192. MaxConnected: 2,
  193. })
  194. require.NoError(t, err)
  195. // Adding a couple of addresses should work.
  196. aAddresses := []p2p.NodeAddress{
  197. {Protocol: "tcp", NodeID: aID, Hostname: "localhost"},
  198. {Protocol: "memory", NodeID: aID},
  199. }
  200. for _, addr := range aAddresses {
  201. err = peerManager.Add(addr)
  202. require.NoError(t, err)
  203. }
  204. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  205. // Adding a different peer should be fine.
  206. bAddress := p2p.NodeAddress{Protocol: "tcp", NodeID: bID, Hostname: "localhost"}
  207. require.NoError(t, peerManager.Add(bAddress))
  208. require.Equal(t, []p2p.NodeAddress{bAddress}, peerManager.Addresses(bID))
  209. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  210. // Adding an existing address again should be a noop.
  211. require.NoError(t, peerManager.Add(aAddresses[0]))
  212. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  213. // Adding a third peer with MaxPeers=2 should cause bID, which is
  214. // the lowest-scored peer (not in PersistentPeers), to be removed.
  215. require.NoError(t, peerManager.Add(p2p.NodeAddress{
  216. Protocol: "tcp", NodeID: cID, Hostname: "localhost"}))
  217. require.ElementsMatch(t, []p2p.NodeID{aID, cID}, peerManager.Peers())
  218. // Adding an invalid address should error.
  219. require.Error(t, peerManager.Add(p2p.NodeAddress{Path: "foo"}))
  220. // Adding self should error
  221. require.Error(t, peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID}))
  222. }
  223. func TestPeerManager_DialNext(t *testing.T) {
  224. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  225. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  226. require.NoError(t, err)
  227. // Add an address. DialNext should return it.
  228. require.NoError(t, peerManager.Add(a))
  229. address, err := peerManager.DialNext(ctx)
  230. require.NoError(t, err)
  231. require.Equal(t, a, address)
  232. // Since there are no more undialed peers, the next call should block
  233. // until it times out.
  234. timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
  235. defer cancel()
  236. _, err = peerManager.DialNext(timeoutCtx)
  237. require.Error(t, err)
  238. require.Equal(t, context.DeadlineExceeded, err)
  239. }
  240. func TestPeerManager_DialNext_Retry(t *testing.T) {
  241. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  242. options := p2p.PeerManagerOptions{
  243. MinRetryTime: 100 * time.Millisecond,
  244. MaxRetryTime: 500 * time.Millisecond,
  245. }
  246. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), options)
  247. require.NoError(t, err)
  248. require.NoError(t, peerManager.Add(a))
  249. // Do five dial retries (six dials total). The retry time should double for
  250. // each failure. At the forth retry, MaxRetryTime should kick in.
  251. ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
  252. defer cancel()
  253. for i := 0; i <= 5; i++ {
  254. start := time.Now()
  255. dial, err := peerManager.DialNext(ctx)
  256. require.NoError(t, err)
  257. require.Equal(t, a, dial)
  258. elapsed := time.Since(start).Round(time.Millisecond)
  259. switch i {
  260. case 0:
  261. require.LessOrEqual(t, elapsed, options.MinRetryTime)
  262. case 1:
  263. require.GreaterOrEqual(t, elapsed, options.MinRetryTime)
  264. case 2:
  265. require.GreaterOrEqual(t, elapsed, 2*options.MinRetryTime)
  266. case 3:
  267. require.GreaterOrEqual(t, elapsed, 4*options.MinRetryTime)
  268. case 4, 5:
  269. require.GreaterOrEqual(t, elapsed, options.MaxRetryTime)
  270. require.LessOrEqual(t, elapsed, 8*options.MinRetryTime)
  271. default:
  272. require.Fail(t, "unexpected retry")
  273. }
  274. require.NoError(t, peerManager.DialFailed(a))
  275. }
  276. }
  277. func TestPeerManager_DialNext_WakeOnAdd(t *testing.T) {
  278. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  279. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  280. require.NoError(t, err)
  281. // Spawn a goroutine to add a peer after a delay.
  282. go func() {
  283. time.Sleep(200 * time.Millisecond)
  284. require.NoError(t, peerManager.Add(a))
  285. }()
  286. // This will block until peer is added above.
  287. ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  288. defer cancel()
  289. dial, err := peerManager.DialNext(ctx)
  290. require.NoError(t, err)
  291. require.Equal(t, a, dial)
  292. }
  293. func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) {
  294. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  295. MaxConnected: 1,
  296. })
  297. require.NoError(t, err)
  298. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  299. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  300. // Add and dial a.
  301. require.NoError(t, peerManager.Add(a))
  302. dial, err := peerManager.TryDialNext()
  303. require.NoError(t, err)
  304. require.Equal(t, a, dial)
  305. // Add b. We shouldn't be able to dial it, due to MaxConnected.
  306. require.NoError(t, peerManager.Add(b))
  307. dial, err = peerManager.TryDialNext()
  308. require.NoError(t, err)
  309. require.Zero(t, dial)
  310. // Spawn a goroutine to fail a's dial attempt.
  311. go func() {
  312. time.Sleep(200 * time.Millisecond)
  313. require.NoError(t, peerManager.DialFailed(a))
  314. }()
  315. // This should make b available for dialing (not a, retries are disabled).
  316. ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  317. defer cancel()
  318. dial, err = peerManager.DialNext(ctx)
  319. require.NoError(t, err)
  320. require.Equal(t, b, dial)
  321. }
  322. func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) {
  323. options := p2p.PeerManagerOptions{MinRetryTime: 200 * time.Millisecond}
  324. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), options)
  325. require.NoError(t, err)
  326. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  327. // Add a, dial it, and mark it a failure. This will start a retry timer.
  328. require.NoError(t, peerManager.Add(a))
  329. dial, err := peerManager.TryDialNext()
  330. require.NoError(t, err)
  331. require.Equal(t, a, dial)
  332. require.NoError(t, peerManager.DialFailed(dial))
  333. failed := time.Now()
  334. // The retry timer should unblock DialNext and make a available again after
  335. // the retry time passes.
  336. ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  337. defer cancel()
  338. dial, err = peerManager.DialNext(ctx)
  339. require.NoError(t, err)
  340. require.Equal(t, a, dial)
  341. require.GreaterOrEqual(t, time.Since(failed), options.MinRetryTime)
  342. }
  343. func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
  344. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  345. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  346. require.NoError(t, err)
  347. err = peerManager.Add(a)
  348. require.NoError(t, err)
  349. err = peerManager.Accepted(a.NodeID)
  350. require.NoError(t, err)
  351. dial, err := peerManager.TryDialNext()
  352. require.NoError(t, err)
  353. require.Zero(t, dial)
  354. go func() {
  355. time.Sleep(200 * time.Millisecond)
  356. require.NoError(t, peerManager.Disconnected(a.NodeID))
  357. }()
  358. ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  359. defer cancel()
  360. dial, err = peerManager.DialNext(ctx)
  361. require.NoError(t, err)
  362. require.Equal(t, a, dial)
  363. }
  364. func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
  365. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  366. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  367. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  368. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  369. MaxConnected: 2,
  370. })
  371. require.NoError(t, err)
  372. // Add a and connect to it.
  373. require.NoError(t, peerManager.Add(a))
  374. dial, err := peerManager.TryDialNext()
  375. require.NoError(t, err)
  376. require.Equal(t, a, dial)
  377. require.NoError(t, peerManager.Dialed(a))
  378. // Add b and start dialing it.
  379. require.NoError(t, peerManager.Add(b))
  380. dial, err = peerManager.TryDialNext()
  381. require.NoError(t, err)
  382. require.Equal(t, b, dial)
  383. // At this point, adding c will not allow dialing it.
  384. require.NoError(t, peerManager.Add(c))
  385. dial, err = peerManager.TryDialNext()
  386. require.NoError(t, err)
  387. require.Zero(t, dial)
  388. }
  389. func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
  390. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  391. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  392. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  393. d := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("d", 40))}
  394. e := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("e", 40))}
  395. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  396. PeerScores: map[p2p.NodeID]p2p.PeerScore{
  397. a.NodeID: 0,
  398. b.NodeID: 1,
  399. c.NodeID: 2,
  400. d.NodeID: 3,
  401. e.NodeID: 0,
  402. },
  403. PersistentPeers: []p2p.NodeID{c.NodeID, d.NodeID},
  404. MaxConnected: 2,
  405. MaxConnectedUpgrade: 1,
  406. })
  407. require.NoError(t, err)
  408. // Add a and connect to it.
  409. require.NoError(t, peerManager.Add(a))
  410. dial, err := peerManager.TryDialNext()
  411. require.NoError(t, err)
  412. require.Equal(t, a, dial)
  413. require.NoError(t, peerManager.Dialed(a))
  414. // Add b and start dialing it.
  415. require.NoError(t, peerManager.Add(b))
  416. dial, err = peerManager.TryDialNext()
  417. require.NoError(t, err)
  418. require.Equal(t, b, dial)
  419. // Even though we are at capacity, we should be allowed to dial c for an
  420. // upgrade of a, since it's higher-scored.
  421. require.NoError(t, peerManager.Add(c))
  422. dial, err = peerManager.TryDialNext()
  423. require.NoError(t, err)
  424. require.Equal(t, c, dial)
  425. // However, since we're using all upgrade slots now, we can't add and dial
  426. // d, even though it's also higher-scored.
  427. require.NoError(t, peerManager.Add(d))
  428. dial, err = peerManager.TryDialNext()
  429. require.NoError(t, err)
  430. require.Zero(t, dial)
  431. // We go through with c's upgrade.
  432. require.NoError(t, peerManager.Dialed(c))
  433. // Still can't dial d.
  434. dial, err = peerManager.TryDialNext()
  435. require.NoError(t, err)
  436. require.Zero(t, dial)
  437. // Now, if we disconnect a, we should be allowed to dial d because we have a
  438. // free upgrade slot.
  439. require.NoError(t, peerManager.Disconnected(a.NodeID))
  440. dial, err = peerManager.TryDialNext()
  441. require.NoError(t, err)
  442. require.Equal(t, d, dial)
  443. require.NoError(t, peerManager.Dialed(d))
  444. // However, if we disconnect b (such that only c and d are connected), we
  445. // should not be allowed to dial e even though there are upgrade slots,
  446. // because there are no lower-scored nodes that can be upgraded.
  447. require.NoError(t, peerManager.Disconnected(b.NodeID))
  448. require.NoError(t, peerManager.Add(e))
  449. dial, err = peerManager.TryDialNext()
  450. require.NoError(t, err)
  451. require.Zero(t, dial)
  452. }
  453. func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
  454. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  455. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  456. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  457. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  458. PeerScores: map[p2p.NodeID]p2p.PeerScore{b.NodeID: 1, c.NodeID: 1},
  459. MaxConnected: 1,
  460. MaxConnectedUpgrade: 2,
  461. })
  462. require.NoError(t, err)
  463. // Add a and connect to it.
  464. require.NoError(t, peerManager.Add(a))
  465. dial, err := peerManager.TryDialNext()
  466. require.NoError(t, err)
  467. require.Equal(t, a, dial)
  468. require.NoError(t, peerManager.Dialed(a))
  469. // Add b and start dialing it. This will claim a for upgrading.
  470. require.NoError(t, peerManager.Add(b))
  471. dial, err = peerManager.TryDialNext()
  472. require.NoError(t, err)
  473. require.Equal(t, b, dial)
  474. // Adding c and dialing it will fail, because a is the only connected
  475. // peer that can be upgraded, and b is already trying to upgrade it.
  476. require.NoError(t, peerManager.Add(c))
  477. dial, err = peerManager.TryDialNext()
  478. require.NoError(t, err)
  479. require.Empty(t, dial)
  480. }
  481. func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
  482. aID := p2p.NodeID(strings.Repeat("a", 40))
  483. a := p2p.NodeAddress{Protocol: "memory", NodeID: aID}
  484. aTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: aID, Hostname: "localhost"}
  485. bID := p2p.NodeID(strings.Repeat("b", 40))
  486. b := p2p.NodeAddress{Protocol: "memory", NodeID: bID}
  487. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  488. MaxConnected: 2,
  489. })
  490. require.NoError(t, err)
  491. // Add a and dial it.
  492. require.NoError(t, peerManager.Add(a))
  493. dial, err := peerManager.TryDialNext()
  494. require.NoError(t, err)
  495. require.Equal(t, a, dial)
  496. // Adding a's TCP address will not dispense a, since it's already dialing.
  497. require.NoError(t, peerManager.Add(aTCP))
  498. dial, err = peerManager.TryDialNext()
  499. require.NoError(t, err)
  500. require.Zero(t, dial)
  501. // Marking a as dialed will still not dispense it.
  502. require.NoError(t, peerManager.Dialed(a))
  503. dial, err = peerManager.TryDialNext()
  504. require.NoError(t, err)
  505. require.Zero(t, dial)
  506. // Adding b and accepting a connection from it will not dispense it either.
  507. require.NoError(t, peerManager.Add(b))
  508. require.NoError(t, peerManager.Accepted(bID))
  509. dial, err = peerManager.TryDialNext()
  510. require.NoError(t, err)
  511. require.Zero(t, dial)
  512. }
  513. func TestPeerManager_TryDialNext_Multiple(t *testing.T) {
  514. aID := p2p.NodeID(strings.Repeat("a", 40))
  515. bID := p2p.NodeID(strings.Repeat("b", 40))
  516. addresses := []p2p.NodeAddress{
  517. {Protocol: "memory", NodeID: aID},
  518. {Protocol: "memory", NodeID: bID},
  519. {Protocol: "tcp", NodeID: aID, Hostname: "127.0.0.1"},
  520. {Protocol: "tcp", NodeID: bID, Hostname: "::1"},
  521. }
  522. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  523. require.NoError(t, err)
  524. for _, address := range addresses {
  525. require.NoError(t, peerManager.Add(address))
  526. }
  527. // All addresses should be dispensed as long as dialing them has failed.
  528. dial := []p2p.NodeAddress{}
  529. for range addresses {
  530. address, err := peerManager.TryDialNext()
  531. require.NoError(t, err)
  532. require.NotZero(t, address)
  533. require.NoError(t, peerManager.DialFailed(address))
  534. dial = append(dial, address)
  535. }
  536. require.ElementsMatch(t, dial, addresses)
  537. address, err := peerManager.TryDialNext()
  538. require.NoError(t, err)
  539. require.Zero(t, address)
  540. }
  541. func TestPeerManager_DialFailed(t *testing.T) {
  542. // DialFailed is tested through other tests, we'll just check a few basic
  543. // things here, e.g. reporting unknown addresses.
  544. aID := p2p.NodeID(strings.Repeat("a", 40))
  545. a := p2p.NodeAddress{Protocol: "memory", NodeID: aID}
  546. bID := p2p.NodeID(strings.Repeat("b", 40))
  547. b := p2p.NodeAddress{Protocol: "memory", NodeID: bID}
  548. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  549. require.NoError(t, err)
  550. require.NoError(t, peerManager.Add(a))
  551. // Dialing and then calling DialFailed with a different address (same
  552. // NodeID) should unmark as dialing and allow us to dial the other address
  553. // again, but not register the failed address.
  554. dial, err := peerManager.TryDialNext()
  555. require.NoError(t, err)
  556. require.Equal(t, a, dial)
  557. require.NoError(t, peerManager.DialFailed(p2p.NodeAddress{
  558. Protocol: "tcp", NodeID: aID, Hostname: "localhost"}))
  559. require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID))
  560. dial, err = peerManager.TryDialNext()
  561. require.NoError(t, err)
  562. require.Equal(t, a, dial)
  563. // Calling DialFailed on same address twice should be fine.
  564. require.NoError(t, peerManager.DialFailed(a))
  565. require.NoError(t, peerManager.DialFailed(a))
  566. // DialFailed on an unknown peer shouldn't error or add it.
  567. require.NoError(t, peerManager.DialFailed(b))
  568. require.Equal(t, []p2p.NodeID{aID}, peerManager.Peers())
  569. }
  570. func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
  571. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  572. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  573. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  574. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  575. PeerScores: map[p2p.NodeID]p2p.PeerScore{b.NodeID: 1, c.NodeID: 1},
  576. MaxConnected: 1,
  577. MaxConnectedUpgrade: 2,
  578. })
  579. require.NoError(t, err)
  580. // Add a and connect to it.
  581. require.NoError(t, peerManager.Add(a))
  582. dial, err := peerManager.TryDialNext()
  583. require.NoError(t, err)
  584. require.Equal(t, a, dial)
  585. require.NoError(t, peerManager.Dialed(a))
  586. // Add b and start dialing it. This will claim a for upgrading.
  587. require.NoError(t, peerManager.Add(b))
  588. dial, err = peerManager.TryDialNext()
  589. require.NoError(t, err)
  590. require.Equal(t, b, dial)
  591. // Adding c and dialing it will fail, even though it could upgrade a and we
  592. // have free upgrade slots, because a is the only connected peer that can be
  593. // upgraded and b is already trying to upgrade it.
  594. require.NoError(t, peerManager.Add(c))
  595. dial, err = peerManager.TryDialNext()
  596. require.NoError(t, err)
  597. require.Empty(t, dial)
  598. // Failing b's dial will now make c available for dialing.
  599. require.NoError(t, peerManager.DialFailed(b))
  600. dial, err = peerManager.TryDialNext()
  601. require.NoError(t, err)
  602. require.Equal(t, c, dial)
  603. }
  604. func TestPeerManager_Dialed_Connected(t *testing.T) {
  605. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  606. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  607. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  608. require.NoError(t, err)
  609. // Marking a as dialed twice should error.
  610. require.NoError(t, peerManager.Add(a))
  611. dial, err := peerManager.TryDialNext()
  612. require.NoError(t, err)
  613. require.Equal(t, a, dial)
  614. require.NoError(t, peerManager.Dialed(a))
  615. require.Error(t, peerManager.Dialed(a))
  616. // Accepting a connection from b and then trying to mark it as dialed should fail.
  617. require.NoError(t, peerManager.Add(b))
  618. dial, err = peerManager.TryDialNext()
  619. require.NoError(t, err)
  620. require.Equal(t, b, dial)
  621. require.NoError(t, peerManager.Accepted(b.NodeID))
  622. require.Error(t, peerManager.Dialed(b))
  623. }
  624. func TestPeerManager_Dialed_Self(t *testing.T) {
  625. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  626. require.NoError(t, err)
  627. // Dialing self should error.
  628. require.Error(t, peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID}))
  629. }
  630. func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
  631. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  632. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  633. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  634. MaxConnected: 1,
  635. })
  636. require.NoError(t, err)
  637. // Start to dial a.
  638. require.NoError(t, peerManager.Add(a))
  639. dial, err := peerManager.TryDialNext()
  640. require.NoError(t, err)
  641. require.Equal(t, a, dial)
  642. // Marking b as dialed in the meanwhile (even without TryDialNext)
  643. // should be fine.
  644. require.NoError(t, peerManager.Add(b))
  645. require.NoError(t, peerManager.Dialed(b))
  646. // Completing the dial for a should now error.
  647. require.Error(t, peerManager.Dialed(a))
  648. }
  649. func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
  650. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  651. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  652. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  653. d := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("d", 40))}
  654. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  655. MaxConnected: 2,
  656. MaxConnectedUpgrade: 1,
  657. PeerScores: map[p2p.NodeID]p2p.PeerScore{c.NodeID: 1, d.NodeID: 1},
  658. })
  659. require.NoError(t, err)
  660. // Dialing a and b is fine.
  661. require.NoError(t, peerManager.Add(a))
  662. require.NoError(t, peerManager.Dialed(a))
  663. require.NoError(t, peerManager.Add(b))
  664. require.NoError(t, peerManager.Dialed(b))
  665. // Starting an upgrade of c should be fine.
  666. require.NoError(t, peerManager.Add(c))
  667. dial, err := peerManager.TryDialNext()
  668. require.NoError(t, err)
  669. require.Equal(t, c, dial)
  670. require.NoError(t, peerManager.Dialed(c))
  671. // Trying to mark d dialed should fail, since there are no more upgrade
  672. // slots and a/b haven't been evicted yet.
  673. require.NoError(t, peerManager.Add(d))
  674. require.Error(t, peerManager.Dialed(d))
  675. }
  676. func TestPeerManager_Dialed_Unknown(t *testing.T) {
  677. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  678. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  679. require.NoError(t, err)
  680. // Marking an unknown node as dialed should error.
  681. require.Error(t, peerManager.Dialed(a))
  682. }
  683. func TestPeerManager_Dialed_Upgrade(t *testing.T) {
  684. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  685. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  686. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  687. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  688. MaxConnected: 1,
  689. MaxConnectedUpgrade: 2,
  690. PeerScores: map[p2p.NodeID]p2p.PeerScore{b.NodeID: 1, c.NodeID: 1},
  691. })
  692. require.NoError(t, err)
  693. // Dialing a is fine.
  694. require.NoError(t, peerManager.Add(a))
  695. require.NoError(t, peerManager.Dialed(a))
  696. // Upgrading it with b should work, since b has a higher score.
  697. require.NoError(t, peerManager.Add(b))
  698. dial, err := peerManager.TryDialNext()
  699. require.NoError(t, err)
  700. require.Equal(t, b, dial)
  701. require.NoError(t, peerManager.Dialed(b))
  702. // a hasn't been evicted yet, but c shouldn't be allowed to upgrade anyway
  703. // since it's about to be evicted.
  704. require.NoError(t, peerManager.Add(c))
  705. dial, err = peerManager.TryDialNext()
  706. require.NoError(t, err)
  707. require.Empty(t, dial)
  708. // a should now be evicted.
  709. evict, err := peerManager.TryEvictNext()
  710. require.NoError(t, err)
  711. require.Equal(t, a.NodeID, evict)
  712. }
  713. func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
  714. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  715. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  716. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  717. d := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("d", 40))}
  718. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  719. MaxConnected: 2,
  720. MaxConnectedUpgrade: 1,
  721. PeerScores: map[p2p.NodeID]p2p.PeerScore{
  722. a.NodeID: 3,
  723. b.NodeID: 2,
  724. c.NodeID: 10,
  725. d.NodeID: 1,
  726. },
  727. })
  728. require.NoError(t, err)
  729. // Connect to a and b.
  730. require.NoError(t, peerManager.Add(a))
  731. require.NoError(t, peerManager.Dialed(a))
  732. require.NoError(t, peerManager.Add(b))
  733. require.NoError(t, peerManager.Dialed(b))
  734. // Start an upgrade with c, which should pick b to upgrade (since it
  735. // has score 2).
  736. require.NoError(t, peerManager.Add(c))
  737. dial, err := peerManager.TryDialNext()
  738. require.NoError(t, err)
  739. require.Equal(t, c, dial)
  740. // In the meanwhile, a disconnects and d connects. d is even lower-scored
  741. // than b (1 vs 2), which is currently being upgraded.
  742. require.NoError(t, peerManager.Disconnected(a.NodeID))
  743. require.NoError(t, peerManager.Add(d))
  744. require.NoError(t, peerManager.Accepted(d.NodeID))
  745. // Once c completes the upgrade of b, it should instead evict d,
  746. // since it has en even lower score.
  747. require.NoError(t, peerManager.Dialed(c))
  748. evict, err := peerManager.TryEvictNext()
  749. require.NoError(t, err)
  750. require.Equal(t, d.NodeID, evict)
  751. }
  752. func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
  753. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  754. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  755. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  756. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  757. MaxConnected: 2,
  758. MaxConnectedUpgrade: 1,
  759. PeerScores: map[p2p.NodeID]p2p.PeerScore{
  760. a.NodeID: 1,
  761. b.NodeID: 2,
  762. c.NodeID: 3,
  763. },
  764. })
  765. require.NoError(t, err)
  766. // Connect to a and b.
  767. require.NoError(t, peerManager.Add(a))
  768. require.NoError(t, peerManager.Dialed(a))
  769. require.NoError(t, peerManager.Add(b))
  770. require.NoError(t, peerManager.Dialed(b))
  771. // Start an upgrade with c, which should pick a to upgrade.
  772. require.NoError(t, peerManager.Add(c))
  773. dial, err := peerManager.TryDialNext()
  774. require.NoError(t, err)
  775. require.Equal(t, c, dial)
  776. // In the meanwhile, b disconnects.
  777. require.NoError(t, peerManager.Disconnected(b.NodeID))
  778. // Once c completes the upgrade of b, there is no longer a need to
  779. // evict anything since we're at capacity.
  780. // since it has en even lower score.
  781. require.NoError(t, peerManager.Dialed(c))
  782. evict, err := peerManager.TryEvictNext()
  783. require.NoError(t, err)
  784. require.Zero(t, evict)
  785. }
  786. func TestPeerManager_Accepted(t *testing.T) {
  787. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  788. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  789. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  790. d := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("d", 40))}
  791. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  792. require.NoError(t, err)
  793. // Accepting a connection from self should error.
  794. require.Error(t, peerManager.Accepted(selfID))
  795. // Accepting a connection from a known peer should work.
  796. require.NoError(t, peerManager.Add(a))
  797. require.NoError(t, peerManager.Accepted(a.NodeID))
  798. // Accepting a connection from an already accepted peer should error.
  799. require.Error(t, peerManager.Accepted(a.NodeID))
  800. // Accepting a connection from an unknown peer should work and register it.
  801. require.NoError(t, peerManager.Accepted(b.NodeID))
  802. require.ElementsMatch(t, []p2p.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  803. // Accepting a connection from a peer that's being dialed should work, and
  804. // should cause the dial to fail.
  805. require.NoError(t, peerManager.Add(c))
  806. dial, err := peerManager.TryDialNext()
  807. require.NoError(t, err)
  808. require.Equal(t, c, dial)
  809. require.NoError(t, peerManager.Accepted(c.NodeID))
  810. require.Error(t, peerManager.Dialed(c))
  811. // Accepting a connection from a peer that's been dialed should fail.
  812. require.NoError(t, peerManager.Add(d))
  813. dial, err = peerManager.TryDialNext()
  814. require.NoError(t, err)
  815. require.Equal(t, d, dial)
  816. require.NoError(t, peerManager.Dialed(d))
  817. require.Error(t, peerManager.Accepted(d.NodeID))
  818. }
  819. func TestPeerManager_Accepted_MaxConnected(t *testing.T) {
  820. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  821. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  822. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  823. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  824. MaxConnected: 2,
  825. })
  826. require.NoError(t, err)
  827. // Connect to a and b.
  828. require.NoError(t, peerManager.Add(a))
  829. require.NoError(t, peerManager.Dialed(a))
  830. require.NoError(t, peerManager.Add(b))
  831. require.NoError(t, peerManager.Accepted(b.NodeID))
  832. // Accepting c should now fail.
  833. require.NoError(t, peerManager.Add(c))
  834. require.Error(t, peerManager.Accepted(c.NodeID))
  835. }
  836. func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) {
  837. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  838. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  839. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  840. d := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("d", 40))}
  841. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  842. PeerScores: map[p2p.NodeID]p2p.PeerScore{
  843. c.NodeID: 1,
  844. d.NodeID: 2,
  845. },
  846. MaxConnected: 1,
  847. MaxConnectedUpgrade: 1,
  848. })
  849. require.NoError(t, err)
  850. // Dial a.
  851. require.NoError(t, peerManager.Add(a))
  852. require.NoError(t, peerManager.Dialed(a))
  853. // Accepting b should fail, since it's not an upgrade over a.
  854. require.NoError(t, peerManager.Add(b))
  855. require.Error(t, peerManager.Accepted(b.NodeID))
  856. // Accepting c should work, since it upgrades a.
  857. require.NoError(t, peerManager.Add(c))
  858. require.NoError(t, peerManager.Accepted(c.NodeID))
  859. // a still hasn't been evicted, so accepting b should still fail.
  860. require.NoError(t, peerManager.Add(b))
  861. require.Error(t, peerManager.Accepted(b.NodeID))
  862. // Also, accepting d should fail, since all upgrade slots are full.
  863. require.NoError(t, peerManager.Add(d))
  864. require.Error(t, peerManager.Accepted(d.NodeID))
  865. }
  866. func TestPeerManager_Accepted_Upgrade(t *testing.T) {
  867. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  868. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  869. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  870. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  871. PeerScores: map[p2p.NodeID]p2p.PeerScore{
  872. b.NodeID: 1,
  873. c.NodeID: 1,
  874. },
  875. MaxConnected: 1,
  876. MaxConnectedUpgrade: 2,
  877. })
  878. require.NoError(t, err)
  879. // Accept a.
  880. require.NoError(t, peerManager.Add(a))
  881. require.NoError(t, peerManager.Accepted(a.NodeID))
  882. // Accepting b should work, since it upgrades a.
  883. require.NoError(t, peerManager.Add(b))
  884. require.NoError(t, peerManager.Accepted(b.NodeID))
  885. // c cannot get accepted, since a has been upgraded by b.
  886. require.NoError(t, peerManager.Add(c))
  887. require.Error(t, peerManager.Accepted(c.NodeID))
  888. // This should cause a to get evicted.
  889. evict, err := peerManager.TryEvictNext()
  890. require.NoError(t, err)
  891. require.Equal(t, a.NodeID, evict)
  892. require.NoError(t, peerManager.Disconnected(a.NodeID))
  893. // c still cannot get accepted, since it's not scored above b.
  894. require.Error(t, peerManager.Accepted(c.NodeID))
  895. }
  896. func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
  897. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  898. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  899. c := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("c", 40))}
  900. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  901. PeerScores: map[p2p.NodeID]p2p.PeerScore{
  902. b.NodeID: 1,
  903. c.NodeID: 1,
  904. },
  905. MaxConnected: 1,
  906. MaxConnectedUpgrade: 2,
  907. })
  908. require.NoError(t, err)
  909. // Accept a.
  910. require.NoError(t, peerManager.Add(a))
  911. require.NoError(t, peerManager.Accepted(a.NodeID))
  912. // Start dial upgrade from a to b.
  913. require.NoError(t, peerManager.Add(b))
  914. dial, err := peerManager.TryDialNext()
  915. require.NoError(t, err)
  916. require.Equal(t, b, dial)
  917. // a has already been claimed as an upgrade of a, so accepting
  918. // c should fail since there's noone else to upgrade.
  919. require.NoError(t, peerManager.Add(c))
  920. require.Error(t, peerManager.Accepted(c.NodeID))
  921. // However, if b connects to us while we're also trying to upgrade to it via
  922. // dialing, then we accept the incoming connection as an upgrade.
  923. require.NoError(t, peerManager.Accepted(b.NodeID))
  924. // This should cause a to get evicted, and the dial upgrade to fail.
  925. evict, err := peerManager.TryEvictNext()
  926. require.NoError(t, err)
  927. require.Equal(t, a.NodeID, evict)
  928. require.Error(t, peerManager.Dialed(b))
  929. }
  930. func TestPeerManager_Ready(t *testing.T) {
  931. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  932. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  933. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  934. require.NoError(t, err)
  935. sub := peerManager.Subscribe()
  936. defer sub.Close()
  937. // Connecting to a should still have it as status down.
  938. require.NoError(t, peerManager.Add(a))
  939. require.NoError(t, peerManager.Accepted(a.NodeID))
  940. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
  941. // Marking a as ready should transition it to PeerStatusUp and send an update.
  942. require.NoError(t, peerManager.Ready(a.NodeID))
  943. require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
  944. require.Equal(t, p2p.PeerUpdate{
  945. NodeID: a.NodeID,
  946. Status: p2p.PeerStatusUp,
  947. }, <-sub.Updates())
  948. // Marking an unconnected peer as ready should do nothing.
  949. require.NoError(t, peerManager.Add(b))
  950. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
  951. require.NoError(t, peerManager.Ready(b.NodeID))
  952. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
  953. require.Empty(t, sub.Updates())
  954. }
  955. // See TryEvictNext for most tests, this just tests blocking behavior.
  956. func TestPeerManager_EvictNext(t *testing.T) {
  957. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  958. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  959. require.NoError(t, err)
  960. require.NoError(t, peerManager.Add(a))
  961. require.NoError(t, peerManager.Accepted(a.NodeID))
  962. require.NoError(t, peerManager.Ready(a.NodeID))
  963. // Since there are no peers to evict, EvictNext should block until timeout.
  964. timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
  965. defer cancel()
  966. _, err = peerManager.EvictNext(timeoutCtx)
  967. require.Error(t, err)
  968. require.Equal(t, context.DeadlineExceeded, err)
  969. // Erroring the peer will return it from EvictNext().
  970. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  971. evict, err := peerManager.EvictNext(timeoutCtx)
  972. require.NoError(t, err)
  973. require.Equal(t, a.NodeID, evict)
  974. // Since there are no more peers to evict, the next call should block.
  975. timeoutCtx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
  976. defer cancel()
  977. _, err = peerManager.EvictNext(timeoutCtx)
  978. require.Error(t, err)
  979. require.Equal(t, context.DeadlineExceeded, err)
  980. }
  981. func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
  982. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  983. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  984. require.NoError(t, err)
  985. require.NoError(t, peerManager.Add(a))
  986. require.NoError(t, peerManager.Accepted(a.NodeID))
  987. require.NoError(t, peerManager.Ready(a.NodeID))
  988. // Spawn a goroutine to error a peer after a delay.
  989. go func() {
  990. time.Sleep(200 * time.Millisecond)
  991. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  992. }()
  993. // This will block until peer errors above.
  994. ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  995. defer cancel()
  996. evict, err := peerManager.EvictNext(ctx)
  997. require.NoError(t, err)
  998. require.Equal(t, a.NodeID, evict)
  999. }
  1000. func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
  1001. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1002. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  1003. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1004. MaxConnected: 1,
  1005. MaxConnectedUpgrade: 1,
  1006. PeerScores: map[p2p.NodeID]p2p.PeerScore{b.NodeID: 1},
  1007. })
  1008. require.NoError(t, err)
  1009. // Connect a.
  1010. require.NoError(t, peerManager.Add(a))
  1011. require.NoError(t, peerManager.Accepted(a.NodeID))
  1012. require.NoError(t, peerManager.Ready(a.NodeID))
  1013. // Spawn a goroutine to upgrade to b with a delay.
  1014. go func() {
  1015. time.Sleep(200 * time.Millisecond)
  1016. require.NoError(t, peerManager.Add(b))
  1017. dial, err := peerManager.TryDialNext()
  1018. require.NoError(t, err)
  1019. require.Equal(t, b, dial)
  1020. require.NoError(t, peerManager.Dialed(b))
  1021. }()
  1022. // This will block until peer is upgraded above.
  1023. ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  1024. defer cancel()
  1025. evict, err := peerManager.EvictNext(ctx)
  1026. require.NoError(t, err)
  1027. require.Equal(t, a.NodeID, evict)
  1028. }
  1029. func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
  1030. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1031. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  1032. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1033. MaxConnected: 1,
  1034. MaxConnectedUpgrade: 1,
  1035. PeerScores: map[p2p.NodeID]p2p.PeerScore{b.NodeID: 1},
  1036. })
  1037. require.NoError(t, err)
  1038. // Connect a.
  1039. require.NoError(t, peerManager.Add(a))
  1040. require.NoError(t, peerManager.Accepted(a.NodeID))
  1041. require.NoError(t, peerManager.Ready(a.NodeID))
  1042. // Spawn a goroutine to upgrade b with a delay.
  1043. go func() {
  1044. time.Sleep(200 * time.Millisecond)
  1045. require.NoError(t, peerManager.Accepted(b.NodeID))
  1046. }()
  1047. // This will block until peer is upgraded above.
  1048. ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  1049. defer cancel()
  1050. evict, err := peerManager.EvictNext(ctx)
  1051. require.NoError(t, err)
  1052. require.Equal(t, a.NodeID, evict)
  1053. }
  1054. func TestPeerManager_TryEvictNext(t *testing.T) {
  1055. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1056. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1057. require.NoError(t, err)
  1058. require.NoError(t, peerManager.Add(a))
  1059. // Nothing is evicted with no peers connected.
  1060. evict, err := peerManager.TryEvictNext()
  1061. require.NoError(t, err)
  1062. require.Zero(t, evict)
  1063. // Connecting to a won't evict anything either.
  1064. require.NoError(t, peerManager.Accepted(a.NodeID))
  1065. require.NoError(t, peerManager.Ready(a.NodeID))
  1066. // But if a errors it should be evicted.
  1067. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  1068. evict, err = peerManager.TryEvictNext()
  1069. require.NoError(t, err)
  1070. require.Equal(t, a.NodeID, evict)
  1071. // While a is being evicted (before disconnect), it shouldn't get evicted again.
  1072. evict, err = peerManager.TryEvictNext()
  1073. require.NoError(t, err)
  1074. require.Zero(t, evict)
  1075. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  1076. evict, err = peerManager.TryEvictNext()
  1077. require.NoError(t, err)
  1078. require.Zero(t, evict)
  1079. }
  1080. func TestPeerManager_Disconnected(t *testing.T) {
  1081. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1082. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1083. require.NoError(t, err)
  1084. sub := peerManager.Subscribe()
  1085. defer sub.Close()
  1086. // Disconnecting an unknown peer does nothing.
  1087. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1088. require.Empty(t, peerManager.Peers())
  1089. require.Empty(t, sub.Updates())
  1090. // Disconnecting an accepted non-ready peer does not send a status update.
  1091. require.NoError(t, peerManager.Add(a))
  1092. require.NoError(t, peerManager.Accepted(a.NodeID))
  1093. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1094. require.Empty(t, sub.Updates())
  1095. // Disconnecting a ready peer sends a status update.
  1096. require.NoError(t, peerManager.Add(a))
  1097. require.NoError(t, peerManager.Accepted(a.NodeID))
  1098. require.NoError(t, peerManager.Ready(a.NodeID))
  1099. require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
  1100. require.NotEmpty(t, sub.Updates())
  1101. require.Equal(t, p2p.PeerUpdate{
  1102. NodeID: a.NodeID,
  1103. Status: p2p.PeerStatusUp,
  1104. }, <-sub.Updates())
  1105. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1106. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
  1107. require.NotEmpty(t, sub.Updates())
  1108. require.Equal(t, p2p.PeerUpdate{
  1109. NodeID: a.NodeID,
  1110. Status: p2p.PeerStatusDown,
  1111. }, <-sub.Updates())
  1112. // Disconnecting a dialing peer does not unmark it as dialing, to avoid
  1113. // dialing it multiple times in parallel.
  1114. dial, err := peerManager.TryDialNext()
  1115. require.NoError(t, err)
  1116. require.Equal(t, a, dial)
  1117. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1118. dial, err = peerManager.TryDialNext()
  1119. require.NoError(t, err)
  1120. require.Zero(t, dial)
  1121. }
  1122. func TestPeerManager_Errored(t *testing.T) {
  1123. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1124. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1125. require.NoError(t, err)
  1126. // Erroring an unknown peer does nothing.
  1127. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  1128. require.Empty(t, peerManager.Peers())
  1129. evict, err := peerManager.TryEvictNext()
  1130. require.NoError(t, err)
  1131. require.Zero(t, evict)
  1132. // Erroring a known peer does nothing, and won't evict it later,
  1133. // even when it connects.
  1134. require.NoError(t, peerManager.Add(a))
  1135. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  1136. evict, err = peerManager.TryEvictNext()
  1137. require.NoError(t, err)
  1138. require.Zero(t, evict)
  1139. require.NoError(t, peerManager.Accepted(a.NodeID))
  1140. require.NoError(t, peerManager.Ready(a.NodeID))
  1141. evict, err = peerManager.TryEvictNext()
  1142. require.NoError(t, err)
  1143. require.Zero(t, evict)
  1144. // However, erroring once connected will evict it.
  1145. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  1146. evict, err = peerManager.TryEvictNext()
  1147. require.NoError(t, err)
  1148. require.Equal(t, a.NodeID, evict)
  1149. }
  1150. func TestPeerManager_Subscribe(t *testing.T) {
  1151. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1152. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1153. require.NoError(t, err)
  1154. // This tests all subscription events for full peer lifecycles.
  1155. sub := peerManager.Subscribe()
  1156. defer sub.Close()
  1157. require.NoError(t, peerManager.Add(a))
  1158. require.Empty(t, sub.Updates())
  1159. // Inbound connection.
  1160. require.NoError(t, peerManager.Accepted(a.NodeID))
  1161. require.Empty(t, sub.Updates())
  1162. require.NoError(t, peerManager.Ready(a.NodeID))
  1163. require.NotEmpty(t, sub.Updates())
  1164. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
  1165. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1166. require.NotEmpty(t, sub.Updates())
  1167. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
  1168. // Outbound connection with peer error and eviction.
  1169. dial, err := peerManager.TryDialNext()
  1170. require.NoError(t, err)
  1171. require.Equal(t, a, dial)
  1172. require.Empty(t, sub.Updates())
  1173. require.NoError(t, peerManager.Dialed(a))
  1174. require.Empty(t, sub.Updates())
  1175. require.NoError(t, peerManager.Ready(a.NodeID))
  1176. require.NotEmpty(t, sub.Updates())
  1177. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
  1178. require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
  1179. require.Empty(t, sub.Updates())
  1180. evict, err := peerManager.TryEvictNext()
  1181. require.NoError(t, err)
  1182. require.Equal(t, a.NodeID, evict)
  1183. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1184. require.NotEmpty(t, sub.Updates())
  1185. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
  1186. // Outbound connection with dial failure.
  1187. dial, err = peerManager.TryDialNext()
  1188. require.NoError(t, err)
  1189. require.Equal(t, a, dial)
  1190. require.Empty(t, sub.Updates())
  1191. require.NoError(t, peerManager.DialFailed(a))
  1192. require.Empty(t, sub.Updates())
  1193. }
  1194. func TestPeerManager_Subscribe_Close(t *testing.T) {
  1195. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1196. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1197. require.NoError(t, err)
  1198. sub := peerManager.Subscribe()
  1199. defer sub.Close()
  1200. require.NoError(t, peerManager.Add(a))
  1201. require.NoError(t, peerManager.Accepted(a.NodeID))
  1202. require.Empty(t, sub.Updates())
  1203. require.NoError(t, peerManager.Ready(a.NodeID))
  1204. require.NotEmpty(t, sub.Updates())
  1205. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
  1206. // Closing the subscription should not send us the disconnected update.
  1207. sub.Close()
  1208. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1209. require.Empty(t, sub.Updates())
  1210. }
  1211. func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
  1212. t.Cleanup(leaktest.Check(t))
  1213. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1214. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1215. require.NoError(t, err)
  1216. s1 := peerManager.Subscribe()
  1217. defer s1.Close()
  1218. s2 := peerManager.Subscribe()
  1219. defer s2.Close()
  1220. s3 := peerManager.Subscribe()
  1221. defer s3.Close()
  1222. // Connecting to a peer should send updates on all subscriptions.
  1223. require.NoError(t, peerManager.Add(a))
  1224. require.NoError(t, peerManager.Accepted(a.NodeID))
  1225. require.NoError(t, peerManager.Ready(a.NodeID))
  1226. expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
  1227. require.NotEmpty(t, s1)
  1228. require.Equal(t, expectUp, <-s1.Updates())
  1229. require.NotEmpty(t, s2)
  1230. require.Equal(t, expectUp, <-s2.Updates())
  1231. require.NotEmpty(t, s3)
  1232. require.Equal(t, expectUp, <-s3.Updates())
  1233. // We now close s2. Disconnecting the peer should only send updates
  1234. // on s1 and s3.
  1235. s2.Close()
  1236. require.NoError(t, peerManager.Disconnected(a.NodeID))
  1237. expectDown := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}
  1238. require.NotEmpty(t, s1)
  1239. require.Equal(t, expectDown, <-s1.Updates())
  1240. require.Empty(t, s2.Updates())
  1241. require.NotEmpty(t, s3)
  1242. require.Equal(t, expectDown, <-s3.Updates())
  1243. }
  1244. func TestPeerManager_Close(t *testing.T) {
  1245. // leaktest will check that spawned goroutines are closed.
  1246. t.Cleanup(leaktest.CheckTimeout(t, 1*time.Second))
  1247. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1248. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1249. MinRetryTime: 10 * time.Second,
  1250. })
  1251. require.NoError(t, err)
  1252. // This subscription isn't closed, but PeerManager.Close()
  1253. // should reap the spawned goroutine.
  1254. _ = peerManager.Subscribe()
  1255. // This dial failure will start a retry timer for 10 seconds, which
  1256. // should be reaped.
  1257. require.NoError(t, peerManager.Add(a))
  1258. dial, err := peerManager.TryDialNext()
  1259. require.NoError(t, err)
  1260. require.Equal(t, a, dial)
  1261. require.NoError(t, peerManager.DialFailed(a))
  1262. // This should clean up the goroutines.
  1263. peerManager.Close()
  1264. }
  1265. func TestPeerManager_Advertise(t *testing.T) {
  1266. aID := p2p.NodeID(strings.Repeat("a", 40))
  1267. aTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: aID, Hostname: "127.0.0.1", Port: 26657, Path: "/path"}
  1268. aMem := p2p.NodeAddress{Protocol: "memory", NodeID: aID}
  1269. bID := p2p.NodeID(strings.Repeat("b", 40))
  1270. bTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: bID, Hostname: "b10c::1", Port: 26657, Path: "/path"}
  1271. bMem := p2p.NodeAddress{Protocol: "memory", NodeID: bID}
  1272. cID := p2p.NodeID(strings.Repeat("c", 40))
  1273. cTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: cID, Hostname: "host.domain", Port: 80}
  1274. cMem := p2p.NodeAddress{Protocol: "memory", NodeID: cID}
  1275. dID := p2p.NodeID(strings.Repeat("d", 40))
  1276. // Create an initial peer manager and add the peers.
  1277. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1278. PeerScores: map[p2p.NodeID]p2p.PeerScore{aID: 3, bID: 2, cID: 1},
  1279. })
  1280. require.NoError(t, err)
  1281. defer peerManager.Close()
  1282. require.NoError(t, peerManager.Add(aTCP))
  1283. require.NoError(t, peerManager.Add(aMem))
  1284. require.NoError(t, peerManager.Add(bTCP))
  1285. require.NoError(t, peerManager.Add(bMem))
  1286. require.NoError(t, peerManager.Add(cTCP))
  1287. require.NoError(t, peerManager.Add(cMem))
  1288. // d should get all addresses.
  1289. require.ElementsMatch(t, []p2p.NodeAddress{
  1290. aTCP, aMem, bTCP, bMem, cTCP, cMem,
  1291. }, peerManager.Advertise(dID, 100))
  1292. // a should not get its own addresses.
  1293. require.ElementsMatch(t, []p2p.NodeAddress{
  1294. bTCP, bMem, cTCP, cMem,
  1295. }, peerManager.Advertise(aID, 100))
  1296. // Asking for 0 addresses should return, well, 0.
  1297. require.Empty(t, peerManager.Advertise(aID, 0))
  1298. // Asking for 2 addresses should get the highest-rated ones, i.e. a.
  1299. require.ElementsMatch(t, []p2p.NodeAddress{
  1300. aTCP, aMem,
  1301. }, peerManager.Advertise(dID, 2))
  1302. }
  1303. func TestPeerManager_SetHeight_GetHeight(t *testing.T) {
  1304. a := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("a", 40))}
  1305. b := p2p.NodeAddress{Protocol: "memory", NodeID: p2p.NodeID(strings.Repeat("b", 40))}
  1306. db := dbm.NewMemDB()
  1307. peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
  1308. require.NoError(t, err)
  1309. // Getting a height should default to 0, for unknown peers and
  1310. // for known peers without height.
  1311. require.NoError(t, peerManager.Add(a))
  1312. require.EqualValues(t, 0, peerManager.GetHeight(a.NodeID))
  1313. require.EqualValues(t, 0, peerManager.GetHeight(b.NodeID))
  1314. // Setting a height should work for a known node.
  1315. require.NoError(t, peerManager.SetHeight(a.NodeID, 3))
  1316. require.EqualValues(t, 3, peerManager.GetHeight(a.NodeID))
  1317. // Setting a height should add an unknown node.
  1318. require.Equal(t, []p2p.NodeID{a.NodeID}, peerManager.Peers())
  1319. require.NoError(t, peerManager.SetHeight(b.NodeID, 7))
  1320. require.EqualValues(t, 7, peerManager.GetHeight(b.NodeID))
  1321. require.ElementsMatch(t, []p2p.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  1322. // The heights should not be persisted.
  1323. peerManager.Close()
  1324. peerManager, err = p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
  1325. require.NoError(t, err)
  1326. require.ElementsMatch(t, []p2p.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  1327. require.Zero(t, peerManager.GetHeight(a.NodeID))
  1328. require.Zero(t, peerManager.GetHeight(b.NodeID))
  1329. }
  1330. func TestPeerScoring(t *testing.T) {
  1331. // create a mock peer manager
  1332. db := dbm.NewMemDB()
  1333. peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
  1334. require.NoError(t, err)
  1335. defer peerManager.Close()
  1336. // create a fake node
  1337. id := p2p.NodeID(strings.Repeat("a1", 20))
  1338. require.NoError(t, peerManager.Add(p2p.NodeAddress{NodeID: id, Protocol: "memory"}))
  1339. // update the manager and make sure it's correct
  1340. pu := peerManager.Subscribe()
  1341. require.EqualValues(t, 0, peerManager.Scores()[id])
  1342. // add a bunch of good status updates and watch things increase.
  1343. for i := 1; i < 10; i++ {
  1344. pu.SendUpdate(p2p.PeerUpdate{
  1345. NodeID: id,
  1346. Status: p2p.PeerStatusGood,
  1347. })
  1348. time.Sleep(time.Millisecond) // force a context switch
  1349. require.EqualValues(t, i, peerManager.Scores()[id])
  1350. }
  1351. // watch the corresponding decreases respond to update
  1352. for i := 10; i == 0; i-- {
  1353. pu.SendUpdate(p2p.PeerUpdate{
  1354. NodeID: id,
  1355. Status: p2p.PeerStatusBad,
  1356. })
  1357. time.Sleep(time.Millisecond) // force a context switch
  1358. require.EqualValues(t, i, peerManager.Scores()[id])
  1359. }
  1360. }