You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

218 lines
4.9 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. golog "log"
  5. "net"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "github.com/tendermint/tendermint/crypto"
  11. "github.com/tendermint/tendermint/crypto/ed25519"
  12. cmn "github.com/tendermint/tendermint/libs/common"
  13. "github.com/tendermint/tendermint/libs/log"
  14. "github.com/tendermint/tendermint/config"
  15. tmconn "github.com/tendermint/tendermint/p2p/conn"
  16. )
  17. func TestPeerBasic(t *testing.T) {
  18. assert, require := assert.New(t), require.New(t)
  19. // simulate remote peer
  20. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
  21. rp.Start()
  22. defer rp.Stop()
  23. p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), cfg, tmconn.DefaultMConnConfig())
  24. require.Nil(err)
  25. err = p.Start()
  26. require.Nil(err)
  27. defer p.Stop()
  28. assert.True(p.IsRunning())
  29. assert.True(p.IsOutbound())
  30. assert.False(p.IsPersistent())
  31. p.persistent = true
  32. assert.True(p.IsPersistent())
  33. assert.Equal(rp.Addr().DialString(), p.Addr().String())
  34. assert.Equal(rp.ID(), p.ID())
  35. }
  36. func TestPeerSend(t *testing.T) {
  37. assert, require := assert.New(t), require.New(t)
  38. config := cfg
  39. // simulate remote peer
  40. rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: config}
  41. rp.Start()
  42. defer rp.Stop()
  43. p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config, tmconn.DefaultMConnConfig())
  44. require.Nil(err)
  45. err = p.Start()
  46. require.Nil(err)
  47. defer p.Stop()
  48. assert.True(p.CanSend(testCh))
  49. assert.True(p.Send(testCh, []byte("Asylum")))
  50. }
  51. func createOutboundPeerAndPerformHandshake(
  52. addr *NetAddress,
  53. config *config.P2PConfig,
  54. mConfig tmconn.MConnConfig,
  55. ) (*peer, error) {
  56. chDescs := []*tmconn.ChannelDescriptor{
  57. {ID: testCh, Priority: 1},
  58. }
  59. reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
  60. pk := ed25519.GenPrivKey()
  61. pc, err := testOutboundPeerConn(addr, config, false, pk)
  62. if err != nil {
  63. return nil, err
  64. }
  65. timeout := 1 * time.Second
  66. ourNodeInfo := testNodeInfo(addr.ID, "host_peer")
  67. peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo)
  68. if err != nil {
  69. return nil, err
  70. }
  71. p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
  72. p.SetLogger(log.TestingLogger().With("peer", addr))
  73. return p, nil
  74. }
  75. func testDial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) {
  76. if cfg.TestDialFail {
  77. return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)")
  78. }
  79. conn, err := addr.DialTimeout(cfg.DialTimeout)
  80. if err != nil {
  81. return nil, err
  82. }
  83. return conn, nil
  84. }
  85. func testOutboundPeerConn(
  86. addr *NetAddress,
  87. config *config.P2PConfig,
  88. persistent bool,
  89. ourNodePrivKey crypto.PrivKey,
  90. ) (peerConn, error) {
  91. conn, err := testDial(addr, config)
  92. if err != nil {
  93. return peerConn{}, cmn.ErrorWrap(err, "Error creating peer")
  94. }
  95. pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
  96. if err != nil {
  97. if cerr := conn.Close(); cerr != nil {
  98. return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
  99. }
  100. return peerConn{}, err
  101. }
  102. // ensure dialed ID matches connection ID
  103. if addr.ID != pc.ID() {
  104. if cerr := conn.Close(); cerr != nil {
  105. return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
  106. }
  107. return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()}
  108. }
  109. return pc, nil
  110. }
  111. type remotePeer struct {
  112. PrivKey crypto.PrivKey
  113. Config *config.P2PConfig
  114. addr *NetAddress
  115. quit chan struct{}
  116. channels cmn.HexBytes
  117. listenAddr string
  118. }
  119. func (rp *remotePeer) Addr() *NetAddress {
  120. return rp.addr
  121. }
  122. func (rp *remotePeer) ID() ID {
  123. return PubKeyToID(rp.PrivKey.PubKey())
  124. }
  125. func (rp *remotePeer) Start() {
  126. if rp.listenAddr == "" {
  127. rp.listenAddr = "127.0.0.1:0"
  128. }
  129. l, e := net.Listen("tcp", rp.listenAddr) // any available address
  130. if e != nil {
  131. golog.Fatalf("net.Listen tcp :0: %+v", e)
  132. }
  133. rp.addr = NewNetAddress(PubKeyToID(rp.PrivKey.PubKey()), l.Addr())
  134. rp.quit = make(chan struct{})
  135. if rp.channels == nil {
  136. rp.channels = []byte{testCh}
  137. }
  138. go rp.accept(l)
  139. }
  140. func (rp *remotePeer) Stop() {
  141. close(rp.quit)
  142. }
  143. func (rp *remotePeer) accept(l net.Listener) {
  144. conns := []net.Conn{}
  145. for {
  146. conn, err := l.Accept()
  147. if err != nil {
  148. golog.Fatalf("Failed to accept conn: %+v", err)
  149. }
  150. pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
  151. if err != nil {
  152. golog.Fatalf("Failed to create a peer: %+v", err)
  153. }
  154. _, err = handshake(pc.conn, time.Second, rp.nodeInfo(l))
  155. if err != nil {
  156. golog.Fatalf("Failed to perform handshake: %+v", err)
  157. }
  158. conns = append(conns, conn)
  159. select {
  160. case <-rp.quit:
  161. for _, conn := range conns {
  162. if err := conn.Close(); err != nil {
  163. golog.Fatal(err)
  164. }
  165. }
  166. return
  167. default:
  168. }
  169. }
  170. }
  171. func (rp *remotePeer) nodeInfo(l net.Listener) NodeInfo {
  172. return DefaultNodeInfo{
  173. ProtocolVersion: defaultProtocolVersion,
  174. ID_: rp.Addr().ID,
  175. ListenAddr: l.Addr().String(),
  176. Network: "testing",
  177. Version: "1.2.3-rc0-deadbeef",
  178. Channels: rp.channels,
  179. Moniker: "remote_peer",
  180. }
  181. }