You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

205 lines
4.7 KiB

  1. package p2p_test
  2. import (
  3. "sync"
  4. "testing"
  5. "github.com/gogo/protobuf/proto"
  6. "github.com/stretchr/testify/mock"
  7. "github.com/stretchr/testify/require"
  8. "github.com/tendermint/tendermint/config"
  9. "github.com/tendermint/tendermint/p2p"
  10. p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
  11. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  12. )
  13. var (
  14. channelID1 = byte(0x01)
  15. channelID2 = byte(0x02)
  16. p2pCfg = config.DefaultP2PConfig()
  17. testChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
  18. p2p.ChannelID(channelID1): {
  19. MsgType: new(ssproto.Message),
  20. Descriptor: &p2p.ChannelDescriptor{
  21. ID: channelID1,
  22. Priority: 3,
  23. SendQueueCapacity: 10,
  24. RecvMessageCapacity: int(4e6),
  25. },
  26. },
  27. p2p.ChannelID(channelID2): {
  28. MsgType: new(ssproto.Message),
  29. Descriptor: &p2p.ChannelDescriptor{
  30. ID: channelID2,
  31. Priority: 1,
  32. SendQueueCapacity: 4,
  33. RecvMessageCapacity: int(16e6),
  34. },
  35. },
  36. }
  37. )
  38. type reactorShimTestSuite struct {
  39. shim *p2p.ReactorShim
  40. sw *p2p.Switch
  41. }
  42. func setup(t *testing.T, peers []p2p.Peer) *reactorShimTestSuite {
  43. t.Helper()
  44. rts := &reactorShimTestSuite{
  45. shim: p2p.NewReactorShim("TestShim", testChannelShims),
  46. }
  47. rts.sw = p2p.MakeSwitch(p2pCfg, 1, "testing", "123.123.123", func(_ int, sw *p2p.Switch) *p2p.Switch {
  48. for _, peer := range peers {
  49. p2p.AddPeerToSwitchPeerSet(sw, peer)
  50. }
  51. sw.AddReactor(rts.shim.Name, rts.shim)
  52. return sw
  53. })
  54. // start the reactor shim
  55. require.NoError(t, rts.shim.Start())
  56. t.Cleanup(func() {
  57. require.NoError(t, rts.shim.Stop())
  58. for _, chs := range rts.shim.Channels {
  59. chs.Channel.Close()
  60. }
  61. })
  62. return rts
  63. }
  64. func simplePeer(t *testing.T, id string) (*p2pmocks.Peer, p2p.NodeID) {
  65. t.Helper()
  66. peerID := p2p.NodeID(id)
  67. peer := &p2pmocks.Peer{}
  68. peer.On("ID").Return(peerID)
  69. return peer, peerID
  70. }
  71. func TestReactorShim_GetChannel(t *testing.T) {
  72. rts := setup(t, nil)
  73. p2pCh := rts.shim.GetChannel(p2p.ChannelID(channelID1))
  74. require.NotNil(t, p2pCh)
  75. require.Equal(t, p2pCh.ID(), p2p.ChannelID(channelID1))
  76. p2pCh = rts.shim.GetChannel(p2p.ChannelID(byte(0x03)))
  77. require.Nil(t, p2pCh)
  78. }
  79. func TestReactorShim_GetChannels(t *testing.T) {
  80. rts := setup(t, nil)
  81. p2pChs := rts.shim.GetChannels()
  82. require.Len(t, p2pChs, 2)
  83. require.Equal(t, p2p.ChannelID(p2pChs[0].ID), p2p.ChannelID(channelID1))
  84. require.Equal(t, p2p.ChannelID(p2pChs[1].ID), p2p.ChannelID(channelID2))
  85. }
  86. func TestReactorShim_AddPeer(t *testing.T) {
  87. peerA, peerIDA := simplePeer(t, "aa")
  88. rts := setup(t, []p2p.Peer{peerA})
  89. var wg sync.WaitGroup
  90. wg.Add(1)
  91. var peerUpdate p2p.PeerUpdate
  92. go func() {
  93. peerUpdate = <-rts.shim.PeerUpdates.Updates()
  94. wg.Done()
  95. }()
  96. rts.shim.AddPeer(peerA)
  97. wg.Wait()
  98. require.Equal(t, peerIDA, peerUpdate.PeerID)
  99. require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
  100. }
  101. func TestReactorShim_RemovePeer(t *testing.T) {
  102. peerA, peerIDA := simplePeer(t, "aa")
  103. rts := setup(t, []p2p.Peer{peerA})
  104. var wg sync.WaitGroup
  105. wg.Add(1)
  106. var peerUpdate p2p.PeerUpdate
  107. go func() {
  108. peerUpdate = <-rts.shim.PeerUpdates.Updates()
  109. wg.Done()
  110. }()
  111. rts.shim.RemovePeer(peerA, "test reason")
  112. wg.Wait()
  113. require.Equal(t, peerIDA, peerUpdate.PeerID)
  114. require.Equal(t, p2p.PeerStatusDown, peerUpdate.Status)
  115. }
  116. func TestReactorShim_Receive(t *testing.T) {
  117. peerA, peerIDA := simplePeer(t, "aa")
  118. rts := setup(t, []p2p.Peer{peerA})
  119. msg := &ssproto.Message{
  120. Sum: &ssproto.Message_ChunkRequest{
  121. ChunkRequest: &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  122. },
  123. }
  124. bz, err := proto.Marshal(msg)
  125. require.NoError(t, err)
  126. var wg sync.WaitGroup
  127. var response *ssproto.Message
  128. peerA.On("Send", channelID1, mock.Anything).Run(func(args mock.Arguments) {
  129. m := &ssproto.Message{}
  130. require.NoError(t, proto.Unmarshal(args[1].([]byte), m))
  131. response = m
  132. wg.Done()
  133. }).Return(true)
  134. p2pCh := rts.shim.Channels[p2p.ChannelID(channelID1)]
  135. wg.Add(2)
  136. // Simulate receiving the envelope in some real reactor and replying back with
  137. // the same envelope and then closing the Channel.
  138. go func() {
  139. e := <-p2pCh.Channel.In()
  140. require.Equal(t, peerIDA, e.From)
  141. require.NotNil(t, e.Message)
  142. p2pCh.Channel.Out() <- p2p.Envelope{To: e.From, Message: e.Message}
  143. p2pCh.Channel.Close()
  144. wg.Done()
  145. }()
  146. rts.shim.Receive(channelID1, peerA, bz)
  147. // wait until the mock peer called Send and we (fake) proxied the envelope
  148. wg.Wait()
  149. require.NotNil(t, response)
  150. m, err := response.Unwrap()
  151. require.NoError(t, err)
  152. require.Equal(t, msg.GetChunkRequest(), m)
  153. // Since p2pCh was closed in the simulated reactor above, calling Receive
  154. // should not block.
  155. rts.shim.Receive(channelID1, peerA, bz)
  156. require.Empty(t, p2pCh.Channel.In())
  157. peerA.AssertExpectations(t)
  158. }