You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

207 lines
4.8 KiB

  1. package p2p_test
  2. import (
  3. "sync"
  4. "testing"
  5. "github.com/gogo/protobuf/proto"
  6. "github.com/stretchr/testify/mock"
  7. "github.com/stretchr/testify/require"
  8. "github.com/tendermint/tendermint/config"
  9. "github.com/tendermint/tendermint/internal/p2p"
  10. p2pmocks "github.com/tendermint/tendermint/internal/p2p/mocks"
  11. "github.com/tendermint/tendermint/libs/log"
  12. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. var (
  16. channelID1 = byte(0x01)
  17. channelID2 = byte(0x02)
  18. p2pCfg = config.DefaultP2PConfig()
  19. testChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
  20. p2p.ChannelID(channelID1): {
  21. MsgType: new(ssproto.Message),
  22. Descriptor: &p2p.ChannelDescriptor{
  23. ID: channelID1,
  24. Priority: 3,
  25. SendQueueCapacity: 10,
  26. RecvMessageCapacity: int(4e6),
  27. },
  28. },
  29. p2p.ChannelID(channelID2): {
  30. MsgType: new(ssproto.Message),
  31. Descriptor: &p2p.ChannelDescriptor{
  32. ID: channelID2,
  33. Priority: 1,
  34. SendQueueCapacity: 4,
  35. RecvMessageCapacity: int(16e6),
  36. },
  37. },
  38. }
  39. )
  40. type reactorShimTestSuite struct {
  41. shim *p2p.ReactorShim
  42. sw *p2p.Switch
  43. }
  44. func setup(t *testing.T, peers []p2p.Peer) *reactorShimTestSuite {
  45. t.Helper()
  46. rts := &reactorShimTestSuite{
  47. shim: p2p.NewReactorShim(log.TestingLogger(), "TestShim", testChannelShims),
  48. }
  49. rts.sw = p2p.MakeSwitch(p2pCfg, 1, "testing", "123.123.123", func(_ int, sw *p2p.Switch) *p2p.Switch {
  50. for _, peer := range peers {
  51. p2p.AddPeerToSwitchPeerSet(sw, peer)
  52. }
  53. sw.AddReactor(rts.shim.Name, rts.shim)
  54. return sw
  55. }, log.TestingLogger())
  56. // start the reactor shim
  57. require.NoError(t, rts.shim.Start())
  58. t.Cleanup(func() {
  59. require.NoError(t, rts.shim.Stop())
  60. for _, chs := range rts.shim.Channels {
  61. chs.Channel.Close()
  62. }
  63. })
  64. return rts
  65. }
  66. func simplePeer(t *testing.T, id string) (*p2pmocks.Peer, types.NodeID) {
  67. t.Helper()
  68. peerID := types.NodeID(id)
  69. peer := &p2pmocks.Peer{}
  70. peer.On("ID").Return(peerID)
  71. return peer, peerID
  72. }
  73. func TestReactorShim_GetChannel(t *testing.T) {
  74. rts := setup(t, nil)
  75. p2pCh := rts.shim.GetChannel(p2p.ChannelID(channelID1))
  76. require.NotNil(t, p2pCh)
  77. require.Equal(t, p2pCh.ID, p2p.ChannelID(channelID1))
  78. p2pCh = rts.shim.GetChannel(p2p.ChannelID(byte(0x03)))
  79. require.Nil(t, p2pCh)
  80. }
  81. func TestReactorShim_GetChannels(t *testing.T) {
  82. rts := setup(t, nil)
  83. p2pChs := rts.shim.GetChannels()
  84. require.Len(t, p2pChs, 2)
  85. require.Equal(t, p2p.ChannelID(p2pChs[0].ID), p2p.ChannelID(channelID1))
  86. require.Equal(t, p2p.ChannelID(p2pChs[1].ID), p2p.ChannelID(channelID2))
  87. }
  88. func TestReactorShim_AddPeer(t *testing.T) {
  89. peerA, peerIDA := simplePeer(t, "aa")
  90. rts := setup(t, []p2p.Peer{peerA})
  91. var wg sync.WaitGroup
  92. wg.Add(1)
  93. var peerUpdate p2p.PeerUpdate
  94. go func() {
  95. peerUpdate = <-rts.shim.PeerUpdates.Updates()
  96. wg.Done()
  97. }()
  98. rts.shim.AddPeer(peerA)
  99. wg.Wait()
  100. require.Equal(t, peerIDA, peerUpdate.NodeID)
  101. require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
  102. }
  103. func TestReactorShim_RemovePeer(t *testing.T) {
  104. peerA, peerIDA := simplePeer(t, "aa")
  105. rts := setup(t, []p2p.Peer{peerA})
  106. var wg sync.WaitGroup
  107. wg.Add(1)
  108. var peerUpdate p2p.PeerUpdate
  109. go func() {
  110. peerUpdate = <-rts.shim.PeerUpdates.Updates()
  111. wg.Done()
  112. }()
  113. rts.shim.RemovePeer(peerA, "test reason")
  114. wg.Wait()
  115. require.Equal(t, peerIDA, peerUpdate.NodeID)
  116. require.Equal(t, p2p.PeerStatusDown, peerUpdate.Status)
  117. }
  118. func TestReactorShim_Receive(t *testing.T) {
  119. peerA, peerIDA := simplePeer(t, "aa")
  120. rts := setup(t, []p2p.Peer{peerA})
  121. msg := &ssproto.Message{
  122. Sum: &ssproto.Message_ChunkRequest{
  123. ChunkRequest: &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  124. },
  125. }
  126. bz, err := proto.Marshal(msg)
  127. require.NoError(t, err)
  128. var wg sync.WaitGroup
  129. var response *ssproto.Message
  130. peerA.On("Send", channelID1, mock.Anything).Run(func(args mock.Arguments) {
  131. m := &ssproto.Message{}
  132. require.NoError(t, proto.Unmarshal(args[1].([]byte), m))
  133. response = m
  134. wg.Done()
  135. }).Return(true)
  136. p2pCh := rts.shim.Channels[p2p.ChannelID(channelID1)]
  137. wg.Add(2)
  138. // Simulate receiving the envelope in some real reactor and replying back with
  139. // the same envelope and then closing the Channel.
  140. go func() {
  141. e := <-p2pCh.Channel.In
  142. require.Equal(t, peerIDA, e.From)
  143. require.NotNil(t, e.Message)
  144. p2pCh.Channel.Out <- p2p.Envelope{To: e.From, Message: e.Message}
  145. p2pCh.Channel.Close()
  146. wg.Done()
  147. }()
  148. rts.shim.Receive(channelID1, peerA, bz)
  149. // wait until the mock peer called Send and we (fake) proxied the envelope
  150. wg.Wait()
  151. require.NotNil(t, response)
  152. m, err := response.Unwrap()
  153. require.NoError(t, err)
  154. require.Equal(t, msg.GetChunkRequest(), m)
  155. // Since p2pCh was closed in the simulated reactor above, calling Receive
  156. // should not block.
  157. rts.shim.Receive(channelID1, peerA, bz)
  158. require.Empty(t, p2pCh.Channel.In)
  159. peerA.AssertExpectations(t)
  160. }