You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

969 lines
29 KiB

p2p: make PeerManager.DialNext() and EvictNext() block (#5947) See #5936 and #5938 for background. The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`) which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about. I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
4 years ago
p2p: make PeerManager.DialNext() and EvictNext() block (#5947) See #5936 and #5938 for background. The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`) which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about. I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
4 years ago
  1. package p2p_test
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "runtime"
  8. "strings"
  9. "sync"
  10. "testing"
  11. "time"
  12. "github.com/fortytw2/leaktest"
  13. "github.com/gogo/protobuf/proto"
  14. gogotypes "github.com/gogo/protobuf/types"
  15. "github.com/stretchr/testify/mock"
  16. "github.com/stretchr/testify/require"
  17. dbm "github.com/tendermint/tm-db"
  18. "github.com/tendermint/tendermint/crypto"
  19. "github.com/tendermint/tendermint/internal/p2p"
  20. "github.com/tendermint/tendermint/internal/p2p/mocks"
  21. "github.com/tendermint/tendermint/internal/p2p/p2ptest"
  22. "github.com/tendermint/tendermint/libs/log"
  23. "github.com/tendermint/tendermint/types"
  24. )
  25. func echoReactor(ctx context.Context, channel *p2p.Channel) {
  26. iter := channel.Receive(ctx)
  27. for iter.Next(ctx) {
  28. envelope := iter.Envelope()
  29. value := envelope.Message.(*p2ptest.Message).Value
  30. if err := channel.Send(ctx, p2p.Envelope{
  31. To: envelope.From,
  32. Message: &p2ptest.Message{Value: value},
  33. }); err != nil {
  34. return
  35. }
  36. }
  37. }
  38. func TestRouter_Network(t *testing.T) {
  39. ctx, cancel := context.WithCancel(context.Background())
  40. defer cancel()
  41. t.Cleanup(leaktest.Check(t))
  42. // Create a test network and open a channel where all peers run echoReactor.
  43. network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 8})
  44. local := network.RandomNode()
  45. peers := network.Peers(local.NodeID)
  46. channels := network.MakeChannels(ctx, t, chDesc)
  47. network.Start(ctx, t)
  48. channel := channels[local.NodeID]
  49. for _, peer := range peers {
  50. go echoReactor(ctx, channels[peer.NodeID])
  51. }
  52. // Sending a message to each peer should work.
  53. for _, peer := range peers {
  54. p2ptest.RequireSendReceive(ctx, t, channel, peer.NodeID,
  55. &p2ptest.Message{Value: "foo"},
  56. &p2ptest.Message{Value: "foo"},
  57. )
  58. }
  59. // Sending a broadcast should return back a message from all peers.
  60. p2ptest.RequireSend(ctx, t, channel, p2p.Envelope{
  61. Broadcast: true,
  62. Message: &p2ptest.Message{Value: "bar"},
  63. })
  64. expect := []*p2p.Envelope{}
  65. for _, peer := range peers {
  66. expect = append(expect, &p2p.Envelope{
  67. From: peer.NodeID,
  68. ChannelID: 1,
  69. Message: &p2ptest.Message{Value: "bar"},
  70. })
  71. }
  72. p2ptest.RequireReceiveUnordered(ctx, t, channel, expect)
  73. // We then submit an error for a peer, and watch it get disconnected and
  74. // then reconnected as the router retries it.
  75. peerUpdates := local.MakePeerUpdatesNoRequireEmpty(ctx, t)
  76. require.NoError(t, channel.SendError(ctx, p2p.PeerError{
  77. NodeID: peers[0].NodeID,
  78. Err: errors.New("boom"),
  79. }))
  80. p2ptest.RequireUpdates(t, peerUpdates, []p2p.PeerUpdate{
  81. {NodeID: peers[0].NodeID, Status: p2p.PeerStatusDown},
  82. {NodeID: peers[0].NodeID, Status: p2p.PeerStatusUp},
  83. })
  84. }
  85. func TestRouter_Channel_Basic(t *testing.T) {
  86. t.Cleanup(leaktest.Check(t))
  87. ctx, cancel := context.WithCancel(context.Background())
  88. defer cancel()
  89. // Set up a router with no transports (so no peers).
  90. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  91. require.NoError(t, err)
  92. router, err := p2p.NewRouter(
  93. ctx,
  94. log.TestingLogger(),
  95. p2p.NopMetrics(),
  96. selfInfo,
  97. selfKey,
  98. peerManager,
  99. nil,
  100. nil,
  101. p2p.RouterOptions{},
  102. )
  103. require.NoError(t, err)
  104. require.NoError(t, router.Start(ctx))
  105. t.Cleanup(router.Wait)
  106. // Opening a channel should work.
  107. chctx, chcancel := context.WithCancel(ctx)
  108. defer chcancel()
  109. channel, err := router.OpenChannel(chctx, chDesc)
  110. require.NoError(t, err)
  111. require.Contains(t, router.NodeInfo().Channels, byte(chDesc.ID))
  112. require.NotNil(t, channel)
  113. // Opening the same channel again should fail.
  114. _, err = router.OpenChannel(ctx, chDesc)
  115. require.Error(t, err)
  116. // Opening a different channel should work.
  117. chDesc2 := &p2p.ChannelDescriptor{ID: 2, MessageType: &p2ptest.Message{}}
  118. _, err = router.OpenChannel(ctx, chDesc2)
  119. require.NoError(t, err)
  120. require.Contains(t, router.NodeInfo().Channels, byte(chDesc2.ID))
  121. // Closing the channel, then opening it again should be fine.
  122. chcancel()
  123. time.Sleep(200 * time.Millisecond) // yes yes, but Close() is async...
  124. channel, err = router.OpenChannel(ctx, chDesc)
  125. require.NoError(t, err)
  126. // We should be able to send on the channel, even though there are no peers.
  127. p2ptest.RequireSend(ctx, t, channel, p2p.Envelope{
  128. To: types.NodeID(strings.Repeat("a", 40)),
  129. Message: &p2ptest.Message{Value: "foo"},
  130. })
  131. // A message to ourselves should be dropped.
  132. p2ptest.RequireSend(ctx, t, channel, p2p.Envelope{
  133. To: selfID,
  134. Message: &p2ptest.Message{Value: "self"},
  135. })
  136. p2ptest.RequireEmpty(ctx, t, channel)
  137. }
  138. // Channel tests are hairy to mock, so we use an in-memory network instead.
  139. func TestRouter_Channel_SendReceive(t *testing.T) {
  140. ctx, cancel := context.WithCancel(context.Background())
  141. defer cancel()
  142. t.Cleanup(leaktest.Check(t))
  143. // Create a test network and open a channel on all nodes.
  144. network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 3})
  145. ids := network.NodeIDs()
  146. aID, bID, cID := ids[0], ids[1], ids[2]
  147. channels := network.MakeChannels(ctx, t, chDesc)
  148. a, b, c := channels[aID], channels[bID], channels[cID]
  149. otherChannels := network.MakeChannels(ctx, t, p2ptest.MakeChannelDesc(9))
  150. network.Start(ctx, t)
  151. // Sending a message a->b should work, and not send anything
  152. // further to a, b, or c.
  153. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}})
  154. p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "foo"}})
  155. p2ptest.RequireEmpty(ctx, t, a, b, c)
  156. // Sending a nil message a->b should be dropped.
  157. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: nil})
  158. p2ptest.RequireEmpty(ctx, t, a, b, c)
  159. // Sending a different message type should be dropped.
  160. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}})
  161. p2ptest.RequireEmpty(ctx, t, a, b, c)
  162. // Sending to an unknown peer should be dropped.
  163. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{
  164. To: types.NodeID(strings.Repeat("a", 40)),
  165. Message: &p2ptest.Message{Value: "a"},
  166. })
  167. p2ptest.RequireEmpty(ctx, t, a, b, c)
  168. // Sending without a recipient should be dropped.
  169. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{Message: &p2ptest.Message{Value: "noto"}})
  170. p2ptest.RequireEmpty(ctx, t, a, b, c)
  171. // Sending to self should be dropped.
  172. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "self"}})
  173. p2ptest.RequireEmpty(ctx, t, a, b, c)
  174. // Removing b and sending to it should be dropped.
  175. network.Remove(ctx, t, bID)
  176. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "nob"}})
  177. p2ptest.RequireEmpty(ctx, t, a, b, c)
  178. // After all this, sending a message c->a should work.
  179. p2ptest.RequireSend(ctx, t, c, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "bar"}})
  180. p2ptest.RequireReceive(ctx, t, a, p2p.Envelope{From: cID, Message: &p2ptest.Message{Value: "bar"}})
  181. p2ptest.RequireEmpty(ctx, t, a, b, c)
  182. // None of these messages should have made it onto the other channels.
  183. for _, other := range otherChannels {
  184. p2ptest.RequireEmpty(ctx, t, other)
  185. }
  186. }
  187. func TestRouter_Channel_Broadcast(t *testing.T) {
  188. t.Cleanup(leaktest.Check(t))
  189. ctx, cancel := context.WithCancel(context.Background())
  190. defer cancel()
  191. // Create a test network and open a channel on all nodes.
  192. network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 4})
  193. ids := network.NodeIDs()
  194. aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3]
  195. channels := network.MakeChannels(ctx, t, chDesc)
  196. a, b, c, d := channels[aID], channels[bID], channels[cID], channels[dID]
  197. network.Start(ctx, t)
  198. // Sending a broadcast from b should work.
  199. p2ptest.RequireSend(ctx, t, b, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "foo"}})
  200. p2ptest.RequireReceive(ctx, t, a, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
  201. p2ptest.RequireReceive(ctx, t, c, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
  202. p2ptest.RequireReceive(ctx, t, d, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
  203. p2ptest.RequireEmpty(ctx, t, a, b, c, d)
  204. // Removing one node from the network shouldn't prevent broadcasts from working.
  205. network.Remove(ctx, t, dID)
  206. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "bar"}})
  207. p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}})
  208. p2ptest.RequireReceive(ctx, t, c, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}})
  209. p2ptest.RequireEmpty(ctx, t, a, b, c, d)
  210. }
  211. func TestRouter_Channel_Wrapper(t *testing.T) {
  212. t.Cleanup(leaktest.Check(t))
  213. ctx, cancel := context.WithCancel(context.Background())
  214. defer cancel()
  215. // Create a test network and open a channel on all nodes.
  216. network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 2})
  217. ids := network.NodeIDs()
  218. aID, bID := ids[0], ids[1]
  219. chDesc := &p2p.ChannelDescriptor{
  220. ID: chID,
  221. MessageType: &wrapperMessage{},
  222. Priority: 5,
  223. SendQueueCapacity: 10,
  224. RecvMessageCapacity: 10,
  225. }
  226. channels := network.MakeChannels(ctx, t, chDesc)
  227. a, b := channels[aID], channels[bID]
  228. network.Start(ctx, t)
  229. // Since wrapperMessage implements p2p.Wrapper and handles Message, it
  230. // should automatically wrap and unwrap sent messages -- we prepend the
  231. // wrapper actions to the message value to signal this.
  232. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}})
  233. p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "unwrap:wrap:foo"}})
  234. // If we send a different message that can't be wrapped, it should be dropped.
  235. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}})
  236. p2ptest.RequireEmpty(ctx, t, b)
  237. // If we send the wrapper message itself, it should also be passed through
  238. // since WrapperMessage supports it, and should only be unwrapped at the receiver.
  239. p2ptest.RequireSend(ctx, t, a, p2p.Envelope{
  240. To: bID,
  241. Message: &wrapperMessage{Message: p2ptest.Message{Value: "foo"}},
  242. })
  243. p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{
  244. From: aID,
  245. Message: &p2ptest.Message{Value: "unwrap:foo"},
  246. })
  247. }
  248. // WrapperMessage prepends the value with "wrap:" and "unwrap:" to test it.
  249. type wrapperMessage struct {
  250. p2ptest.Message
  251. }
  252. var _ p2p.Wrapper = (*wrapperMessage)(nil)
  253. func (w *wrapperMessage) Wrap(inner proto.Message) error {
  254. switch inner := inner.(type) {
  255. case *p2ptest.Message:
  256. w.Message.Value = fmt.Sprintf("wrap:%v", inner.Value)
  257. case *wrapperMessage:
  258. *w = *inner
  259. default:
  260. return fmt.Errorf("invalid message type %T", inner)
  261. }
  262. return nil
  263. }
  264. func (w *wrapperMessage) Unwrap() (proto.Message, error) {
  265. return &p2ptest.Message{Value: fmt.Sprintf("unwrap:%v", w.Message.Value)}, nil
  266. }
  267. func TestRouter_Channel_Error(t *testing.T) {
  268. t.Cleanup(leaktest.Check(t))
  269. ctx, cancel := context.WithCancel(context.Background())
  270. defer cancel()
  271. // Create a test network and open a channel on all nodes.
  272. network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 3})
  273. network.Start(ctx, t)
  274. ids := network.NodeIDs()
  275. aID, bID := ids[0], ids[1]
  276. channels := network.MakeChannels(ctx, t, chDesc)
  277. a := channels[aID]
  278. // Erroring b should cause it to be disconnected. It will reconnect shortly after.
  279. sub := network.Nodes[aID].MakePeerUpdates(ctx, t)
  280. p2ptest.RequireError(ctx, t, a, p2p.PeerError{NodeID: bID, Err: errors.New("boom")})
  281. p2ptest.RequireUpdates(t, sub, []p2p.PeerUpdate{
  282. {NodeID: bID, Status: p2p.PeerStatusDown},
  283. {NodeID: bID, Status: p2p.PeerStatusUp},
  284. })
  285. }
  286. func TestRouter_AcceptPeers(t *testing.T) {
  287. testcases := map[string]struct {
  288. peerInfo types.NodeInfo
  289. peerKey crypto.PubKey
  290. ok bool
  291. }{
  292. "valid handshake": {peerInfo, peerKey.PubKey(), true},
  293. "empty handshake": {types.NodeInfo{}, nil, false},
  294. "invalid key": {peerInfo, selfKey.PubKey(), false},
  295. "self handshake": {selfInfo, selfKey.PubKey(), false},
  296. "incompatible peer": {
  297. types.NodeInfo{
  298. NodeID: peerID,
  299. ListenAddr: "0.0.0.0:0",
  300. Network: "other-network",
  301. Moniker: string(peerID),
  302. },
  303. peerKey.PubKey(),
  304. false,
  305. },
  306. }
  307. bctx, bcancel := context.WithCancel(context.Background())
  308. defer bcancel()
  309. for name, tc := range testcases {
  310. tc := tc
  311. t.Run(name, func(t *testing.T) {
  312. ctx, cancel := context.WithCancel(bctx)
  313. defer cancel()
  314. t.Cleanup(leaktest.Check(t))
  315. // Set up a mock transport that handshakes.
  316. connCtx, connCancel := context.WithCancel(context.Background())
  317. mockConnection := &mocks.Connection{}
  318. mockConnection.On("String").Maybe().Return("mock")
  319. mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
  320. Return(tc.peerInfo, tc.peerKey, nil)
  321. mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe()
  322. mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
  323. if tc.ok {
  324. mockConnection.On("ReceiveMessage", mock.Anything).Return(chID, nil, io.EOF).Maybe()
  325. }
  326. mockTransport := &mocks.Transport{}
  327. mockTransport.On("String").Maybe().Return("mock")
  328. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  329. mockTransport.On("Close").Return(nil).Maybe()
  330. mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
  331. mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, io.EOF)
  332. // Set up and start the router.
  333. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  334. require.NoError(t, err)
  335. sub := peerManager.Subscribe(ctx)
  336. router, err := p2p.NewRouter(
  337. ctx,
  338. log.TestingLogger(),
  339. p2p.NopMetrics(),
  340. selfInfo,
  341. selfKey,
  342. peerManager,
  343. []p2p.Transport{mockTransport},
  344. nil,
  345. p2p.RouterOptions{},
  346. )
  347. require.NoError(t, err)
  348. require.NoError(t, router.Start(ctx))
  349. if tc.ok {
  350. p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{
  351. NodeID: tc.peerInfo.NodeID,
  352. Status: p2p.PeerStatusUp,
  353. })
  354. // force a context switch so that the
  355. // connection is handled.
  356. time.Sleep(time.Millisecond)
  357. } else {
  358. select {
  359. case <-connCtx.Done():
  360. case <-time.After(100 * time.Millisecond):
  361. require.Fail(t, "connection not closed")
  362. }
  363. }
  364. require.NoError(t, router.Stop())
  365. mockTransport.AssertExpectations(t)
  366. mockConnection.AssertExpectations(t)
  367. })
  368. }
  369. }
  370. func TestRouter_AcceptPeers_Error(t *testing.T) {
  371. t.Cleanup(leaktest.Check(t))
  372. ctx, cancel := context.WithCancel(context.Background())
  373. defer cancel()
  374. // Set up a mock transport that returns an error, which should prevent
  375. // the router from calling Accept again.
  376. mockTransport := &mocks.Transport{}
  377. mockTransport.On("String").Maybe().Return("mock")
  378. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  379. mockTransport.On("Accept", mock.Anything).Once().Return(nil, errors.New("boom"))
  380. mockTransport.On("Close").Return(nil)
  381. // Set up and start the router.
  382. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  383. require.NoError(t, err)
  384. router, err := p2p.NewRouter(
  385. ctx,
  386. log.TestingLogger(),
  387. p2p.NopMetrics(),
  388. selfInfo,
  389. selfKey,
  390. peerManager,
  391. []p2p.Transport{mockTransport},
  392. nil,
  393. p2p.RouterOptions{},
  394. )
  395. require.NoError(t, err)
  396. require.NoError(t, router.Start(ctx))
  397. time.Sleep(time.Second)
  398. require.NoError(t, router.Stop())
  399. mockTransport.AssertExpectations(t)
  400. }
  401. func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
  402. t.Cleanup(leaktest.Check(t))
  403. ctx, cancel := context.WithCancel(context.Background())
  404. defer cancel()
  405. // Set up a mock transport that returns io.EOF once, which should prevent
  406. // the router from calling Accept again.
  407. mockTransport := &mocks.Transport{}
  408. mockTransport.On("String").Maybe().Return("mock")
  409. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  410. mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
  411. mockTransport.On("Close").Return(nil)
  412. // Set up and start the router.
  413. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  414. require.NoError(t, err)
  415. router, err := p2p.NewRouter(
  416. ctx,
  417. log.TestingLogger(),
  418. p2p.NopMetrics(),
  419. selfInfo,
  420. selfKey,
  421. peerManager,
  422. []p2p.Transport{mockTransport},
  423. nil,
  424. p2p.RouterOptions{},
  425. )
  426. require.NoError(t, err)
  427. require.NoError(t, router.Start(ctx))
  428. time.Sleep(time.Second)
  429. require.NoError(t, router.Stop())
  430. mockTransport.AssertExpectations(t)
  431. }
  432. func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
  433. t.Cleanup(leaktest.Check(t))
  434. ctx, cancel := context.WithCancel(context.Background())
  435. defer cancel()
  436. // Set up a mock transport that returns a connection that blocks during the
  437. // handshake. It should be able to accept several of these in parallel, i.e.
  438. // a single connection can't halt other connections being accepted.
  439. acceptCh := make(chan bool, 3)
  440. closeCh := make(chan time.Time)
  441. mockConnection := &mocks.Connection{}
  442. mockConnection.On("String").Maybe().Return("mock")
  443. mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
  444. WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
  445. mockConnection.On("Close").Return(nil)
  446. mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
  447. mockTransport := &mocks.Transport{}
  448. mockTransport.On("String").Maybe().Return("mock")
  449. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  450. mockTransport.On("Close").Return(nil)
  451. mockTransport.On("Accept", mock.Anything).Times(3).Run(func(_ mock.Arguments) {
  452. acceptCh <- true
  453. }).Return(mockConnection, nil)
  454. mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
  455. // Set up and start the router.
  456. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  457. require.NoError(t, err)
  458. router, err := p2p.NewRouter(
  459. ctx,
  460. log.TestingLogger(),
  461. p2p.NopMetrics(),
  462. selfInfo,
  463. selfKey,
  464. peerManager,
  465. []p2p.Transport{mockTransport},
  466. nil,
  467. p2p.RouterOptions{},
  468. )
  469. require.NoError(t, err)
  470. require.NoError(t, router.Start(ctx))
  471. require.Eventually(t, func() bool {
  472. return len(acceptCh) == 3
  473. }, time.Second, 10*time.Millisecond, "num", len(acceptCh))
  474. close(closeCh)
  475. time.Sleep(100 * time.Millisecond)
  476. require.NoError(t, router.Stop())
  477. mockTransport.AssertExpectations(t)
  478. mockConnection.AssertExpectations(t)
  479. }
  480. func TestRouter_DialPeers(t *testing.T) {
  481. testcases := map[string]struct {
  482. dialID types.NodeID
  483. peerInfo types.NodeInfo
  484. peerKey crypto.PubKey
  485. dialErr error
  486. ok bool
  487. }{
  488. "valid dial": {peerInfo.NodeID, peerInfo, peerKey.PubKey(), nil, true},
  489. "empty handshake": {peerInfo.NodeID, types.NodeInfo{}, nil, nil, false},
  490. "invalid key": {peerInfo.NodeID, peerInfo, selfKey.PubKey(), nil, false},
  491. "unexpected node ID": {peerInfo.NodeID, selfInfo, selfKey.PubKey(), nil, false},
  492. "dial error": {peerInfo.NodeID, peerInfo, peerKey.PubKey(), errors.New("boom"), false},
  493. "incompatible peer": {
  494. peerInfo.NodeID,
  495. types.NodeInfo{
  496. NodeID: peerID,
  497. ListenAddr: "0.0.0.0:0",
  498. Network: "other-network",
  499. Moniker: string(peerID),
  500. },
  501. peerKey.PubKey(),
  502. nil,
  503. false,
  504. },
  505. }
  506. bctx, bcancel := context.WithCancel(context.Background())
  507. defer bcancel()
  508. for name, tc := range testcases {
  509. tc := tc
  510. t.Run(name, func(t *testing.T) {
  511. t.Cleanup(leaktest.Check(t))
  512. ctx, cancel := context.WithCancel(bctx)
  513. defer cancel()
  514. address := p2p.NodeAddress{Protocol: "mock", NodeID: tc.dialID}
  515. endpoint := p2p.Endpoint{Protocol: "mock", Path: string(tc.dialID)}
  516. // Set up a mock transport that handshakes.
  517. connCtx, connCancel := context.WithCancel(context.Background())
  518. defer connCancel()
  519. mockConnection := &mocks.Connection{}
  520. mockConnection.On("String").Maybe().Return("mock")
  521. if tc.dialErr == nil {
  522. mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
  523. Return(tc.peerInfo, tc.peerKey, nil)
  524. mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe()
  525. }
  526. if tc.ok {
  527. mockConnection.On("ReceiveMessage", mock.Anything).Return(chID, nil, io.EOF).Maybe()
  528. }
  529. mockTransport := &mocks.Transport{}
  530. mockTransport.On("String").Maybe().Return("mock")
  531. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  532. mockTransport.On("Close").Return(nil).Maybe()
  533. mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, io.EOF)
  534. if tc.dialErr == nil {
  535. mockTransport.On("Dial", mock.Anything, endpoint).Once().Return(mockConnection, nil)
  536. // This handles the retry when a dialed connection gets closed after ReceiveMessage
  537. // returns io.EOF above.
  538. mockTransport.On("Dial", mock.Anything, endpoint).Maybe().Return(nil, io.EOF)
  539. } else {
  540. mockTransport.On("Dial", mock.Anything, endpoint).Once().
  541. Run(func(_ mock.Arguments) { connCancel() }).
  542. Return(nil, tc.dialErr)
  543. }
  544. // Set up and start the router.
  545. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  546. require.NoError(t, err)
  547. added, err := peerManager.Add(address)
  548. require.NoError(t, err)
  549. require.True(t, added)
  550. sub := peerManager.Subscribe(ctx)
  551. router, err := p2p.NewRouter(
  552. ctx,
  553. log.TestingLogger(),
  554. p2p.NopMetrics(),
  555. selfInfo,
  556. selfKey,
  557. peerManager,
  558. []p2p.Transport{mockTransport},
  559. nil,
  560. p2p.RouterOptions{},
  561. )
  562. require.NoError(t, err)
  563. require.NoError(t, router.Start(ctx))
  564. if tc.ok {
  565. p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{
  566. NodeID: tc.peerInfo.NodeID,
  567. Status: p2p.PeerStatusUp,
  568. })
  569. // force a context switch so that the
  570. // connection is handled.
  571. time.Sleep(time.Millisecond)
  572. } else {
  573. select {
  574. case <-connCtx.Done():
  575. case <-time.After(100 * time.Millisecond):
  576. require.Fail(t, "connection not closed")
  577. }
  578. }
  579. require.NoError(t, router.Stop())
  580. mockTransport.AssertExpectations(t)
  581. mockConnection.AssertExpectations(t)
  582. })
  583. }
  584. }
  585. func TestRouter_DialPeers_Parallel(t *testing.T) {
  586. t.Cleanup(leaktest.Check(t))
  587. ctx, cancel := context.WithCancel(context.Background())
  588. defer cancel()
  589. a := p2p.NodeAddress{Protocol: "mock", NodeID: types.NodeID(strings.Repeat("a", 40))}
  590. b := p2p.NodeAddress{Protocol: "mock", NodeID: types.NodeID(strings.Repeat("b", 40))}
  591. c := p2p.NodeAddress{Protocol: "mock", NodeID: types.NodeID(strings.Repeat("c", 40))}
  592. // Set up a mock transport that returns a connection that blocks during the
  593. // handshake. It should dial all peers in parallel.
  594. dialCh := make(chan bool, 3)
  595. closeCh := make(chan time.Time)
  596. mockConnection := &mocks.Connection{}
  597. mockConnection.On("String").Maybe().Return("mock")
  598. mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
  599. WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF)
  600. mockConnection.On("Close").Return(nil)
  601. mockTransport := &mocks.Transport{}
  602. mockTransport.On("String").Maybe().Return("mock")
  603. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  604. mockTransport.On("Close").Return(nil)
  605. mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
  606. for _, address := range []p2p.NodeAddress{a, b, c} {
  607. endpoint := p2p.Endpoint{Protocol: address.Protocol, Path: string(address.NodeID)}
  608. mockTransport.On("Dial", mock.Anything, endpoint).Run(func(_ mock.Arguments) {
  609. dialCh <- true
  610. }).Return(mockConnection, nil)
  611. }
  612. // Set up and start the router.
  613. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  614. require.NoError(t, err)
  615. added, err := peerManager.Add(a)
  616. require.NoError(t, err)
  617. require.True(t, added)
  618. added, err = peerManager.Add(b)
  619. require.NoError(t, err)
  620. require.True(t, added)
  621. added, err = peerManager.Add(c)
  622. require.NoError(t, err)
  623. require.True(t, added)
  624. router, err := p2p.NewRouter(
  625. ctx,
  626. log.TestingLogger(),
  627. p2p.NopMetrics(),
  628. selfInfo,
  629. selfKey,
  630. peerManager,
  631. []p2p.Transport{mockTransport},
  632. nil,
  633. p2p.RouterOptions{
  634. DialSleep: func(_ context.Context) {},
  635. NumConcurrentDials: func() int {
  636. ncpu := runtime.NumCPU()
  637. if ncpu <= 3 {
  638. return 3
  639. }
  640. return ncpu
  641. },
  642. },
  643. )
  644. require.NoError(t, err)
  645. require.NoError(t, router.Start(ctx))
  646. require.Eventually(t,
  647. func() bool {
  648. return len(dialCh) == 3
  649. },
  650. 5*time.Second,
  651. 100*time.Millisecond,
  652. "reached %d rather than 3", len(dialCh))
  653. close(closeCh)
  654. time.Sleep(500 * time.Millisecond)
  655. require.NoError(t, router.Stop())
  656. mockTransport.AssertExpectations(t)
  657. mockConnection.AssertExpectations(t)
  658. }
  659. func TestRouter_EvictPeers(t *testing.T) {
  660. t.Cleanup(leaktest.Check(t))
  661. ctx, cancel := context.WithCancel(context.Background())
  662. defer cancel()
  663. // Set up a mock transport that we can evict.
  664. closeCh := make(chan time.Time)
  665. closeOnce := sync.Once{}
  666. mockConnection := &mocks.Connection{}
  667. mockConnection.On("String").Maybe().Return("mock")
  668. mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
  669. Return(peerInfo, peerKey.PubKey(), nil)
  670. mockConnection.On("ReceiveMessage", mock.Anything).WaitUntil(closeCh).Return(chID, nil, io.EOF)
  671. mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
  672. mockConnection.On("Close").Run(func(_ mock.Arguments) {
  673. closeOnce.Do(func() {
  674. close(closeCh)
  675. })
  676. }).Return(nil)
  677. mockTransport := &mocks.Transport{}
  678. mockTransport.On("String").Maybe().Return("mock")
  679. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  680. mockTransport.On("Close").Return(nil)
  681. mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
  682. mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, io.EOF)
  683. // Set up and start the router.
  684. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  685. require.NoError(t, err)
  686. sub := peerManager.Subscribe(ctx)
  687. router, err := p2p.NewRouter(
  688. ctx,
  689. log.TestingLogger(),
  690. p2p.NopMetrics(),
  691. selfInfo,
  692. selfKey,
  693. peerManager,
  694. []p2p.Transport{mockTransport},
  695. nil,
  696. p2p.RouterOptions{},
  697. )
  698. require.NoError(t, err)
  699. require.NoError(t, router.Start(ctx))
  700. // Wait for the mock peer to connect, then evict it by reporting an error.
  701. p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{
  702. NodeID: peerInfo.NodeID,
  703. Status: p2p.PeerStatusUp,
  704. })
  705. peerManager.Errored(peerInfo.NodeID, errors.New("boom"))
  706. p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{
  707. NodeID: peerInfo.NodeID,
  708. Status: p2p.PeerStatusDown,
  709. })
  710. require.NoError(t, router.Stop())
  711. mockTransport.AssertExpectations(t)
  712. mockConnection.AssertExpectations(t)
  713. }
  714. func TestRouter_ChannelCompatability(t *testing.T) {
  715. t.Cleanup(leaktest.Check(t))
  716. ctx, cancel := context.WithCancel(context.Background())
  717. defer cancel()
  718. incompatiblePeer := types.NodeInfo{
  719. NodeID: peerID,
  720. ListenAddr: "0.0.0.0:0",
  721. Network: "test",
  722. Moniker: string(peerID),
  723. Channels: []byte{0x03},
  724. }
  725. mockConnection := &mocks.Connection{}
  726. mockConnection.On("String").Maybe().Return("mock")
  727. mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
  728. Return(incompatiblePeer, peerKey.PubKey(), nil)
  729. mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
  730. mockConnection.On("Close").Return(nil)
  731. mockTransport := &mocks.Transport{}
  732. mockTransport.On("String").Maybe().Return("mock")
  733. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  734. mockTransport.On("Close").Return(nil)
  735. mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
  736. mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
  737. // Set up and start the router.
  738. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  739. require.NoError(t, err)
  740. router, err := p2p.NewRouter(
  741. ctx,
  742. log.TestingLogger(),
  743. p2p.NopMetrics(),
  744. selfInfo,
  745. selfKey,
  746. peerManager,
  747. []p2p.Transport{mockTransport},
  748. nil,
  749. p2p.RouterOptions{},
  750. )
  751. require.NoError(t, err)
  752. require.NoError(t, router.Start(ctx))
  753. time.Sleep(1 * time.Second)
  754. require.NoError(t, router.Stop())
  755. require.Empty(t, peerManager.Peers())
  756. mockConnection.AssertExpectations(t)
  757. mockTransport.AssertExpectations(t)
  758. }
  759. func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
  760. t.Cleanup(leaktest.Check(t))
  761. ctx, cancel := context.WithCancel(context.Background())
  762. defer cancel()
  763. peer := types.NodeInfo{
  764. NodeID: peerID,
  765. ListenAddr: "0.0.0.0:0",
  766. Network: "test",
  767. Moniker: string(peerID),
  768. Channels: []byte{0x02},
  769. }
  770. mockConnection := &mocks.Connection{}
  771. mockConnection.On("String").Maybe().Return("mock")
  772. mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
  773. Return(peer, peerKey.PubKey(), nil)
  774. mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
  775. mockConnection.On("Close").Return(nil)
  776. mockConnection.On("ReceiveMessage", mock.Anything).Return(chID, nil, io.EOF)
  777. mockTransport := &mocks.Transport{}
  778. mockTransport.On("AddChannelDescriptors", mock.Anything).Return()
  779. mockTransport.On("String").Maybe().Return("mock")
  780. mockTransport.On("Protocols").Return([]p2p.Protocol{"mock"})
  781. mockTransport.On("Close").Return(nil)
  782. mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
  783. mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, io.EOF)
  784. // Set up and start the router.
  785. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
  786. require.NoError(t, err)
  787. sub := peerManager.Subscribe(ctx)
  788. router, err := p2p.NewRouter(
  789. ctx,
  790. log.TestingLogger(),
  791. p2p.NopMetrics(),
  792. selfInfo,
  793. selfKey,
  794. peerManager,
  795. []p2p.Transport{mockTransport},
  796. nil,
  797. p2p.RouterOptions{},
  798. )
  799. require.NoError(t, err)
  800. require.NoError(t, router.Start(ctx))
  801. p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{
  802. NodeID: peerInfo.NodeID,
  803. Status: p2p.PeerStatusUp,
  804. })
  805. channel, err := router.OpenChannel(ctx, chDesc)
  806. require.NoError(t, err)
  807. require.NoError(t, channel.Send(ctx, p2p.Envelope{
  808. To: peer.NodeID,
  809. Message: &p2ptest.Message{Value: "Hi"},
  810. }))
  811. require.NoError(t, router.Stop())
  812. mockTransport.AssertExpectations(t)
  813. }