You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

428 lines
11 KiB

  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/tendermint/crypto"
  12. "github.com/tendermint/tendermint/libs/log"
  13. "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tendermint/p2p/conn"
  15. )
  16. var (
  17. cfg *config.P2PConfig
  18. )
  19. func init() {
  20. cfg = config.DefaultP2PConfig()
  21. cfg.PexReactor = true
  22. cfg.AllowDuplicateIP = true
  23. }
  24. type PeerMessage struct {
  25. PeerID ID
  26. Bytes []byte
  27. Counter int
  28. }
  29. type TestReactor struct {
  30. BaseReactor
  31. mtx sync.Mutex
  32. channels []*conn.ChannelDescriptor
  33. logMessages bool
  34. msgsCounter int
  35. msgsReceived map[byte][]PeerMessage
  36. }
  37. func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
  38. tr := &TestReactor{
  39. channels: channels,
  40. logMessages: logMessages,
  41. msgsReceived: make(map[byte][]PeerMessage),
  42. }
  43. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  44. tr.SetLogger(log.TestingLogger())
  45. return tr
  46. }
  47. func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
  48. return tr.channels
  49. }
  50. func (tr *TestReactor) AddPeer(peer Peer) {}
  51. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
  52. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  53. if tr.logMessages {
  54. tr.mtx.Lock()
  55. defer tr.mtx.Unlock()
  56. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  57. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
  58. tr.msgsCounter++
  59. }
  60. }
  61. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  62. tr.mtx.Lock()
  63. defer tr.mtx.Unlock()
  64. return tr.msgsReceived[chID]
  65. }
  66. //-----------------------------------------------------------------------------
  67. // convenience method for creating two switches connected to each other.
  68. // XXX: note this uses net.Pipe and not a proper TCP conn
  69. func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  70. // Create two switches that will be interconnected.
  71. switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
  72. return switches[0], switches[1]
  73. }
  74. func initSwitchFunc(i int, sw *Switch) *Switch {
  75. sw.SetAddrBook(&addrBookMock{
  76. addrs: make(map[string]struct{}),
  77. ourAddrs: make(map[string]struct{})})
  78. // Make two reactors of two channels each
  79. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  80. {ID: byte(0x00), Priority: 10},
  81. {ID: byte(0x01), Priority: 10},
  82. }, true))
  83. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  84. {ID: byte(0x02), Priority: 10},
  85. {ID: byte(0x03), Priority: 10},
  86. }, true))
  87. return sw
  88. }
  89. func TestSwitches(t *testing.T) {
  90. s1, s2 := MakeSwitchPair(t, initSwitchFunc)
  91. defer s1.Stop()
  92. defer s2.Stop()
  93. if s1.Peers().Size() != 1 {
  94. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  95. }
  96. if s2.Peers().Size() != 1 {
  97. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  98. }
  99. // Lets send some messages
  100. ch0Msg := []byte("channel zero")
  101. ch1Msg := []byte("channel foo")
  102. ch2Msg := []byte("channel bar")
  103. s1.Broadcast(byte(0x00), ch0Msg)
  104. s1.Broadcast(byte(0x01), ch1Msg)
  105. s1.Broadcast(byte(0x02), ch2Msg)
  106. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  107. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  108. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  109. }
  110. func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  111. ticker := time.NewTicker(checkPeriod)
  112. for {
  113. select {
  114. case <-ticker.C:
  115. msgs := reactor.getMsgs(channel)
  116. if len(msgs) > 0 {
  117. if !bytes.Equal(msgs[0].Bytes, msgBytes) {
  118. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
  119. }
  120. return
  121. }
  122. case <-time.After(timeout):
  123. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  124. }
  125. }
  126. }
  127. func TestConnAddrFilter(t *testing.T) {
  128. s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  129. s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  130. defer s1.Stop()
  131. defer s2.Stop()
  132. c1, c2 := conn.NetPipe()
  133. s1.SetAddrFilter(func(addr net.Addr) error {
  134. if addr.String() == c1.RemoteAddr().String() {
  135. return fmt.Errorf("Error: pipe is blacklisted")
  136. }
  137. return nil
  138. })
  139. // connect to good peer
  140. go func() {
  141. err := s1.addPeerWithConnection(c1)
  142. assert.NotNil(t, err, "expected err")
  143. }()
  144. go func() {
  145. err := s2.addPeerWithConnection(c2)
  146. assert.NotNil(t, err, "expected err")
  147. }()
  148. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  149. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  150. }
  151. func TestSwitchFiltersOutItself(t *testing.T) {
  152. s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
  153. // addr := s1.NodeInfo().NetAddress()
  154. // // add ourselves like we do in node.go#427
  155. // s1.addrBook.AddOurAddress(addr)
  156. // simulate s1 having a public IP by creating a remote peer with the same ID
  157. rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
  158. rp.Start()
  159. // addr should be rejected in addPeer based on the same ID
  160. err := s1.DialPeerWithAddress(rp.Addr(), false)
  161. if assert.Error(t, err) {
  162. assert.Equal(t, ErrSwitchConnectToSelf{rp.Addr()}.Error(), err.Error())
  163. }
  164. assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
  165. assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
  166. rp.Stop()
  167. assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
  168. }
  169. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  170. time.Sleep(timeout)
  171. if sw.Peers().Size() != 0 {
  172. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  173. }
  174. }
  175. func TestConnIDFilter(t *testing.T) {
  176. s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  177. s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  178. defer s1.Stop()
  179. defer s2.Stop()
  180. c1, c2 := conn.NetPipe()
  181. s1.SetIDFilter(func(id ID) error {
  182. if id == s2.nodeInfo.ID {
  183. return fmt.Errorf("Error: pipe is blacklisted")
  184. }
  185. return nil
  186. })
  187. s2.SetIDFilter(func(id ID) error {
  188. if id == s1.nodeInfo.ID {
  189. return fmt.Errorf("Error: pipe is blacklisted")
  190. }
  191. return nil
  192. })
  193. go func() {
  194. err := s1.addPeerWithConnection(c1)
  195. assert.NotNil(t, err, "expected error")
  196. }()
  197. go func() {
  198. err := s2.addPeerWithConnection(c2)
  199. assert.NotNil(t, err, "expected error")
  200. }()
  201. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  202. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  203. }
  204. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  205. assert, require := assert.New(t), require.New(t)
  206. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  207. err := sw.Start()
  208. if err != nil {
  209. t.Error(err)
  210. }
  211. defer sw.Stop()
  212. // simulate remote peer
  213. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg}
  214. rp.Start()
  215. defer rp.Stop()
  216. pc, err := newOutboundPeerConn(rp.Addr(), cfg, false, sw.nodeKey.PrivKey)
  217. require.Nil(err)
  218. err = sw.addPeer(pc)
  219. require.Nil(err)
  220. peer := sw.Peers().Get(rp.ID())
  221. require.NotNil(peer)
  222. // simulate failure by closing connection
  223. pc.CloseConn()
  224. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  225. assert.False(peer.IsRunning())
  226. }
  227. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  228. assert, require := assert.New(t), require.New(t)
  229. sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
  230. err := sw.Start()
  231. if err != nil {
  232. t.Error(err)
  233. }
  234. defer sw.Stop()
  235. // simulate remote peer
  236. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg}
  237. rp.Start()
  238. defer rp.Stop()
  239. pc, err := newOutboundPeerConn(rp.Addr(), cfg, true, sw.nodeKey.PrivKey)
  240. // sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey,
  241. require.Nil(err)
  242. require.Nil(sw.addPeer(pc))
  243. peer := sw.Peers().Get(rp.ID())
  244. require.NotNil(peer)
  245. // simulate failure by closing connection
  246. pc.CloseConn()
  247. // TODO: remove sleep, detect the disconnection, wait for reconnect
  248. npeers := sw.Peers().Size()
  249. for i := 0; i < 20; i++ {
  250. time.Sleep(250 * time.Millisecond)
  251. npeers = sw.Peers().Size()
  252. if npeers > 0 {
  253. break
  254. }
  255. }
  256. assert.NotZero(npeers)
  257. assert.False(peer.IsRunning())
  258. // simulate another remote peer
  259. rp = &remotePeer{
  260. PrivKey: crypto.GenPrivKeyEd25519(),
  261. Config: cfg,
  262. // Use different interface to prevent duplicate IP filter, this will break
  263. // beyond two peers.
  264. listenAddr: "127.0.0.1:0",
  265. }
  266. rp.Start()
  267. defer rp.Stop()
  268. // simulate first time dial failure
  269. conf := config.DefaultP2PConfig()
  270. conf.TestDialFail = true
  271. err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true)
  272. require.NotNil(err)
  273. // DialPeerWithAddres - sw.peerConfig resets the dialer
  274. // TODO: same as above
  275. for i := 0; i < 20; i++ {
  276. time.Sleep(250 * time.Millisecond)
  277. npeers = sw.Peers().Size()
  278. if npeers > 1 {
  279. break
  280. }
  281. }
  282. assert.EqualValues(2, npeers)
  283. }
  284. func TestSwitchFullConnectivity(t *testing.T) {
  285. switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
  286. defer func() {
  287. for _, sw := range switches {
  288. sw.Stop()
  289. }
  290. }()
  291. for i, sw := range switches {
  292. if sw.Peers().Size() != 2 {
  293. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  294. }
  295. }
  296. }
  297. func BenchmarkSwitchBroadcast(b *testing.B) {
  298. s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
  299. // Make bar reactors of bar channels each
  300. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  301. {ID: byte(0x00), Priority: 10},
  302. {ID: byte(0x01), Priority: 10},
  303. }, false))
  304. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  305. {ID: byte(0x02), Priority: 10},
  306. {ID: byte(0x03), Priority: 10},
  307. }, false))
  308. return sw
  309. })
  310. defer s1.Stop()
  311. defer s2.Stop()
  312. // Allow time for goroutines to boot up
  313. time.Sleep(1 * time.Second)
  314. b.ResetTimer()
  315. numSuccess, numFailure := 0, 0
  316. // Send random message from foo channel to another
  317. for i := 0; i < b.N; i++ {
  318. chID := byte(i % 4)
  319. successChan := s1.Broadcast(chID, []byte("test data"))
  320. for s := range successChan {
  321. if s {
  322. numSuccess++
  323. } else {
  324. numFailure++
  325. }
  326. }
  327. }
  328. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  329. }
  330. type addrBookMock struct {
  331. addrs map[string]struct{}
  332. ourAddrs map[string]struct{}
  333. }
  334. var _ AddrBook = (*addrBookMock)(nil)
  335. func (book *addrBookMock) AddAddress(addr *NetAddress, src *NetAddress) error {
  336. book.addrs[addr.String()] = struct{}{}
  337. return nil
  338. }
  339. func (book *addrBookMock) AddOurAddress(addr *NetAddress) { book.ourAddrs[addr.String()] = struct{}{} }
  340. func (book *addrBookMock) OurAddress(addr *NetAddress) bool {
  341. _, ok := book.ourAddrs[addr.String()]
  342. return ok
  343. }
  344. func (book *addrBookMock) MarkGood(*NetAddress) {}
  345. func (book *addrBookMock) HasAddress(addr *NetAddress) bool {
  346. _, ok := book.addrs[addr.String()]
  347. return ok
  348. }
  349. func (book *addrBookMock) RemoveAddress(addr *NetAddress) {
  350. delete(book.addrs, addr.String())
  351. }
  352. func (book *addrBookMock) Save() {}