You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

355 lines
10 KiB

  1. package p2ptest
  2. import (
  3. "context"
  4. "math/rand"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/require"
  8. dbm "github.com/tendermint/tm-db"
  9. "github.com/tendermint/tendermint/crypto"
  10. "github.com/tendermint/tendermint/crypto/ed25519"
  11. "github.com/tendermint/tendermint/internal/p2p"
  12. "github.com/tendermint/tendermint/libs/log"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. // Network sets up an in-memory network that can be used for high-level P2P
  16. // testing. It creates an arbitrary number of nodes that are connected to each
  17. // other, and can open channels across all nodes with custom reactors.
  18. type Network struct {
  19. Nodes map[types.NodeID]*Node
  20. logger log.Logger
  21. memoryNetwork *p2p.MemoryNetwork
  22. cancel context.CancelFunc
  23. }
  24. // NetworkOptions is an argument structure to parameterize the
  25. // MakeNetwork function.
  26. type NetworkOptions struct {
  27. NumNodes int
  28. BufferSize int
  29. NodeOpts NodeOptions
  30. }
  31. type NodeOptions struct {
  32. MaxPeers uint16
  33. MaxConnected uint16
  34. }
  35. func (opts *NetworkOptions) setDefaults() {
  36. if opts.BufferSize == 0 {
  37. opts.BufferSize = 1
  38. }
  39. }
  40. // MakeNetwork creates a test network with the given number of nodes and
  41. // connects them to each other.
  42. func MakeNetwork(ctx context.Context, t *testing.T, opts NetworkOptions) *Network {
  43. opts.setDefaults()
  44. logger := log.TestingLogger()
  45. network := &Network{
  46. Nodes: map[types.NodeID]*Node{},
  47. logger: logger,
  48. memoryNetwork: p2p.NewMemoryNetwork(logger, opts.BufferSize),
  49. }
  50. for i := 0; i < opts.NumNodes; i++ {
  51. node := network.MakeNode(ctx, t, opts.NodeOpts)
  52. network.Nodes[node.NodeID] = node
  53. }
  54. return network
  55. }
  56. // Start starts the network by setting up a list of node addresses to dial in
  57. // addition to creating a peer update subscription for each node. Finally, all
  58. // nodes are connected to each other.
  59. func (n *Network) Start(ctx context.Context, t *testing.T) {
  60. ctx, n.cancel = context.WithCancel(ctx)
  61. t.Cleanup(n.cancel)
  62. // Set up a list of node addresses to dial, and a peer update subscription
  63. // for each node.
  64. dialQueue := []p2p.NodeAddress{}
  65. subs := map[types.NodeID]*p2p.PeerUpdates{}
  66. subctx, subcancel := context.WithCancel(ctx)
  67. defer subcancel()
  68. for _, node := range n.Nodes {
  69. dialQueue = append(dialQueue, node.NodeAddress)
  70. subs[node.NodeID] = node.PeerManager.Subscribe(subctx)
  71. }
  72. // For each node, dial the nodes that it still doesn't have a connection to
  73. // (either inbound or outbound), and wait for both sides to confirm the
  74. // connection via the subscriptions.
  75. for i, sourceAddress := range dialQueue {
  76. sourceNode := n.Nodes[sourceAddress.NodeID]
  77. sourceSub := subs[sourceAddress.NodeID]
  78. for _, targetAddress := range dialQueue[i+1:] { // nodes <i already connected
  79. targetNode := n.Nodes[targetAddress.NodeID]
  80. targetSub := subs[targetAddress.NodeID]
  81. added, err := sourceNode.PeerManager.Add(targetAddress)
  82. require.NoError(t, err)
  83. require.True(t, added)
  84. select {
  85. case <-ctx.Done():
  86. require.Fail(t, "operation canceled")
  87. case peerUpdate := <-sourceSub.Updates():
  88. require.Equal(t, targetNode.NodeID, peerUpdate.NodeID)
  89. require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
  90. case <-time.After(3 * time.Second):
  91. require.Fail(t, "timed out waiting for peer", "%v dialing %v",
  92. sourceNode.NodeID, targetNode.NodeID)
  93. }
  94. select {
  95. case <-ctx.Done():
  96. require.Fail(t, "operation canceled")
  97. case peerUpdate := <-targetSub.Updates():
  98. peerUpdate.Channels = nil
  99. require.Equal(t, p2p.PeerUpdate{
  100. NodeID: sourceNode.NodeID,
  101. Status: p2p.PeerStatusUp,
  102. }, peerUpdate)
  103. case <-time.After(3 * time.Second):
  104. require.Fail(t, "timed out waiting for peer", "%v accepting %v",
  105. targetNode.NodeID, sourceNode.NodeID)
  106. }
  107. // Add the address to the target as well, so it's able to dial the
  108. // source back if that's even necessary.
  109. added, err = targetNode.PeerManager.Add(sourceAddress)
  110. require.NoError(t, err)
  111. require.True(t, added)
  112. }
  113. }
  114. }
  115. // NodeIDs returns the network's node IDs.
  116. func (n *Network) NodeIDs() []types.NodeID {
  117. ids := []types.NodeID{}
  118. for id := range n.Nodes {
  119. ids = append(ids, id)
  120. }
  121. return ids
  122. }
  123. // MakeChannels makes a channel on all nodes and returns them, automatically
  124. // doing error checks and cleanups.
  125. func (n *Network) MakeChannels(
  126. ctx context.Context,
  127. t *testing.T,
  128. chDesc *p2p.ChannelDescriptor,
  129. ) map[types.NodeID]*p2p.Channel {
  130. channels := map[types.NodeID]*p2p.Channel{}
  131. for _, node := range n.Nodes {
  132. channels[node.NodeID] = node.MakeChannel(ctx, t, chDesc)
  133. }
  134. return channels
  135. }
  136. // MakeChannelsNoCleanup makes a channel on all nodes and returns them,
  137. // automatically doing error checks. The caller must ensure proper cleanup of
  138. // all the channels.
  139. func (n *Network) MakeChannelsNoCleanup(
  140. ctx context.Context,
  141. t *testing.T,
  142. chDesc *p2p.ChannelDescriptor,
  143. ) map[types.NodeID]*p2p.Channel {
  144. channels := map[types.NodeID]*p2p.Channel{}
  145. for _, node := range n.Nodes {
  146. channels[node.NodeID] = node.MakeChannelNoCleanup(ctx, t, chDesc)
  147. }
  148. return channels
  149. }
  150. // RandomNode returns a random node.
  151. func (n *Network) RandomNode() *Node {
  152. nodes := make([]*Node, 0, len(n.Nodes))
  153. for _, node := range n.Nodes {
  154. nodes = append(nodes, node)
  155. }
  156. return nodes[rand.Intn(len(nodes))] // nolint:gosec
  157. }
  158. // Peers returns a node's peers (i.e. everyone except itself).
  159. func (n *Network) Peers(id types.NodeID) []*Node {
  160. peers := make([]*Node, 0, len(n.Nodes)-1)
  161. for _, peer := range n.Nodes {
  162. if peer.NodeID != id {
  163. peers = append(peers, peer)
  164. }
  165. }
  166. return peers
  167. }
  168. // Remove removes a node from the network, stopping it and waiting for all other
  169. // nodes to pick up the disconnection.
  170. func (n *Network) Remove(ctx context.Context, t *testing.T, id types.NodeID) {
  171. require.Contains(t, n.Nodes, id)
  172. node := n.Nodes[id]
  173. delete(n.Nodes, id)
  174. subs := []*p2p.PeerUpdates{}
  175. subctx, subcancel := context.WithCancel(ctx)
  176. defer subcancel()
  177. for _, peer := range n.Nodes {
  178. sub := peer.PeerManager.Subscribe(subctx)
  179. subs = append(subs, sub)
  180. }
  181. require.NoError(t, node.Transport.Close())
  182. node.cancel()
  183. if node.Router.IsRunning() {
  184. node.Router.Stop()
  185. node.Router.Wait()
  186. }
  187. for _, sub := range subs {
  188. RequireUpdate(t, sub, p2p.PeerUpdate{
  189. NodeID: node.NodeID,
  190. Status: p2p.PeerStatusDown,
  191. })
  192. }
  193. }
  194. // Node is a node in a Network, with a Router and a PeerManager.
  195. type Node struct {
  196. NodeID types.NodeID
  197. NodeInfo types.NodeInfo
  198. NodeAddress p2p.NodeAddress
  199. PrivKey crypto.PrivKey
  200. Router *p2p.Router
  201. PeerManager *p2p.PeerManager
  202. Transport *p2p.MemoryTransport
  203. cancel context.CancelFunc
  204. }
  205. // MakeNode creates a new Node configured for the network with a
  206. // running peer manager, but does not add it to the existing
  207. // network. Callers are responsible for updating peering relationships.
  208. func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions) *Node {
  209. ctx, cancel := context.WithCancel(ctx)
  210. privKey := ed25519.GenPrivKey()
  211. nodeID := types.NodeIDFromPubKey(privKey.PubKey())
  212. nodeInfo := types.NodeInfo{
  213. NodeID: nodeID,
  214. ListenAddr: "0.0.0.0:0", // FIXME: We have to fake this for now.
  215. Moniker: string(nodeID),
  216. }
  217. transport := n.memoryNetwork.CreateTransport(nodeID)
  218. require.Len(t, transport.Endpoints(), 1, "transport not listening on 1 endpoint")
  219. peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{
  220. MinRetryTime: 10 * time.Millisecond,
  221. MaxRetryTime: 100 * time.Millisecond,
  222. RetryTimeJitter: time.Millisecond,
  223. MaxPeers: opts.MaxPeers,
  224. MaxConnected: opts.MaxConnected,
  225. })
  226. require.NoError(t, err)
  227. router, err := p2p.NewRouter(
  228. ctx,
  229. n.logger,
  230. p2p.NopMetrics(),
  231. nodeInfo,
  232. privKey,
  233. peerManager,
  234. []p2p.Transport{transport},
  235. transport.Endpoints(),
  236. p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
  237. )
  238. require.NoError(t, err)
  239. require.NoError(t, router.Start(ctx))
  240. t.Cleanup(func() {
  241. if router.IsRunning() {
  242. router.Stop()
  243. router.Wait()
  244. }
  245. require.NoError(t, transport.Close())
  246. cancel()
  247. })
  248. return &Node{
  249. NodeID: nodeID,
  250. NodeInfo: nodeInfo,
  251. NodeAddress: transport.Endpoints()[0].NodeAddress(nodeID),
  252. PrivKey: privKey,
  253. Router: router,
  254. PeerManager: peerManager,
  255. Transport: transport,
  256. cancel: cancel,
  257. }
  258. }
  259. // MakeChannel opens a channel, with automatic error handling and cleanup. On
  260. // test cleanup, it also checks that the channel is empty, to make sure
  261. // all expected messages have been asserted.
  262. func (n *Node) MakeChannel(
  263. ctx context.Context,
  264. t *testing.T,
  265. chDesc *p2p.ChannelDescriptor,
  266. ) *p2p.Channel {
  267. ctx, cancel := context.WithCancel(ctx)
  268. channel, err := n.Router.OpenChannel(ctx, chDesc)
  269. require.NoError(t, err)
  270. require.Contains(t, n.Router.NodeInfo().Channels, byte(chDesc.ID))
  271. t.Cleanup(func() {
  272. RequireEmpty(ctx, t, channel)
  273. cancel()
  274. })
  275. return channel
  276. }
  277. // MakeChannelNoCleanup opens a channel, with automatic error handling. The
  278. // caller must ensure proper cleanup of the channel.
  279. func (n *Node) MakeChannelNoCleanup(
  280. ctx context.Context,
  281. t *testing.T,
  282. chDesc *p2p.ChannelDescriptor,
  283. ) *p2p.Channel {
  284. channel, err := n.Router.OpenChannel(ctx, chDesc)
  285. require.NoError(t, err)
  286. return channel
  287. }
  288. // MakePeerUpdates opens a peer update subscription, with automatic cleanup.
  289. // It checks that all updates have been consumed during cleanup.
  290. func (n *Node) MakePeerUpdates(ctx context.Context, t *testing.T) *p2p.PeerUpdates {
  291. t.Helper()
  292. sub := n.PeerManager.Subscribe(ctx)
  293. t.Cleanup(func() {
  294. RequireNoUpdates(ctx, t, sub)
  295. })
  296. return sub
  297. }
  298. // MakePeerUpdatesNoRequireEmpty opens a peer update subscription, with automatic cleanup.
  299. // It does *not* check that all updates have been consumed, but will
  300. // close the update channel.
  301. func (n *Node) MakePeerUpdatesNoRequireEmpty(ctx context.Context, t *testing.T) *p2p.PeerUpdates {
  302. return n.PeerManager.Subscribe(ctx)
  303. }
  304. func MakeChannelDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor {
  305. return &p2p.ChannelDescriptor{
  306. ID: chID,
  307. MessageType: &Message{},
  308. Priority: 5,
  309. SendQueueCapacity: 10,
  310. RecvMessageCapacity: 10,
  311. }
  312. }