You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1881 lines
61 KiB

  1. package p2p_test
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "testing"
  8. "time"
  9. "github.com/fortytw2/leaktest"
  10. "github.com/stretchr/testify/require"
  11. dbm "github.com/tendermint/tm-db"
  12. "github.com/tendermint/tendermint/internal/p2p"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. // FIXME: We should probably have some randomized property-based tests for the
  16. // PeerManager too, which runs a bunch of random operations with random peers
  17. // and ensures certain invariants always hold. The logic can be complex, with
  18. // many interactions, and it's hard to cover all scenarios with handwritten
  19. // tests.
  20. func TestPeerManagerOptions_Validate(t *testing.T) {
  21. nodeID := types.NodeID("00112233445566778899aabbccddeeff00112233")
  22. testcases := map[string]struct {
  23. options p2p.PeerManagerOptions
  24. ok bool
  25. }{
  26. "zero options is valid": {p2p.PeerManagerOptions{}, true},
  27. // PersistentPeers
  28. "valid PersistentPeers NodeID": {p2p.PeerManagerOptions{
  29. PersistentPeers: []types.NodeID{"00112233445566778899aabbccddeeff00112233"},
  30. }, true},
  31. "invalid PersistentPeers NodeID": {p2p.PeerManagerOptions{
  32. PersistentPeers: []types.NodeID{"foo"},
  33. }, false},
  34. "uppercase PersistentPeers NodeID": {p2p.PeerManagerOptions{
  35. PersistentPeers: []types.NodeID{"00112233445566778899AABBCCDDEEFF00112233"},
  36. }, false},
  37. "PersistentPeers at MaxConnected": {p2p.PeerManagerOptions{
  38. PersistentPeers: []types.NodeID{nodeID, nodeID, nodeID},
  39. MaxConnected: 3,
  40. }, true},
  41. "PersistentPeers above MaxConnected": {p2p.PeerManagerOptions{
  42. PersistentPeers: []types.NodeID{nodeID, nodeID, nodeID},
  43. MaxConnected: 2,
  44. }, false},
  45. "PersistentPeers above MaxConnected below MaxConnectedUpgrade": {p2p.PeerManagerOptions{
  46. PersistentPeers: []types.NodeID{nodeID, nodeID, nodeID},
  47. MaxConnected: 2,
  48. MaxConnectedUpgrade: 2,
  49. }, false},
  50. // MaxPeers
  51. "MaxPeers without MaxConnected": {p2p.PeerManagerOptions{
  52. MaxPeers: 3,
  53. }, false},
  54. "MaxPeers below MaxConnected+MaxConnectedUpgrade": {p2p.PeerManagerOptions{
  55. MaxPeers: 2,
  56. MaxConnected: 2,
  57. MaxConnectedUpgrade: 1,
  58. }, false},
  59. "MaxPeers at MaxConnected+MaxConnectedUpgrade": {p2p.PeerManagerOptions{
  60. MaxPeers: 3,
  61. MaxConnected: 2,
  62. MaxConnectedUpgrade: 1,
  63. }, true},
  64. // MaxRetryTime
  65. "MaxRetryTime below MinRetryTime": {p2p.PeerManagerOptions{
  66. MinRetryTime: 7 * time.Second,
  67. MaxRetryTime: 5 * time.Second,
  68. }, false},
  69. "MaxRetryTime at MinRetryTime": {p2p.PeerManagerOptions{
  70. MinRetryTime: 5 * time.Second,
  71. MaxRetryTime: 5 * time.Second,
  72. }, true},
  73. "MaxRetryTime without MinRetryTime": {p2p.PeerManagerOptions{
  74. MaxRetryTime: 5 * time.Second,
  75. }, false},
  76. // MaxRetryTimePersistent
  77. "MaxRetryTimePersistent below MinRetryTime": {p2p.PeerManagerOptions{
  78. MinRetryTime: 7 * time.Second,
  79. MaxRetryTimePersistent: 5 * time.Second,
  80. }, false},
  81. "MaxRetryTimePersistent at MinRetryTime": {p2p.PeerManagerOptions{
  82. MinRetryTime: 5 * time.Second,
  83. MaxRetryTimePersistent: 5 * time.Second,
  84. }, true},
  85. "MaxRetryTimePersistent without MinRetryTime": {p2p.PeerManagerOptions{
  86. MaxRetryTimePersistent: 5 * time.Second,
  87. }, false},
  88. }
  89. for name, tc := range testcases {
  90. tc := tc
  91. t.Run(name, func(t *testing.T) {
  92. err := tc.options.Validate()
  93. if tc.ok {
  94. require.NoError(t, err)
  95. } else {
  96. require.Error(t, err)
  97. }
  98. })
  99. }
  100. }
  101. func TestNewPeerManager(t *testing.T) {
  102. // Zero options should be valid.
  103. _, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  104. require.NoError(t, err)
  105. // Invalid options should error.
  106. _, err = p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  107. PersistentPeers: []types.NodeID{"foo"},
  108. })
  109. require.Error(t, err)
  110. // Invalid database should error.
  111. _, err = p2p.NewPeerManager(selfID, nil, p2p.PeerManagerOptions{})
  112. require.Error(t, err)
  113. // Empty self ID should error.
  114. _, err = p2p.NewPeerManager("", nil, p2p.PeerManagerOptions{})
  115. require.Error(t, err)
  116. }
  117. func TestNewPeerManager_Persistence(t *testing.T) {
  118. aID := types.NodeID(strings.Repeat("a", 40))
  119. aAddresses := []p2p.NodeAddress{
  120. {Protocol: "tcp", NodeID: aID, Hostname: "127.0.0.1", Port: 26657, Path: "/path"},
  121. {Protocol: "memory", NodeID: aID},
  122. }
  123. bID := types.NodeID(strings.Repeat("b", 40))
  124. bAddresses := []p2p.NodeAddress{
  125. {Protocol: "tcp", NodeID: bID, Hostname: "b10c::1", Port: 26657, Path: "/path"},
  126. {Protocol: "memory", NodeID: bID},
  127. }
  128. cID := types.NodeID(strings.Repeat("c", 40))
  129. cAddresses := []p2p.NodeAddress{
  130. {Protocol: "tcp", NodeID: cID, Hostname: "host.domain", Port: 80},
  131. {Protocol: "memory", NodeID: cID},
  132. }
  133. // Create an initial peer manager and add the peers.
  134. db := dbm.NewMemDB()
  135. peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{
  136. PersistentPeers: []types.NodeID{aID},
  137. PeerScores: map[types.NodeID]p2p.PeerScore{bID: 1},
  138. })
  139. require.NoError(t, err)
  140. for _, addr := range append(append(aAddresses, bAddresses...), cAddresses...) {
  141. added, err := peerManager.Add(addr)
  142. require.NoError(t, err)
  143. require.True(t, added)
  144. }
  145. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  146. require.ElementsMatch(t, bAddresses, peerManager.Addresses(bID))
  147. require.ElementsMatch(t, cAddresses, peerManager.Addresses(cID))
  148. require.Equal(t, map[types.NodeID]p2p.PeerScore{
  149. aID: p2p.PeerScorePersistent,
  150. bID: 1,
  151. cID: 0,
  152. }, peerManager.Scores())
  153. // Creating a new peer manager with the same database should retain the
  154. // peers, but they should have updated scores from the new PersistentPeers
  155. // configuration.
  156. peerManager, err = p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{
  157. PersistentPeers: []types.NodeID{bID},
  158. PeerScores: map[types.NodeID]p2p.PeerScore{cID: 1},
  159. })
  160. require.NoError(t, err)
  161. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  162. require.ElementsMatch(t, bAddresses, peerManager.Addresses(bID))
  163. require.ElementsMatch(t, cAddresses, peerManager.Addresses(cID))
  164. require.Equal(t, map[types.NodeID]p2p.PeerScore{
  165. aID: 0,
  166. bID: p2p.PeerScorePersistent,
  167. cID: 1,
  168. }, peerManager.Scores())
  169. }
  170. func TestNewPeerManager_SelfIDChange(t *testing.T) {
  171. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  172. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  173. db := dbm.NewMemDB()
  174. peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
  175. require.NoError(t, err)
  176. added, err := peerManager.Add(a)
  177. require.NoError(t, err)
  178. require.True(t, added)
  179. added, err = peerManager.Add(b)
  180. require.NoError(t, err)
  181. require.True(t, added)
  182. require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  183. // If we change our selfID to one of the peers in the peer store, it
  184. // should be removed from the store.
  185. peerManager, err = p2p.NewPeerManager(a.NodeID, db, p2p.PeerManagerOptions{})
  186. require.NoError(t, err)
  187. require.Equal(t, []types.NodeID{b.NodeID}, peerManager.Peers())
  188. }
  189. func TestPeerManager_Add(t *testing.T) {
  190. aID := types.NodeID(strings.Repeat("a", 40))
  191. bID := types.NodeID(strings.Repeat("b", 40))
  192. cID := types.NodeID(strings.Repeat("c", 40))
  193. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  194. PersistentPeers: []types.NodeID{aID, cID},
  195. MaxPeers: 2,
  196. MaxConnected: 2,
  197. })
  198. require.NoError(t, err)
  199. // Adding a couple of addresses should work.
  200. aAddresses := []p2p.NodeAddress{
  201. {Protocol: "tcp", NodeID: aID, Hostname: "localhost"},
  202. {Protocol: "memory", NodeID: aID},
  203. }
  204. for _, addr := range aAddresses {
  205. added, err := peerManager.Add(addr)
  206. require.NoError(t, err)
  207. require.True(t, added)
  208. }
  209. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  210. // Adding a different peer should be fine.
  211. bAddress := p2p.NodeAddress{Protocol: "tcp", NodeID: bID, Hostname: "localhost"}
  212. added, err := peerManager.Add(bAddress)
  213. require.NoError(t, err)
  214. require.True(t, added)
  215. require.Equal(t, []p2p.NodeAddress{bAddress}, peerManager.Addresses(bID))
  216. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  217. // Adding an existing address again should be a noop.
  218. added, err = peerManager.Add(aAddresses[0])
  219. require.NoError(t, err)
  220. require.False(t, added)
  221. require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID))
  222. // Adding a third peer with MaxPeers=2 should cause bID, which is
  223. // the lowest-scored peer (not in PersistentPeers), to be removed.
  224. added, err = peerManager.Add(p2p.NodeAddress{
  225. Protocol: "tcp", NodeID: cID, Hostname: "localhost"})
  226. require.NoError(t, err)
  227. require.True(t, added)
  228. require.ElementsMatch(t, []types.NodeID{aID, cID}, peerManager.Peers())
  229. // Adding an invalid address should error.
  230. _, err = peerManager.Add(p2p.NodeAddress{Path: "foo"})
  231. require.Error(t, err)
  232. // Adding self should error
  233. _, err = peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID})
  234. require.Error(t, err)
  235. }
  236. func TestPeerManager_DialNext(t *testing.T) {
  237. ctx, cancel := context.WithCancel(context.Background())
  238. defer cancel()
  239. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  240. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  241. require.NoError(t, err)
  242. // Add an address. DialNext should return it.
  243. added, err := peerManager.Add(a)
  244. require.NoError(t, err)
  245. require.True(t, added)
  246. address, err := peerManager.DialNext(ctx)
  247. require.NoError(t, err)
  248. require.Equal(t, a, address)
  249. // Since there are no more undialed peers, the next call should block
  250. // until it times out.
  251. timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
  252. defer cancel()
  253. _, err = peerManager.DialNext(timeoutCtx)
  254. require.Error(t, err)
  255. require.Equal(t, context.DeadlineExceeded, err)
  256. }
  257. func TestPeerManager_DialNext_Retry(t *testing.T) {
  258. ctx, cancel := context.WithCancel(context.Background())
  259. defer cancel()
  260. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  261. options := p2p.PeerManagerOptions{
  262. MinRetryTime: 100 * time.Millisecond,
  263. MaxRetryTime: 500 * time.Millisecond,
  264. }
  265. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), options)
  266. require.NoError(t, err)
  267. added, err := peerManager.Add(a)
  268. require.NoError(t, err)
  269. require.True(t, added)
  270. // Do five dial retries (six dials total). The retry time should double for
  271. // each failure. At the forth retry, MaxRetryTime should kick in.
  272. ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
  273. defer cancel()
  274. for i := 0; i <= 6; i++ {
  275. start := time.Now()
  276. dial, err := peerManager.DialNext(ctx)
  277. require.NoError(t, err)
  278. require.Equal(t, a, dial)
  279. elapsed := time.Since(start).Round(time.Millisecond)
  280. fmt.Println(elapsed, options.MinRetryTime)
  281. switch i {
  282. case 0:
  283. require.LessOrEqual(t, elapsed, options.MinRetryTime)
  284. case 1:
  285. require.GreaterOrEqual(t, elapsed, options.MinRetryTime)
  286. case 2:
  287. require.GreaterOrEqual(t, elapsed, 2*options.MinRetryTime)
  288. case 3:
  289. require.GreaterOrEqual(t, elapsed, 3*options.MinRetryTime)
  290. case 4, 5, 6:
  291. require.GreaterOrEqual(t, elapsed, 4*options.MinRetryTime)
  292. default:
  293. t.Fatal("unexpected retry")
  294. }
  295. require.NoError(t, peerManager.DialFailed(ctx, a))
  296. }
  297. }
  298. func TestPeerManager_DialNext_WakeOnAdd(t *testing.T) {
  299. ctx, cancel := context.WithCancel(context.Background())
  300. defer cancel()
  301. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  302. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  303. require.NoError(t, err)
  304. // Spawn a goroutine to add a peer after a delay.
  305. go func() {
  306. time.Sleep(200 * time.Millisecond)
  307. added, err := peerManager.Add(a)
  308. require.NoError(t, err)
  309. require.True(t, added)
  310. }()
  311. // This will block until peer is added above.
  312. ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
  313. defer cancel()
  314. dial, err := peerManager.DialNext(ctx)
  315. require.NoError(t, err)
  316. require.Equal(t, a, dial)
  317. }
  318. func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) {
  319. ctx, cancel := context.WithCancel(context.Background())
  320. defer cancel()
  321. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  322. MaxConnected: 1,
  323. })
  324. require.NoError(t, err)
  325. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  326. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  327. // Add and dial a.
  328. added, err := peerManager.Add(a)
  329. require.NoError(t, err)
  330. require.True(t, added)
  331. dial, err := peerManager.TryDialNext()
  332. require.NoError(t, err)
  333. require.Equal(t, a, dial)
  334. // Add b. We shouldn't be able to dial it, due to MaxConnected.
  335. added, err = peerManager.Add(b)
  336. require.NoError(t, err)
  337. require.True(t, added)
  338. dial, err = peerManager.TryDialNext()
  339. require.NoError(t, err)
  340. require.Zero(t, dial)
  341. // Spawn a goroutine to fail a's dial attempt.
  342. sig := make(chan struct{})
  343. go func() {
  344. defer close(sig)
  345. time.Sleep(200 * time.Millisecond)
  346. require.NoError(t, peerManager.DialFailed(ctx, a))
  347. }()
  348. // This should make b available for dialing (not a, retries are disabled).
  349. opctx, opcancel := context.WithTimeout(ctx, 3*time.Second)
  350. defer opcancel()
  351. dial, err = peerManager.DialNext(opctx)
  352. require.NoError(t, err)
  353. require.Equal(t, b, dial)
  354. <-sig
  355. }
  356. func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) {
  357. ctx, cancel := context.WithCancel(context.Background())
  358. defer cancel()
  359. options := p2p.PeerManagerOptions{MinRetryTime: 200 * time.Millisecond}
  360. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), options)
  361. require.NoError(t, err)
  362. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  363. // Add a, dial it, and mark it a failure. This will start a retry timer.
  364. added, err := peerManager.Add(a)
  365. require.NoError(t, err)
  366. require.True(t, added)
  367. dial, err := peerManager.TryDialNext()
  368. require.NoError(t, err)
  369. require.Equal(t, a, dial)
  370. require.NoError(t, peerManager.DialFailed(ctx, dial))
  371. failed := time.Now()
  372. // The retry timer should unblock DialNext and make a available again after
  373. // the retry time passes.
  374. ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
  375. defer cancel()
  376. dial, err = peerManager.DialNext(ctx)
  377. require.NoError(t, err)
  378. require.Equal(t, a, dial)
  379. require.GreaterOrEqual(t, time.Since(failed), options.MinRetryTime)
  380. }
  381. func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
  382. ctx, cancel := context.WithCancel(context.Background())
  383. defer cancel()
  384. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  385. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  386. require.NoError(t, err)
  387. added, err := peerManager.Add(a)
  388. require.NoError(t, err)
  389. require.True(t, added)
  390. err = peerManager.Accepted(a.NodeID)
  391. require.NoError(t, err)
  392. dial, err := peerManager.TryDialNext()
  393. require.NoError(t, err)
  394. require.Zero(t, dial)
  395. dctx, dcancel := context.WithTimeout(ctx, 300*time.Millisecond)
  396. defer dcancel()
  397. go func() {
  398. time.Sleep(200 * time.Millisecond)
  399. peerManager.Disconnected(dctx, a.NodeID)
  400. }()
  401. ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
  402. defer cancel()
  403. dial, err = peerManager.DialNext(ctx)
  404. require.NoError(t, err)
  405. require.Equal(t, a, dial)
  406. }
  407. func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
  408. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  409. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  410. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  411. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  412. MaxConnected: 2,
  413. })
  414. require.NoError(t, err)
  415. // Add a and connect to it.
  416. added, err := peerManager.Add(a)
  417. require.NoError(t, err)
  418. require.True(t, added)
  419. dial, err := peerManager.TryDialNext()
  420. require.NoError(t, err)
  421. require.Equal(t, a, dial)
  422. require.NoError(t, peerManager.Dialed(a))
  423. // Add b and start dialing it.
  424. added, err = peerManager.Add(b)
  425. require.NoError(t, err)
  426. require.True(t, added)
  427. dial, err = peerManager.TryDialNext()
  428. require.NoError(t, err)
  429. require.Equal(t, b, dial)
  430. // At this point, adding c will not allow dialing it.
  431. added, err = peerManager.Add(c)
  432. require.NoError(t, err)
  433. require.True(t, added)
  434. dial, err = peerManager.TryDialNext()
  435. require.NoError(t, err)
  436. require.Zero(t, dial)
  437. }
  438. func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
  439. ctx, cancel := context.WithCancel(context.Background())
  440. defer cancel()
  441. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  442. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  443. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  444. d := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("d", 40))}
  445. e := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("e", 40))}
  446. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  447. PeerScores: map[types.NodeID]p2p.PeerScore{
  448. a.NodeID: 0,
  449. b.NodeID: 1,
  450. c.NodeID: 2,
  451. d.NodeID: 3,
  452. e.NodeID: 0,
  453. },
  454. PersistentPeers: []types.NodeID{c.NodeID, d.NodeID},
  455. MaxConnected: 2,
  456. MaxConnectedUpgrade: 1,
  457. })
  458. require.NoError(t, err)
  459. // Add a and connect to it.
  460. added, err := peerManager.Add(a)
  461. require.NoError(t, err)
  462. require.True(t, added)
  463. dial, err := peerManager.TryDialNext()
  464. require.NoError(t, err)
  465. require.Equal(t, a, dial)
  466. require.NoError(t, peerManager.Dialed(a))
  467. // Add b and start dialing it.
  468. added, err = peerManager.Add(b)
  469. require.NoError(t, err)
  470. require.True(t, added)
  471. dial, err = peerManager.TryDialNext()
  472. require.NoError(t, err)
  473. require.Equal(t, b, dial)
  474. // Even though we are at capacity, we should be allowed to dial c for an
  475. // upgrade of a, since it's higher-scored.
  476. added, err = peerManager.Add(c)
  477. require.NoError(t, err)
  478. require.True(t, added)
  479. dial, err = peerManager.TryDialNext()
  480. require.NoError(t, err)
  481. require.Equal(t, c, dial)
  482. // However, since we're using all upgrade slots now, we can't add and dial
  483. // d, even though it's also higher-scored.
  484. added, err = peerManager.Add(d)
  485. require.NoError(t, err)
  486. require.True(t, added)
  487. dial, err = peerManager.TryDialNext()
  488. require.NoError(t, err)
  489. require.Zero(t, dial)
  490. // We go through with c's upgrade.
  491. require.NoError(t, peerManager.Dialed(c))
  492. // Still can't dial d.
  493. dial, err = peerManager.TryDialNext()
  494. require.NoError(t, err)
  495. require.Zero(t, dial)
  496. // Now, if we disconnect a, we should be allowed to dial d because we have a
  497. // free upgrade slot.
  498. peerManager.Disconnected(ctx, a.NodeID)
  499. dial, err = peerManager.TryDialNext()
  500. require.NoError(t, err)
  501. require.Equal(t, d, dial)
  502. require.NoError(t, peerManager.Dialed(d))
  503. // However, if we disconnect b (such that only c and d are connected), we
  504. // should not be allowed to dial e even though there are upgrade slots,
  505. // because there are no lower-scored nodes that can be upgraded.
  506. peerManager.Disconnected(ctx, b.NodeID)
  507. added, err = peerManager.Add(e)
  508. require.NoError(t, err)
  509. require.True(t, added)
  510. dial, err = peerManager.TryDialNext()
  511. require.NoError(t, err)
  512. require.Zero(t, dial)
  513. }
  514. func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
  515. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  516. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  517. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  518. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  519. PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 1, c.NodeID: 1},
  520. MaxConnected: 1,
  521. MaxConnectedUpgrade: 2,
  522. })
  523. require.NoError(t, err)
  524. // Add a and connect to it.
  525. added, err := peerManager.Add(a)
  526. require.NoError(t, err)
  527. require.True(t, added)
  528. dial, err := peerManager.TryDialNext()
  529. require.NoError(t, err)
  530. require.Equal(t, a, dial)
  531. require.NoError(t, peerManager.Dialed(a))
  532. // Add b and start dialing it. This will claim a for upgrading.
  533. added, err = peerManager.Add(b)
  534. require.NoError(t, err)
  535. require.True(t, added)
  536. dial, err = peerManager.TryDialNext()
  537. require.NoError(t, err)
  538. require.Equal(t, b, dial)
  539. // Adding c and dialing it will fail, because a is the only connected
  540. // peer that can be upgraded, and b is already trying to upgrade it.
  541. added, err = peerManager.Add(c)
  542. require.NoError(t, err)
  543. require.True(t, added)
  544. dial, err = peerManager.TryDialNext()
  545. require.NoError(t, err)
  546. require.Empty(t, dial)
  547. }
  548. func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
  549. aID := types.NodeID(strings.Repeat("a", 40))
  550. a := p2p.NodeAddress{Protocol: "memory", NodeID: aID}
  551. aTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: aID, Hostname: "localhost"}
  552. bID := types.NodeID(strings.Repeat("b", 40))
  553. b := p2p.NodeAddress{Protocol: "memory", NodeID: bID}
  554. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  555. MaxConnected: 2,
  556. })
  557. require.NoError(t, err)
  558. // Add a and dial it.
  559. added, err := peerManager.Add(a)
  560. require.NoError(t, err)
  561. require.True(t, added)
  562. dial, err := peerManager.TryDialNext()
  563. require.NoError(t, err)
  564. require.Equal(t, a, dial)
  565. // Adding a's TCP address will not dispense a, since it's already dialing.
  566. added, err = peerManager.Add(aTCP)
  567. require.NoError(t, err)
  568. require.True(t, added)
  569. dial, err = peerManager.TryDialNext()
  570. require.NoError(t, err)
  571. require.Zero(t, dial)
  572. // Marking a as dialed will still not dispense it.
  573. require.NoError(t, peerManager.Dialed(a))
  574. dial, err = peerManager.TryDialNext()
  575. require.NoError(t, err)
  576. require.Zero(t, dial)
  577. // Adding b and accepting a connection from it will not dispense it either.
  578. added, err = peerManager.Add(b)
  579. require.NoError(t, err)
  580. require.True(t, added)
  581. require.NoError(t, peerManager.Accepted(bID))
  582. dial, err = peerManager.TryDialNext()
  583. require.NoError(t, err)
  584. require.Zero(t, dial)
  585. }
  586. func TestPeerManager_TryDialNext_Multiple(t *testing.T) {
  587. ctx, cancel := context.WithCancel(context.Background())
  588. defer cancel()
  589. aID := types.NodeID(strings.Repeat("a", 40))
  590. bID := types.NodeID(strings.Repeat("b", 40))
  591. addresses := []p2p.NodeAddress{
  592. {Protocol: "memory", NodeID: aID},
  593. {Protocol: "memory", NodeID: bID},
  594. {Protocol: "tcp", NodeID: aID, Hostname: "127.0.0.1"},
  595. {Protocol: "tcp", NodeID: bID, Hostname: "::1"},
  596. }
  597. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  598. require.NoError(t, err)
  599. for _, address := range addresses {
  600. added, err := peerManager.Add(address)
  601. require.NoError(t, err)
  602. require.True(t, added)
  603. }
  604. // All addresses should be dispensed as long as dialing them has failed.
  605. dial := []p2p.NodeAddress{}
  606. for range addresses {
  607. address, err := peerManager.TryDialNext()
  608. require.NoError(t, err)
  609. require.NotZero(t, address)
  610. require.NoError(t, peerManager.DialFailed(ctx, address))
  611. dial = append(dial, address)
  612. }
  613. require.ElementsMatch(t, dial, addresses)
  614. address, err := peerManager.TryDialNext()
  615. require.NoError(t, err)
  616. require.Zero(t, address)
  617. }
  618. func TestPeerManager_DialFailed(t *testing.T) {
  619. // DialFailed is tested through other tests, we'll just check a few basic
  620. // things here, e.g. reporting unknown addresses.
  621. aID := types.NodeID(strings.Repeat("a", 40))
  622. a := p2p.NodeAddress{Protocol: "memory", NodeID: aID}
  623. bID := types.NodeID(strings.Repeat("b", 40))
  624. b := p2p.NodeAddress{Protocol: "memory", NodeID: bID}
  625. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  626. require.NoError(t, err)
  627. added, err := peerManager.Add(a)
  628. require.NoError(t, err)
  629. require.True(t, added)
  630. ctx, cancel := context.WithCancel(context.Background())
  631. defer cancel()
  632. // Dialing and then calling DialFailed with a different address (same
  633. // NodeID) should unmark as dialing and allow us to dial the other address
  634. // again, but not register the failed address.
  635. dial, err := peerManager.TryDialNext()
  636. require.NoError(t, err)
  637. require.Equal(t, a, dial)
  638. require.NoError(t, peerManager.DialFailed(ctx, p2p.NodeAddress{
  639. Protocol: "tcp", NodeID: aID, Hostname: "localhost"}))
  640. require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID))
  641. dial, err = peerManager.TryDialNext()
  642. require.NoError(t, err)
  643. require.Equal(t, a, dial)
  644. // Calling DialFailed on same address twice should be fine.
  645. require.NoError(t, peerManager.DialFailed(ctx, a))
  646. require.NoError(t, peerManager.DialFailed(ctx, a))
  647. // DialFailed on an unknown peer shouldn't error or add it.
  648. require.NoError(t, peerManager.DialFailed(ctx, b))
  649. require.Equal(t, []types.NodeID{aID}, peerManager.Peers())
  650. }
  651. func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
  652. ctx, cancel := context.WithCancel(context.Background())
  653. defer cancel()
  654. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  655. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  656. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  657. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  658. PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 1, c.NodeID: 1},
  659. MaxConnected: 1,
  660. MaxConnectedUpgrade: 2,
  661. })
  662. require.NoError(t, err)
  663. // Add a and connect to it.
  664. added, err := peerManager.Add(a)
  665. require.NoError(t, err)
  666. require.True(t, added)
  667. dial, err := peerManager.TryDialNext()
  668. require.NoError(t, err)
  669. require.Equal(t, a, dial)
  670. require.NoError(t, peerManager.Dialed(a))
  671. // Add b and start dialing it. This will claim a for upgrading.
  672. added, err = peerManager.Add(b)
  673. require.NoError(t, err)
  674. require.True(t, added)
  675. dial, err = peerManager.TryDialNext()
  676. require.NoError(t, err)
  677. require.Equal(t, b, dial)
  678. // Adding c and dialing it will fail, even though it could upgrade a and we
  679. // have free upgrade slots, because a is the only connected peer that can be
  680. // upgraded and b is already trying to upgrade it.
  681. added, err = peerManager.Add(c)
  682. require.NoError(t, err)
  683. require.True(t, added)
  684. dial, err = peerManager.TryDialNext()
  685. require.NoError(t, err)
  686. require.Empty(t, dial)
  687. // Failing b's dial will now make c available for dialing.
  688. require.NoError(t, peerManager.DialFailed(ctx, b))
  689. dial, err = peerManager.TryDialNext()
  690. require.NoError(t, err)
  691. require.Equal(t, c, dial)
  692. }
  693. func TestPeerManager_Dialed_Connected(t *testing.T) {
  694. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  695. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  696. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  697. require.NoError(t, err)
  698. // Marking a as dialed twice should error.
  699. added, err := peerManager.Add(a)
  700. require.NoError(t, err)
  701. require.True(t, added)
  702. dial, err := peerManager.TryDialNext()
  703. require.NoError(t, err)
  704. require.Equal(t, a, dial)
  705. require.NoError(t, peerManager.Dialed(a))
  706. require.Error(t, peerManager.Dialed(a))
  707. // Accepting a connection from b and then trying to mark it as dialed should fail.
  708. added, err = peerManager.Add(b)
  709. require.NoError(t, err)
  710. require.True(t, added)
  711. dial, err = peerManager.TryDialNext()
  712. require.NoError(t, err)
  713. require.Equal(t, b, dial)
  714. require.NoError(t, peerManager.Accepted(b.NodeID))
  715. require.Error(t, peerManager.Dialed(b))
  716. }
  717. func TestPeerManager_Dialed_Self(t *testing.T) {
  718. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  719. require.NoError(t, err)
  720. // Dialing self should error.
  721. _, err = peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: selfID})
  722. require.Error(t, err)
  723. }
  724. func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
  725. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  726. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  727. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  728. MaxConnected: 1,
  729. })
  730. require.NoError(t, err)
  731. // Start to dial a.
  732. added, err := peerManager.Add(a)
  733. require.NoError(t, err)
  734. require.True(t, added)
  735. dial, err := peerManager.TryDialNext()
  736. require.NoError(t, err)
  737. require.Equal(t, a, dial)
  738. // Marking b as dialed in the meanwhile (even without TryDialNext)
  739. // should be fine.
  740. added, err = peerManager.Add(b)
  741. require.NoError(t, err)
  742. require.True(t, added)
  743. require.NoError(t, peerManager.Dialed(b))
  744. // Completing the dial for a should now error.
  745. require.Error(t, peerManager.Dialed(a))
  746. }
  747. func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
  748. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  749. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  750. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  751. d := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("d", 40))}
  752. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  753. MaxConnected: 2,
  754. MaxConnectedUpgrade: 1,
  755. PeerScores: map[types.NodeID]p2p.PeerScore{c.NodeID: 1, d.NodeID: 1},
  756. })
  757. require.NoError(t, err)
  758. // Dialing a and b is fine.
  759. added, err := peerManager.Add(a)
  760. require.NoError(t, err)
  761. require.True(t, added)
  762. require.NoError(t, peerManager.Dialed(a))
  763. added, err = peerManager.Add(b)
  764. require.NoError(t, err)
  765. require.True(t, added)
  766. require.NoError(t, peerManager.Dialed(b))
  767. // Starting an upgrade of c should be fine.
  768. added, err = peerManager.Add(c)
  769. require.NoError(t, err)
  770. require.True(t, added)
  771. dial, err := peerManager.TryDialNext()
  772. require.NoError(t, err)
  773. require.Equal(t, c, dial)
  774. require.NoError(t, peerManager.Dialed(c))
  775. // Trying to mark d dialed should fail, since there are no more upgrade
  776. // slots and a/b haven't been evicted yet.
  777. added, err = peerManager.Add(d)
  778. require.NoError(t, err)
  779. require.True(t, added)
  780. require.Error(t, peerManager.Dialed(d))
  781. }
  782. func TestPeerManager_Dialed_Unknown(t *testing.T) {
  783. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  784. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  785. require.NoError(t, err)
  786. // Marking an unknown node as dialed should error.
  787. require.Error(t, peerManager.Dialed(a))
  788. }
  789. func TestPeerManager_Dialed_Upgrade(t *testing.T) {
  790. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  791. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  792. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  793. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  794. MaxConnected: 1,
  795. MaxConnectedUpgrade: 2,
  796. PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 1, c.NodeID: 1},
  797. })
  798. require.NoError(t, err)
  799. // Dialing a is fine.
  800. added, err := peerManager.Add(a)
  801. require.NoError(t, err)
  802. require.True(t, added)
  803. require.NoError(t, peerManager.Dialed(a))
  804. // Upgrading it with b should work, since b has a higher score.
  805. added, err = peerManager.Add(b)
  806. require.NoError(t, err)
  807. require.True(t, added)
  808. dial, err := peerManager.TryDialNext()
  809. require.NoError(t, err)
  810. require.Equal(t, b, dial)
  811. require.NoError(t, peerManager.Dialed(b))
  812. // a hasn't been evicted yet, but c shouldn't be allowed to upgrade anyway
  813. // since it's about to be evicted.
  814. added, err = peerManager.Add(c)
  815. require.NoError(t, err)
  816. require.True(t, added)
  817. dial, err = peerManager.TryDialNext()
  818. require.NoError(t, err)
  819. require.Empty(t, dial)
  820. // a should now be evicted.
  821. evict, err := peerManager.TryEvictNext()
  822. require.NoError(t, err)
  823. require.Equal(t, a.NodeID, evict)
  824. }
  825. func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
  826. ctx, cancel := context.WithCancel(context.Background())
  827. defer cancel()
  828. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  829. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  830. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  831. d := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("d", 40))}
  832. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  833. MaxConnected: 2,
  834. MaxConnectedUpgrade: 1,
  835. PeerScores: map[types.NodeID]p2p.PeerScore{
  836. a.NodeID: 3,
  837. b.NodeID: 2,
  838. c.NodeID: 10,
  839. d.NodeID: 1,
  840. },
  841. })
  842. require.NoError(t, err)
  843. // Connect to a and b.
  844. added, err := peerManager.Add(a)
  845. require.NoError(t, err)
  846. require.True(t, added)
  847. require.NoError(t, peerManager.Dialed(a))
  848. added, err = peerManager.Add(b)
  849. require.NoError(t, err)
  850. require.True(t, added)
  851. require.NoError(t, peerManager.Dialed(b))
  852. // Start an upgrade with c, which should pick b to upgrade (since it
  853. // has score 2).
  854. added, err = peerManager.Add(c)
  855. require.NoError(t, err)
  856. require.True(t, added)
  857. dial, err := peerManager.TryDialNext()
  858. require.NoError(t, err)
  859. require.Equal(t, c, dial)
  860. // In the meanwhile, a disconnects and d connects. d is even lower-scored
  861. // than b (1 vs 2), which is currently being upgraded.
  862. peerManager.Disconnected(ctx, a.NodeID)
  863. added, err = peerManager.Add(d)
  864. require.NoError(t, err)
  865. require.True(t, added)
  866. require.NoError(t, peerManager.Accepted(d.NodeID))
  867. // Once c completes the upgrade of b, it should instead evict d,
  868. // since it has en even lower score.
  869. require.NoError(t, peerManager.Dialed(c))
  870. evict, err := peerManager.TryEvictNext()
  871. require.NoError(t, err)
  872. require.Equal(t, d.NodeID, evict)
  873. }
  874. func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
  875. ctx, cancel := context.WithCancel(context.Background())
  876. defer cancel()
  877. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  878. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  879. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  880. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  881. MaxConnected: 2,
  882. MaxConnectedUpgrade: 1,
  883. PeerScores: map[types.NodeID]p2p.PeerScore{
  884. a.NodeID: 1,
  885. b.NodeID: 2,
  886. c.NodeID: 3,
  887. },
  888. })
  889. require.NoError(t, err)
  890. // Connect to a and b.
  891. added, err := peerManager.Add(a)
  892. require.NoError(t, err)
  893. require.True(t, added)
  894. require.NoError(t, peerManager.Dialed(a))
  895. added, err = peerManager.Add(b)
  896. require.NoError(t, err)
  897. require.True(t, added)
  898. require.NoError(t, peerManager.Dialed(b))
  899. // Start an upgrade with c, which should pick a to upgrade.
  900. added, err = peerManager.Add(c)
  901. require.NoError(t, err)
  902. require.True(t, added)
  903. dial, err := peerManager.TryDialNext()
  904. require.NoError(t, err)
  905. require.Equal(t, c, dial)
  906. // In the meanwhile, b disconnects.
  907. peerManager.Disconnected(ctx, b.NodeID)
  908. // Once c completes the upgrade of b, there is no longer a need to
  909. // evict anything since we're at capacity.
  910. // since it has en even lower score.
  911. require.NoError(t, peerManager.Dialed(c))
  912. evict, err := peerManager.TryEvictNext()
  913. require.NoError(t, err)
  914. require.Zero(t, evict)
  915. }
  916. func TestPeerManager_Accepted(t *testing.T) {
  917. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  918. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  919. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  920. d := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("d", 40))}
  921. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  922. require.NoError(t, err)
  923. // Accepting a connection from self should error.
  924. require.Error(t, peerManager.Accepted(selfID))
  925. // Accepting a connection from a known peer should work.
  926. added, err := peerManager.Add(a)
  927. require.NoError(t, err)
  928. require.True(t, added)
  929. require.NoError(t, peerManager.Accepted(a.NodeID))
  930. // Accepting a connection from an already accepted peer should error.
  931. require.Error(t, peerManager.Accepted(a.NodeID))
  932. // Accepting a connection from an unknown peer should work and register it.
  933. require.NoError(t, peerManager.Accepted(b.NodeID))
  934. require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  935. // Accepting a connection from a peer that's being dialed should work, and
  936. // should cause the dial to fail.
  937. added, err = peerManager.Add(c)
  938. require.NoError(t, err)
  939. require.True(t, added)
  940. dial, err := peerManager.TryDialNext()
  941. require.NoError(t, err)
  942. require.Equal(t, c, dial)
  943. require.NoError(t, peerManager.Accepted(c.NodeID))
  944. require.Error(t, peerManager.Dialed(c))
  945. // Accepting a connection from a peer that's been dialed should fail.
  946. added, err = peerManager.Add(d)
  947. require.NoError(t, err)
  948. require.True(t, added)
  949. dial, err = peerManager.TryDialNext()
  950. require.NoError(t, err)
  951. require.Equal(t, d, dial)
  952. require.NoError(t, peerManager.Dialed(d))
  953. require.Error(t, peerManager.Accepted(d.NodeID))
  954. }
  955. func TestPeerManager_Accepted_MaxConnected(t *testing.T) {
  956. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  957. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  958. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  959. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  960. MaxConnected: 2,
  961. })
  962. require.NoError(t, err)
  963. // Connect to a and b.
  964. added, err := peerManager.Add(a)
  965. require.NoError(t, err)
  966. require.True(t, added)
  967. require.NoError(t, peerManager.Dialed(a))
  968. added, err = peerManager.Add(b)
  969. require.NoError(t, err)
  970. require.True(t, added)
  971. require.NoError(t, peerManager.Accepted(b.NodeID))
  972. // Accepting c should now fail.
  973. added, err = peerManager.Add(c)
  974. require.NoError(t, err)
  975. require.True(t, added)
  976. require.Error(t, peerManager.Accepted(c.NodeID))
  977. }
  978. func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) {
  979. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  980. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  981. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  982. d := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("d", 40))}
  983. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  984. PeerScores: map[types.NodeID]p2p.PeerScore{
  985. c.NodeID: 1,
  986. d.NodeID: 2,
  987. },
  988. MaxConnected: 1,
  989. MaxConnectedUpgrade: 1,
  990. })
  991. require.NoError(t, err)
  992. // Dial a.
  993. added, err := peerManager.Add(a)
  994. require.NoError(t, err)
  995. require.True(t, added)
  996. require.NoError(t, peerManager.Dialed(a))
  997. // Accepting b should fail, since it's not an upgrade over a.
  998. added, err = peerManager.Add(b)
  999. require.NoError(t, err)
  1000. require.True(t, added)
  1001. require.Error(t, peerManager.Accepted(b.NodeID))
  1002. // Accepting c should work, since it upgrades a.
  1003. added, err = peerManager.Add(c)
  1004. require.NoError(t, err)
  1005. require.True(t, added)
  1006. require.NoError(t, peerManager.Accepted(c.NodeID))
  1007. // a still hasn't been evicted, so accepting b should still fail.
  1008. _, err = peerManager.Add(b)
  1009. require.NoError(t, err)
  1010. require.Error(t, peerManager.Accepted(b.NodeID))
  1011. // Also, accepting d should fail, since all upgrade slots are full.
  1012. added, err = peerManager.Add(d)
  1013. require.NoError(t, err)
  1014. require.True(t, added)
  1015. require.Error(t, peerManager.Accepted(d.NodeID))
  1016. }
  1017. func TestPeerManager_Accepted_Upgrade(t *testing.T) {
  1018. ctx, cancel := context.WithCancel(context.Background())
  1019. defer cancel()
  1020. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1021. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  1022. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  1023. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1024. PeerScores: map[types.NodeID]p2p.PeerScore{
  1025. b.NodeID: 1,
  1026. c.NodeID: 1,
  1027. },
  1028. MaxConnected: 1,
  1029. MaxConnectedUpgrade: 2,
  1030. })
  1031. require.NoError(t, err)
  1032. // Accept a.
  1033. added, err := peerManager.Add(a)
  1034. require.NoError(t, err)
  1035. require.True(t, added)
  1036. require.NoError(t, peerManager.Accepted(a.NodeID))
  1037. // Accepting b should work, since it upgrades a.
  1038. added, err = peerManager.Add(b)
  1039. require.NoError(t, err)
  1040. require.True(t, added)
  1041. require.NoError(t, peerManager.Accepted(b.NodeID))
  1042. // c cannot get accepted, since a has been upgraded by b.
  1043. added, err = peerManager.Add(c)
  1044. require.NoError(t, err)
  1045. require.True(t, added)
  1046. require.Error(t, peerManager.Accepted(c.NodeID))
  1047. // This should cause a to get evicted.
  1048. evict, err := peerManager.TryEvictNext()
  1049. require.NoError(t, err)
  1050. require.Equal(t, a.NodeID, evict)
  1051. peerManager.Disconnected(ctx, a.NodeID)
  1052. // c still cannot get accepted, since it's not scored above b.
  1053. require.Error(t, peerManager.Accepted(c.NodeID))
  1054. }
  1055. func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
  1056. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1057. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  1058. c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}
  1059. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1060. PeerScores: map[types.NodeID]p2p.PeerScore{
  1061. b.NodeID: 1,
  1062. c.NodeID: 1,
  1063. },
  1064. MaxConnected: 1,
  1065. MaxConnectedUpgrade: 2,
  1066. })
  1067. require.NoError(t, err)
  1068. // Accept a.
  1069. added, err := peerManager.Add(a)
  1070. require.NoError(t, err)
  1071. require.True(t, added)
  1072. require.NoError(t, peerManager.Accepted(a.NodeID))
  1073. // Start dial upgrade from a to b.
  1074. added, err = peerManager.Add(b)
  1075. require.NoError(t, err)
  1076. require.True(t, added)
  1077. dial, err := peerManager.TryDialNext()
  1078. require.NoError(t, err)
  1079. require.Equal(t, b, dial)
  1080. // a has already been claimed as an upgrade of a, so accepting
  1081. // c should fail since there's noone else to upgrade.
  1082. added, err = peerManager.Add(c)
  1083. require.NoError(t, err)
  1084. require.True(t, added)
  1085. require.Error(t, peerManager.Accepted(c.NodeID))
  1086. // However, if b connects to us while we're also trying to upgrade to it via
  1087. // dialing, then we accept the incoming connection as an upgrade.
  1088. require.NoError(t, peerManager.Accepted(b.NodeID))
  1089. // This should cause a to get evicted, and the dial upgrade to fail.
  1090. evict, err := peerManager.TryEvictNext()
  1091. require.NoError(t, err)
  1092. require.Equal(t, a.NodeID, evict)
  1093. require.Error(t, peerManager.Dialed(b))
  1094. }
  1095. func TestPeerManager_Ready(t *testing.T) {
  1096. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1097. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  1098. ctx, cancel := context.WithCancel(context.Background())
  1099. defer cancel()
  1100. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1101. require.NoError(t, err)
  1102. sub := peerManager.Subscribe(ctx)
  1103. // Connecting to a should still have it as status down.
  1104. added, err := peerManager.Add(a)
  1105. require.NoError(t, err)
  1106. require.True(t, added)
  1107. require.NoError(t, peerManager.Accepted(a.NodeID))
  1108. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
  1109. // Marking a as ready should transition it to PeerStatusUp and send an update.
  1110. peerManager.Ready(ctx, a.NodeID)
  1111. require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
  1112. require.Equal(t, p2p.PeerUpdate{
  1113. NodeID: a.NodeID,
  1114. Status: p2p.PeerStatusUp,
  1115. }, <-sub.Updates())
  1116. // Marking an unconnected peer as ready should do nothing.
  1117. added, err = peerManager.Add(b)
  1118. require.NoError(t, err)
  1119. require.True(t, added)
  1120. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
  1121. peerManager.Ready(ctx, b.NodeID)
  1122. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
  1123. require.Empty(t, sub.Updates())
  1124. }
  1125. // See TryEvictNext for most tests, this just tests blocking behavior.
  1126. func TestPeerManager_EvictNext(t *testing.T) {
  1127. ctx, cancel := context.WithCancel(context.Background())
  1128. defer cancel()
  1129. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1130. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1131. require.NoError(t, err)
  1132. added, err := peerManager.Add(a)
  1133. require.NoError(t, err)
  1134. require.True(t, added)
  1135. require.NoError(t, peerManager.Accepted(a.NodeID))
  1136. peerManager.Ready(ctx, a.NodeID)
  1137. // Since there are no peers to evict, EvictNext should block until timeout.
  1138. timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
  1139. defer cancel()
  1140. _, err = peerManager.EvictNext(timeoutCtx)
  1141. require.Error(t, err)
  1142. require.Equal(t, context.DeadlineExceeded, err)
  1143. // Erroring the peer will return it from EvictNext().
  1144. peerManager.Errored(a.NodeID, errors.New("foo"))
  1145. evict, err := peerManager.EvictNext(timeoutCtx)
  1146. require.NoError(t, err)
  1147. require.Equal(t, a.NodeID, evict)
  1148. // Since there are no more peers to evict, the next call should block.
  1149. timeoutCtx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
  1150. defer cancel()
  1151. _, err = peerManager.EvictNext(timeoutCtx)
  1152. require.Error(t, err)
  1153. require.Equal(t, context.DeadlineExceeded, err)
  1154. }
  1155. func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
  1156. ctx, cancel := context.WithCancel(context.Background())
  1157. defer cancel()
  1158. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1159. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1160. require.NoError(t, err)
  1161. added, err := peerManager.Add(a)
  1162. require.NoError(t, err)
  1163. require.True(t, added)
  1164. require.NoError(t, peerManager.Accepted(a.NodeID))
  1165. peerManager.Ready(ctx, a.NodeID)
  1166. // Spawn a goroutine to error a peer after a delay.
  1167. go func() {
  1168. time.Sleep(200 * time.Millisecond)
  1169. peerManager.Errored(a.NodeID, errors.New("foo"))
  1170. }()
  1171. // This will block until peer errors above.
  1172. ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
  1173. defer cancel()
  1174. evict, err := peerManager.EvictNext(ctx)
  1175. require.NoError(t, err)
  1176. require.Equal(t, a.NodeID, evict)
  1177. }
  1178. func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
  1179. ctx, cancel := context.WithCancel(context.Background())
  1180. defer cancel()
  1181. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1182. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  1183. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1184. MaxConnected: 1,
  1185. MaxConnectedUpgrade: 1,
  1186. PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 1},
  1187. })
  1188. require.NoError(t, err)
  1189. // Connect a.
  1190. added, err := peerManager.Add(a)
  1191. require.NoError(t, err)
  1192. require.True(t, added)
  1193. require.NoError(t, peerManager.Accepted(a.NodeID))
  1194. peerManager.Ready(ctx, a.NodeID)
  1195. // Spawn a goroutine to upgrade to b with a delay.
  1196. go func() {
  1197. time.Sleep(200 * time.Millisecond)
  1198. added, err := peerManager.Add(b)
  1199. require.NoError(t, err)
  1200. require.True(t, added)
  1201. dial, err := peerManager.TryDialNext()
  1202. require.NoError(t, err)
  1203. require.Equal(t, b, dial)
  1204. require.NoError(t, peerManager.Dialed(b))
  1205. }()
  1206. // This will block until peer is upgraded above.
  1207. ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
  1208. defer cancel()
  1209. evict, err := peerManager.EvictNext(ctx)
  1210. require.NoError(t, err)
  1211. require.Equal(t, a.NodeID, evict)
  1212. }
  1213. func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
  1214. ctx, cancel := context.WithCancel(context.Background())
  1215. defer cancel()
  1216. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1217. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  1218. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1219. MaxConnected: 1,
  1220. MaxConnectedUpgrade: 1,
  1221. PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 1},
  1222. })
  1223. require.NoError(t, err)
  1224. // Connect a.
  1225. added, err := peerManager.Add(a)
  1226. require.NoError(t, err)
  1227. require.True(t, added)
  1228. require.NoError(t, peerManager.Accepted(a.NodeID))
  1229. peerManager.Ready(ctx, a.NodeID)
  1230. // Spawn a goroutine to upgrade b with a delay.
  1231. go func() {
  1232. time.Sleep(200 * time.Millisecond)
  1233. require.NoError(t, peerManager.Accepted(b.NodeID))
  1234. }()
  1235. // This will block until peer is upgraded above.
  1236. ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
  1237. defer cancel()
  1238. evict, err := peerManager.EvictNext(ctx)
  1239. require.NoError(t, err)
  1240. require.Equal(t, a.NodeID, evict)
  1241. }
  1242. func TestPeerManager_TryEvictNext(t *testing.T) {
  1243. ctx, cancel := context.WithCancel(context.Background())
  1244. defer cancel()
  1245. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1246. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1247. require.NoError(t, err)
  1248. added, err := peerManager.Add(a)
  1249. require.NoError(t, err)
  1250. require.True(t, added)
  1251. // Nothing is evicted with no peers connected.
  1252. evict, err := peerManager.TryEvictNext()
  1253. require.NoError(t, err)
  1254. require.Zero(t, evict)
  1255. // Connecting to a won't evict anything either.
  1256. require.NoError(t, peerManager.Accepted(a.NodeID))
  1257. peerManager.Ready(ctx, a.NodeID)
  1258. // But if a errors it should be evicted.
  1259. peerManager.Errored(a.NodeID, errors.New("foo"))
  1260. evict, err = peerManager.TryEvictNext()
  1261. require.NoError(t, err)
  1262. require.Equal(t, a.NodeID, evict)
  1263. // While a is being evicted (before disconnect), it shouldn't get evicted again.
  1264. evict, err = peerManager.TryEvictNext()
  1265. require.NoError(t, err)
  1266. require.Zero(t, evict)
  1267. peerManager.Errored(a.NodeID, errors.New("foo"))
  1268. evict, err = peerManager.TryEvictNext()
  1269. require.NoError(t, err)
  1270. require.Zero(t, evict)
  1271. }
  1272. func TestPeerManager_Disconnected(t *testing.T) {
  1273. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1274. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1275. require.NoError(t, err)
  1276. ctx, cancel := context.WithCancel(context.Background())
  1277. defer cancel()
  1278. sub := peerManager.Subscribe(ctx)
  1279. // Disconnecting an unknown peer does nothing.
  1280. peerManager.Disconnected(ctx, a.NodeID)
  1281. require.Empty(t, peerManager.Peers())
  1282. require.Empty(t, sub.Updates())
  1283. // Disconnecting an accepted non-ready peer does not send a status update.
  1284. added, err := peerManager.Add(a)
  1285. require.NoError(t, err)
  1286. require.True(t, added)
  1287. require.NoError(t, peerManager.Accepted(a.NodeID))
  1288. peerManager.Disconnected(ctx, a.NodeID)
  1289. require.Empty(t, sub.Updates())
  1290. // Disconnecting a ready peer sends a status update.
  1291. _, err = peerManager.Add(a)
  1292. require.NoError(t, err)
  1293. require.NoError(t, peerManager.Accepted(a.NodeID))
  1294. peerManager.Ready(ctx, a.NodeID)
  1295. require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
  1296. require.NotEmpty(t, sub.Updates())
  1297. require.Equal(t, p2p.PeerUpdate{
  1298. NodeID: a.NodeID,
  1299. Status: p2p.PeerStatusUp,
  1300. }, <-sub.Updates())
  1301. peerManager.Disconnected(ctx, a.NodeID)
  1302. require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
  1303. require.NotEmpty(t, sub.Updates())
  1304. require.Equal(t, p2p.PeerUpdate{
  1305. NodeID: a.NodeID,
  1306. Status: p2p.PeerStatusDown,
  1307. }, <-sub.Updates())
  1308. // Disconnecting a dialing peer does not unmark it as dialing, to avoid
  1309. // dialing it multiple times in parallel.
  1310. dial, err := peerManager.TryDialNext()
  1311. require.NoError(t, err)
  1312. require.Equal(t, a, dial)
  1313. peerManager.Disconnected(ctx, a.NodeID)
  1314. dial, err = peerManager.TryDialNext()
  1315. require.NoError(t, err)
  1316. require.Zero(t, dial)
  1317. }
  1318. func TestPeerManager_Errored(t *testing.T) {
  1319. ctx, cancel := context.WithCancel(context.Background())
  1320. defer cancel()
  1321. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1322. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1323. require.NoError(t, err)
  1324. // Erroring an unknown peer does nothing.
  1325. peerManager.Errored(a.NodeID, errors.New("foo"))
  1326. require.Empty(t, peerManager.Peers())
  1327. evict, err := peerManager.TryEvictNext()
  1328. require.NoError(t, err)
  1329. require.Zero(t, evict)
  1330. // Erroring a known peer does nothing, and won't evict it later,
  1331. // even when it connects.
  1332. added, err := peerManager.Add(a)
  1333. require.NoError(t, err)
  1334. require.True(t, added)
  1335. peerManager.Errored(a.NodeID, errors.New("foo"))
  1336. evict, err = peerManager.TryEvictNext()
  1337. require.NoError(t, err)
  1338. require.Zero(t, evict)
  1339. require.NoError(t, peerManager.Accepted(a.NodeID))
  1340. peerManager.Ready(ctx, a.NodeID)
  1341. evict, err = peerManager.TryEvictNext()
  1342. require.NoError(t, err)
  1343. require.Zero(t, evict)
  1344. // However, erroring once connected will evict it.
  1345. peerManager.Errored(a.NodeID, errors.New("foo"))
  1346. evict, err = peerManager.TryEvictNext()
  1347. require.NoError(t, err)
  1348. require.Equal(t, a.NodeID, evict)
  1349. }
  1350. func TestPeerManager_Subscribe(t *testing.T) {
  1351. ctx, cancel := context.WithCancel(context.Background())
  1352. defer cancel()
  1353. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1354. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1355. require.NoError(t, err)
  1356. // This tests all subscription events for full peer lifecycles.
  1357. sub := peerManager.Subscribe(ctx)
  1358. added, err := peerManager.Add(a)
  1359. require.NoError(t, err)
  1360. require.True(t, added)
  1361. require.Empty(t, sub.Updates())
  1362. // Inbound connection.
  1363. require.NoError(t, peerManager.Accepted(a.NodeID))
  1364. require.Empty(t, sub.Updates())
  1365. peerManager.Ready(ctx, a.NodeID)
  1366. require.NotEmpty(t, sub.Updates())
  1367. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
  1368. peerManager.Disconnected(ctx, a.NodeID)
  1369. require.NotEmpty(t, sub.Updates())
  1370. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
  1371. // Outbound connection with peer error and eviction.
  1372. dial, err := peerManager.TryDialNext()
  1373. require.NoError(t, err)
  1374. require.Equal(t, a, dial)
  1375. require.Empty(t, sub.Updates())
  1376. require.NoError(t, peerManager.Dialed(a))
  1377. require.Empty(t, sub.Updates())
  1378. peerManager.Ready(ctx, a.NodeID)
  1379. require.NotEmpty(t, sub.Updates())
  1380. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
  1381. peerManager.Errored(a.NodeID, errors.New("foo"))
  1382. require.Empty(t, sub.Updates())
  1383. evict, err := peerManager.TryEvictNext()
  1384. require.NoError(t, err)
  1385. require.Equal(t, a.NodeID, evict)
  1386. peerManager.Disconnected(ctx, a.NodeID)
  1387. require.NotEmpty(t, sub.Updates())
  1388. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
  1389. // Outbound connection with dial failure.
  1390. dial, err = peerManager.TryDialNext()
  1391. require.NoError(t, err)
  1392. require.Equal(t, a, dial)
  1393. require.Empty(t, sub.Updates())
  1394. require.NoError(t, peerManager.DialFailed(ctx, a))
  1395. require.Empty(t, sub.Updates())
  1396. }
  1397. func TestPeerManager_Subscribe_Close(t *testing.T) {
  1398. ctx, cancel := context.WithCancel(context.Background())
  1399. defer cancel()
  1400. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1401. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1402. require.NoError(t, err)
  1403. sub := peerManager.Subscribe(ctx)
  1404. added, err := peerManager.Add(a)
  1405. require.NoError(t, err)
  1406. require.True(t, added)
  1407. require.NoError(t, peerManager.Accepted(a.NodeID))
  1408. require.Empty(t, sub.Updates())
  1409. peerManager.Ready(ctx, a.NodeID)
  1410. require.NotEmpty(t, sub.Updates())
  1411. require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
  1412. // Closing the subscription should not send us the disconnected update.
  1413. cancel()
  1414. peerManager.Disconnected(ctx, a.NodeID)
  1415. require.Empty(t, sub.Updates())
  1416. }
  1417. func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
  1418. ctx, cancel := context.WithCancel(context.Background())
  1419. defer cancel()
  1420. t.Cleanup(leaktest.Check(t))
  1421. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1422. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  1423. require.NoError(t, err)
  1424. s2ctx, s2cancel := context.WithCancel(ctx)
  1425. defer s2cancel()
  1426. s1 := peerManager.Subscribe(ctx)
  1427. s2 := peerManager.Subscribe(s2ctx)
  1428. s3 := peerManager.Subscribe(ctx)
  1429. // Connecting to a peer should send updates on all subscriptions.
  1430. added, err := peerManager.Add(a)
  1431. require.NoError(t, err)
  1432. require.True(t, added)
  1433. require.NoError(t, peerManager.Accepted(a.NodeID))
  1434. peerManager.Ready(ctx, a.NodeID)
  1435. expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
  1436. require.NotEmpty(t, s1)
  1437. require.Equal(t, expectUp, <-s1.Updates())
  1438. require.NotEmpty(t, s2)
  1439. require.Equal(t, expectUp, <-s2.Updates())
  1440. require.NotEmpty(t, s3)
  1441. require.Equal(t, expectUp, <-s3.Updates())
  1442. // We now close s2. Disconnecting the peer should only send updates
  1443. // on s1 and s3.
  1444. s2cancel()
  1445. time.Sleep(250 * time.Millisecond) // give the thread a chance to exit
  1446. peerManager.Disconnected(ctx, a.NodeID)
  1447. expectDown := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}
  1448. require.NotEmpty(t, s1)
  1449. require.Equal(t, expectDown, <-s1.Updates())
  1450. require.Empty(t, s2.Updates())
  1451. require.NotEmpty(t, s3)
  1452. require.Equal(t, expectDown, <-s3.Updates())
  1453. }
  1454. func TestPeerManager_Close(t *testing.T) {
  1455. // leaktest will check that spawned goroutines are closed.
  1456. t.Cleanup(leaktest.CheckTimeout(t, 1*time.Second))
  1457. ctx, cancel := context.WithCancel(context.Background())
  1458. defer cancel()
  1459. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1460. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1461. MinRetryTime: 10 * time.Second,
  1462. })
  1463. require.NoError(t, err)
  1464. // This subscription isn't closed, but PeerManager.Close()
  1465. // should reap the spawned goroutine.
  1466. _ = peerManager.Subscribe(ctx)
  1467. // This dial failure will start a retry timer for 10 seconds, which
  1468. // should be reaped.
  1469. added, err := peerManager.Add(a)
  1470. require.NoError(t, err)
  1471. require.True(t, added)
  1472. dial, err := peerManager.TryDialNext()
  1473. require.NoError(t, err)
  1474. require.Equal(t, a, dial)
  1475. require.NoError(t, peerManager.DialFailed(ctx, a))
  1476. }
  1477. func TestPeerManager_Advertise(t *testing.T) {
  1478. aID := types.NodeID(strings.Repeat("a", 40))
  1479. aTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: aID, Hostname: "127.0.0.1", Port: 26657, Path: "/path"}
  1480. aMem := p2p.NodeAddress{Protocol: "memory", NodeID: aID}
  1481. bID := types.NodeID(strings.Repeat("b", 40))
  1482. bTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: bID, Hostname: "b10c::1", Port: 26657, Path: "/path"}
  1483. bMem := p2p.NodeAddress{Protocol: "memory", NodeID: bID}
  1484. cID := types.NodeID(strings.Repeat("c", 40))
  1485. cTCP := p2p.NodeAddress{Protocol: "tcp", NodeID: cID, Hostname: "host.domain", Port: 80}
  1486. cMem := p2p.NodeAddress{Protocol: "memory", NodeID: cID}
  1487. dID := types.NodeID(strings.Repeat("d", 40))
  1488. // Create an initial peer manager and add the peers.
  1489. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1490. PeerScores: map[types.NodeID]p2p.PeerScore{aID: 3, bID: 2, cID: 1},
  1491. })
  1492. require.NoError(t, err)
  1493. added, err := peerManager.Add(aTCP)
  1494. require.NoError(t, err)
  1495. require.True(t, added)
  1496. added, err = peerManager.Add(aMem)
  1497. require.NoError(t, err)
  1498. require.True(t, added)
  1499. added, err = peerManager.Add(bTCP)
  1500. require.NoError(t, err)
  1501. require.True(t, added)
  1502. added, err = peerManager.Add(bMem)
  1503. require.NoError(t, err)
  1504. require.True(t, added)
  1505. added, err = peerManager.Add(cTCP)
  1506. require.NoError(t, err)
  1507. require.True(t, added)
  1508. added, err = peerManager.Add(cMem)
  1509. require.NoError(t, err)
  1510. require.True(t, added)
  1511. // d should get all addresses.
  1512. require.ElementsMatch(t, []p2p.NodeAddress{
  1513. aTCP, aMem, bTCP, bMem, cTCP, cMem,
  1514. }, peerManager.Advertise(dID, 100))
  1515. // a should not get its own addresses.
  1516. require.ElementsMatch(t, []p2p.NodeAddress{
  1517. bTCP, bMem, cTCP, cMem,
  1518. }, peerManager.Advertise(aID, 100))
  1519. // Asking for 0 addresses should return, well, 0.
  1520. require.Empty(t, peerManager.Advertise(aID, 0))
  1521. // Asking for 2 addresses should get the highest-rated ones, i.e. a.
  1522. require.ElementsMatch(t, []p2p.NodeAddress{
  1523. aTCP, aMem,
  1524. }, peerManager.Advertise(dID, 2))
  1525. }
  1526. func TestPeerManager_Advertise_Self(t *testing.T) {
  1527. dID := types.NodeID(strings.Repeat("d", 40))
  1528. self := p2p.NodeAddress{Protocol: "tcp", NodeID: selfID, Hostname: "2001:db8::1", Port: 26657}
  1529. // Create a peer manager with SelfAddress defined.
  1530. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  1531. SelfAddress: self,
  1532. })
  1533. require.NoError(t, err)
  1534. // peer manager should always advertise its SelfAddress.
  1535. require.ElementsMatch(t, []p2p.NodeAddress{
  1536. self,
  1537. }, peerManager.Advertise(dID, 100))
  1538. }
  1539. func TestPeerManager_SetHeight_GetHeight(t *testing.T) {
  1540. a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
  1541. b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
  1542. db := dbm.NewMemDB()
  1543. peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
  1544. require.NoError(t, err)
  1545. // Getting a height should default to 0, for unknown peers and
  1546. // for known peers without height.
  1547. added, err := peerManager.Add(a)
  1548. require.NoError(t, err)
  1549. require.True(t, added)
  1550. require.EqualValues(t, 0, peerManager.GetHeight(a.NodeID))
  1551. require.EqualValues(t, 0, peerManager.GetHeight(b.NodeID))
  1552. // Setting a height should work for a known node.
  1553. require.NoError(t, peerManager.SetHeight(a.NodeID, 3))
  1554. require.EqualValues(t, 3, peerManager.GetHeight(a.NodeID))
  1555. // Setting a height should add an unknown node.
  1556. require.Equal(t, []types.NodeID{a.NodeID}, peerManager.Peers())
  1557. require.NoError(t, peerManager.SetHeight(b.NodeID, 7))
  1558. require.EqualValues(t, 7, peerManager.GetHeight(b.NodeID))
  1559. require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  1560. // The heights should not be persisted.
  1561. peerManager, err = p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
  1562. require.NoError(t, err)
  1563. require.ElementsMatch(t, []types.NodeID{a.NodeID, b.NodeID}, peerManager.Peers())
  1564. require.Zero(t, peerManager.GetHeight(a.NodeID))
  1565. require.Zero(t, peerManager.GetHeight(b.NodeID))
  1566. }