You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

675 lines
18 KiB

p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
4 years ago
  1. package p2p
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sync"
  9. "time"
  10. "github.com/tendermint/tendermint/crypto"
  11. "github.com/tendermint/tendermint/libs/log"
  12. "github.com/tendermint/tendermint/libs/protoio"
  13. "github.com/tendermint/tendermint/p2p/conn"
  14. p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
  15. "golang.org/x/net/netutil"
  16. )
  17. const (
  18. defaultDialTimeout = time.Second
  19. defaultFilterTimeout = 5 * time.Second
  20. defaultHandshakeTimeout = 3 * time.Second
  21. )
  22. // MConnProtocol is the MConn protocol identifier.
  23. const MConnProtocol Protocol = "mconn"
  24. // MConnTransportOption sets an option for MConnTransport.
  25. type MConnTransportOption func(*MConnTransport)
  26. // MConnTransportMaxIncomingConnections sets the maximum number of
  27. // simultaneous incoming connections. Default: 0 (unlimited)
  28. func MConnTransportMaxIncomingConnections(max int) MConnTransportOption {
  29. return func(mt *MConnTransport) { mt.maxIncomingConnections = max }
  30. }
  31. // MConnTransportFilterTimeout sets the timeout for filter callbacks.
  32. func MConnTransportFilterTimeout(timeout time.Duration) MConnTransportOption {
  33. return func(mt *MConnTransport) { mt.filterTimeout = timeout }
  34. }
  35. // MConnTransportConnFilters sets connection filters.
  36. func MConnTransportConnFilters(filters ...ConnFilterFunc) MConnTransportOption {
  37. return func(mt *MConnTransport) { mt.connFilters = filters }
  38. }
  39. // ConnFilterFunc is a callback for connection filtering. If it returns an
  40. // error, the connection is rejected. The set of existing connections is passed
  41. // along with the new connection and all resolved IPs.
  42. type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error
  43. // ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection
  44. // and refuses new ones if they come from a known ip.
  45. var ConnDuplicateIPFilter ConnFilterFunc = func(cs ConnSet, c net.Conn, ips []net.IP) error {
  46. for _, ip := range ips {
  47. if cs.HasIP(ip) {
  48. return ErrRejected{
  49. conn: c,
  50. err: fmt.Errorf("ip<%v> already connected", ip),
  51. isDuplicate: true,
  52. }
  53. }
  54. }
  55. return nil
  56. }
  57. // MConnTransport is a Transport implementation using the current multiplexed
  58. // Tendermint protocol ("MConn"). It inherits lots of code and logic from the
  59. // previous implementation for parity with the current P2P stack (such as
  60. // connection filtering, peer verification, and panic handling), which should be
  61. // moved out of the transport once the rest of the P2P stack is rewritten.
  62. type MConnTransport struct {
  63. privKey crypto.PrivKey
  64. nodeInfo NodeInfo
  65. channelDescs []*ChannelDescriptor
  66. mConnConfig conn.MConnConfig
  67. maxIncomingConnections int
  68. dialTimeout time.Duration
  69. handshakeTimeout time.Duration
  70. filterTimeout time.Duration
  71. logger log.Logger
  72. listener net.Listener
  73. closeOnce sync.Once
  74. chAccept chan *mConnConnection
  75. chError chan error
  76. chClose chan struct{}
  77. // FIXME: This is a vestige from the old transport, and should be managed
  78. // by the router once we rewrite the P2P core.
  79. conns ConnSet
  80. connFilters []ConnFilterFunc
  81. }
  82. // NewMConnTransport sets up a new MConn transport.
  83. func NewMConnTransport(
  84. logger log.Logger,
  85. nodeInfo NodeInfo,
  86. privKey crypto.PrivKey,
  87. mConnConfig conn.MConnConfig,
  88. opts ...MConnTransportOption,
  89. ) *MConnTransport {
  90. m := &MConnTransport{
  91. privKey: privKey,
  92. nodeInfo: nodeInfo,
  93. mConnConfig: mConnConfig,
  94. channelDescs: []*ChannelDescriptor{},
  95. dialTimeout: defaultDialTimeout,
  96. handshakeTimeout: defaultHandshakeTimeout,
  97. filterTimeout: defaultFilterTimeout,
  98. logger: logger,
  99. chAccept: make(chan *mConnConnection),
  100. chError: make(chan error),
  101. chClose: make(chan struct{}),
  102. conns: NewConnSet(),
  103. connFilters: []ConnFilterFunc{},
  104. }
  105. for _, opt := range opts {
  106. opt(m)
  107. }
  108. return m
  109. }
  110. // SetChannelDescriptors implements Transport.
  111. func (m *MConnTransport) SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) {
  112. m.channelDescs = chDescs
  113. }
  114. // Listen asynchronously listens for inbound connections on the given endpoint.
  115. // It must be called exactly once before calling Accept(), and the caller must
  116. // call Close() to shut down the listener.
  117. func (m *MConnTransport) Listen(endpoint Endpoint) error {
  118. if m.listener != nil {
  119. return errors.New("MConn transport is already listening")
  120. }
  121. err := m.normalizeEndpoint(&endpoint)
  122. if err != nil {
  123. return fmt.Errorf("invalid MConn listen endpoint %q: %w", endpoint, err)
  124. }
  125. m.listener, err = net.Listen("tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port))
  126. if err != nil {
  127. return err
  128. }
  129. if m.maxIncomingConnections > 0 {
  130. m.listener = netutil.LimitListener(m.listener, m.maxIncomingConnections)
  131. }
  132. // Spawn a goroutine to accept inbound connections asynchronously.
  133. go m.accept()
  134. return nil
  135. }
  136. // accept accepts inbound connections in a loop, and asynchronously handshakes
  137. // with the peer to avoid head-of-line blocking. Established connections are
  138. // passed to Accept() via chAccept.
  139. // See: https://github.com/tendermint/tendermint/issues/204
  140. func (m *MConnTransport) accept() {
  141. for {
  142. tcpConn, err := m.listener.Accept()
  143. if err != nil {
  144. // We have to check for closure first, since we don't want to
  145. // propagate "use of closed network connection" errors.
  146. select {
  147. case <-m.chClose:
  148. default:
  149. // We also select on chClose here, in case the transport closes
  150. // while we're blocked on error propagation.
  151. select {
  152. case m.chError <- err:
  153. case <-m.chClose:
  154. }
  155. }
  156. return
  157. }
  158. go func() {
  159. err := m.filterTCPConn(tcpConn)
  160. if err != nil {
  161. if err := tcpConn.Close(); err != nil {
  162. m.logger.Debug("failed to close TCP connection", "err", err)
  163. }
  164. select {
  165. case m.chError <- err:
  166. case <-m.chClose:
  167. }
  168. return
  169. }
  170. conn, err := newMConnConnection(m, tcpConn, "")
  171. if err != nil {
  172. m.conns.Remove(tcpConn)
  173. if err := tcpConn.Close(); err != nil {
  174. m.logger.Debug("failed to close TCP connection", "err", err)
  175. }
  176. select {
  177. case m.chError <- err:
  178. case <-m.chClose:
  179. }
  180. } else {
  181. select {
  182. case m.chAccept <- conn:
  183. case <-m.chClose:
  184. if err := tcpConn.Close(); err != nil {
  185. m.logger.Debug("failed to close TCP connection", "err", err)
  186. }
  187. }
  188. }
  189. }()
  190. }
  191. }
  192. // Accept implements Transport.
  193. //
  194. // accept() runs a concurrent accept loop that accepts inbound connections
  195. // and then handshakes in a non-blocking fashion. The handshaked and validated
  196. // connections are returned via this call, picking them off of the chAccept
  197. // channel (or the handshake error, if any).
  198. func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) {
  199. select {
  200. case conn := <-m.chAccept:
  201. return conn, nil
  202. case err := <-m.chError:
  203. return nil, err
  204. case <-m.chClose:
  205. return nil, ErrTransportClosed{}
  206. case <-ctx.Done():
  207. return nil, nil
  208. }
  209. }
  210. // Dial implements Transport.
  211. func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error) {
  212. err := m.normalizeEndpoint(&endpoint)
  213. if err != nil {
  214. return nil, err
  215. }
  216. ctx, cancel := context.WithTimeout(ctx, m.dialTimeout)
  217. defer cancel()
  218. dialer := net.Dialer{}
  219. tcpConn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port))
  220. if err != nil {
  221. return nil, err
  222. }
  223. err = m.filterTCPConn(tcpConn)
  224. if err != nil {
  225. if err := tcpConn.Close(); err != nil {
  226. m.logger.Debug("failed to close TCP connection", "err", err)
  227. }
  228. return nil, err
  229. }
  230. conn, err := newMConnConnection(m, tcpConn, endpoint.PeerID)
  231. if err != nil {
  232. m.conns.Remove(tcpConn)
  233. if err := tcpConn.Close(); err != nil {
  234. m.logger.Debug("failed to close TCP connection", "err", err)
  235. }
  236. return nil, err
  237. }
  238. return conn, nil
  239. }
  240. // Endpoints implements Transport.
  241. func (m *MConnTransport) Endpoints() []Endpoint {
  242. if m.listener == nil {
  243. return []Endpoint{}
  244. }
  245. addr := m.listener.Addr().(*net.TCPAddr)
  246. return []Endpoint{{
  247. Protocol: MConnProtocol,
  248. PeerID: m.nodeInfo.ID(),
  249. IP: addr.IP,
  250. Port: uint16(addr.Port),
  251. }}
  252. }
  253. // Close implements Transport.
  254. func (m *MConnTransport) Close() error {
  255. var err error
  256. m.closeOnce.Do(func() {
  257. // We have to close chClose first, so that accept() will detect
  258. // the closure and not propagate the error.
  259. close(m.chClose)
  260. if m.listener != nil {
  261. err = m.listener.Close()
  262. }
  263. })
  264. return err
  265. }
  266. // filterTCPConn filters a TCP connection, rejecting it if this function errors.
  267. func (m *MConnTransport) filterTCPConn(tcpConn net.Conn) error {
  268. if m.conns.Has(tcpConn) {
  269. return ErrRejected{conn: tcpConn, isDuplicate: true}
  270. }
  271. host, _, err := net.SplitHostPort(tcpConn.RemoteAddr().String())
  272. if err != nil {
  273. return err
  274. }
  275. ip := net.ParseIP(host)
  276. if ip == nil {
  277. return fmt.Errorf("connection address has invalid IP address %q", host)
  278. }
  279. // Apply filter callbacks.
  280. chErr := make(chan error, len(m.connFilters))
  281. for _, connFilter := range m.connFilters {
  282. go func(connFilter ConnFilterFunc) {
  283. chErr <- connFilter(m.conns, tcpConn, []net.IP{ip})
  284. }(connFilter)
  285. }
  286. for i := 0; i < cap(chErr); i++ {
  287. select {
  288. case err := <-chErr:
  289. if err != nil {
  290. return ErrRejected{conn: tcpConn, err: err, isFiltered: true}
  291. }
  292. case <-time.After(m.filterTimeout):
  293. return ErrFilterTimeout{}
  294. }
  295. }
  296. // FIXME: Doesn't really make sense to set this here, but we preserve the
  297. // behavior from the previous P2P transport implementation. This should
  298. // be moved to the router.
  299. m.conns.Set(tcpConn, []net.IP{ip})
  300. return nil
  301. }
  302. // normalizeEndpoint normalizes and validates an endpoint.
  303. func (m *MConnTransport) normalizeEndpoint(endpoint *Endpoint) error {
  304. if endpoint == nil {
  305. return errors.New("nil endpoint")
  306. }
  307. if err := endpoint.Validate(); err != nil {
  308. return err
  309. }
  310. if endpoint.Protocol == "" {
  311. endpoint.Protocol = MConnProtocol
  312. }
  313. if endpoint.Protocol != MConnProtocol {
  314. return fmt.Errorf("unsupported protocol %q", endpoint.Protocol)
  315. }
  316. if len(endpoint.IP) == 0 {
  317. return errors.New("endpoint must have an IP address")
  318. }
  319. if endpoint.Path != "" {
  320. return fmt.Errorf("endpoint cannot have path (got %q)", endpoint.Path)
  321. }
  322. if endpoint.Port == 0 {
  323. endpoint.Port = 26657
  324. }
  325. return nil
  326. }
  327. // mConnConnection implements Connection for MConnTransport. It takes a base TCP
  328. // connection and upgrades it to MConnection over an encrypted SecretConnection.
  329. type mConnConnection struct {
  330. logger log.Logger
  331. transport *MConnTransport
  332. secretConn *conn.SecretConnection
  333. mConn *conn.MConnection
  334. peerInfo NodeInfo
  335. closeOnce sync.Once
  336. chReceive chan mConnMessage
  337. chError chan error
  338. chClose chan struct{}
  339. }
  340. // mConnMessage passes MConnection messages through internal channels.
  341. type mConnMessage struct {
  342. channelID byte
  343. payload []byte
  344. }
  345. // newMConnConnection creates a new mConnConnection by handshaking
  346. // with a peer.
  347. func newMConnConnection(
  348. transport *MConnTransport,
  349. tcpConn net.Conn,
  350. expectPeerID ID,
  351. ) (c *mConnConnection, err error) {
  352. // FIXME: Since the MConnection code panics, we need to recover here
  353. // and turn it into an error. Be careful not to alias err, so we can
  354. // update it from within this function. We should remove panics instead.
  355. defer func() {
  356. if r := recover(); r != nil {
  357. err = ErrRejected{
  358. conn: tcpConn,
  359. err: fmt.Errorf("recovered from panic: %v", r),
  360. isAuthFailure: true,
  361. }
  362. }
  363. }()
  364. err = tcpConn.SetDeadline(time.Now().Add(transport.handshakeTimeout))
  365. if err != nil {
  366. err = ErrRejected{
  367. conn: tcpConn,
  368. err: fmt.Errorf("secret conn failed: %v", err),
  369. isAuthFailure: true,
  370. }
  371. return
  372. }
  373. c = &mConnConnection{
  374. transport: transport,
  375. chReceive: make(chan mConnMessage),
  376. chError: make(chan error),
  377. chClose: make(chan struct{}),
  378. }
  379. c.secretConn, err = conn.MakeSecretConnection(tcpConn, transport.privKey)
  380. if err != nil {
  381. err = ErrRejected{
  382. conn: tcpConn,
  383. err: fmt.Errorf("secret conn failed: %v", err),
  384. isAuthFailure: true,
  385. }
  386. return
  387. }
  388. c.peerInfo, err = c.handshake()
  389. if err != nil {
  390. err = ErrRejected{
  391. conn: tcpConn,
  392. err: fmt.Errorf("handshake failed: %v", err),
  393. isAuthFailure: true,
  394. }
  395. return
  396. }
  397. // Validate node info.
  398. // FIXME: All of the ID verification code below should be moved to the
  399. // router once implemented.
  400. err = c.peerInfo.Validate()
  401. if err != nil {
  402. err = ErrRejected{
  403. conn: tcpConn,
  404. err: err,
  405. isNodeInfoInvalid: true,
  406. }
  407. return
  408. }
  409. // For outgoing conns, ensure connection key matches dialed key.
  410. if expectPeerID != "" {
  411. peerID := PubKeyToID(c.PubKey())
  412. if expectPeerID != peerID {
  413. err = ErrRejected{
  414. conn: tcpConn,
  415. id: peerID,
  416. err: fmt.Errorf(
  417. "conn.ID (%v) dialed ID (%v) mismatch",
  418. peerID,
  419. expectPeerID,
  420. ),
  421. isAuthFailure: true,
  422. }
  423. return
  424. }
  425. }
  426. // Reject self.
  427. if transport.nodeInfo.ID() == c.peerInfo.ID() {
  428. err = ErrRejected{
  429. addr: *NewNetAddress(c.peerInfo.ID(), c.secretConn.RemoteAddr()),
  430. conn: tcpConn,
  431. id: c.peerInfo.ID(),
  432. isSelf: true,
  433. }
  434. return
  435. }
  436. err = transport.nodeInfo.CompatibleWith(c.peerInfo)
  437. if err != nil {
  438. err = ErrRejected{
  439. conn: tcpConn,
  440. err: err,
  441. id: c.peerInfo.ID(),
  442. isIncompatible: true,
  443. }
  444. return
  445. }
  446. err = tcpConn.SetDeadline(time.Time{})
  447. if err != nil {
  448. err = ErrRejected{
  449. conn: tcpConn,
  450. err: fmt.Errorf("secret conn failed: %v", err),
  451. isAuthFailure: true,
  452. }
  453. return
  454. }
  455. // Set up the MConnection wrapper
  456. c.mConn = conn.NewMConnectionWithConfig(
  457. c.secretConn,
  458. transport.channelDescs,
  459. c.onReceive,
  460. c.onError,
  461. transport.mConnConfig,
  462. )
  463. // FIXME: Log format is set up for compatibility with existing peer code.
  464. c.logger = transport.logger.With("peer", c.RemoteEndpoint().NetAddress())
  465. c.mConn.SetLogger(c.logger)
  466. err = c.mConn.Start()
  467. return c, err
  468. }
  469. // handshake performs an MConn handshake, returning the peer's node info.
  470. func (c *mConnConnection) handshake() (NodeInfo, error) {
  471. var pbNodeInfo p2pproto.NodeInfo
  472. chErr := make(chan error, 2)
  473. go func() {
  474. _, err := protoio.NewDelimitedWriter(c.secretConn).WriteMsg(c.transport.nodeInfo.ToProto())
  475. chErr <- err
  476. }()
  477. go func() {
  478. chErr <- protoio.NewDelimitedReader(c.secretConn, MaxNodeInfoSize()).ReadMsg(&pbNodeInfo)
  479. }()
  480. for i := 0; i < cap(chErr); i++ {
  481. if err := <-chErr; err != nil {
  482. return NodeInfo{}, err
  483. }
  484. }
  485. return NodeInfoFromProto(&pbNodeInfo)
  486. }
  487. // onReceive is a callback for MConnection received messages.
  488. func (c *mConnConnection) onReceive(channelID byte, payload []byte) {
  489. select {
  490. case c.chReceive <- mConnMessage{channelID: channelID, payload: payload}:
  491. case <-c.chClose:
  492. }
  493. }
  494. // onError is a callback for MConnection errors. The error is passed to
  495. // chError, which is only consumed by ReceiveMessage() for parity with
  496. // the old MConnection behavior.
  497. func (c *mConnConnection) onError(e interface{}) {
  498. err, ok := e.(error)
  499. if !ok {
  500. err = fmt.Errorf("%v", err)
  501. }
  502. select {
  503. case c.chError <- err:
  504. case <-c.chClose:
  505. }
  506. }
  507. // String displays connection information.
  508. // FIXME: This is here for backwards compatibility with existing code,
  509. // it should probably just return RemoteEndpoint().String(), if anything.
  510. func (c *mConnConnection) String() string {
  511. endpoint := c.RemoteEndpoint()
  512. return fmt.Sprintf("MConn{%v:%v}", endpoint.IP, endpoint.Port)
  513. }
  514. // SendMessage implements Connection.
  515. func (c *mConnConnection) SendMessage(channelID byte, msg []byte) (bool, error) {
  516. // We don't check chError here, to preserve old MConnection behavior.
  517. select {
  518. case <-c.chClose:
  519. return false, io.EOF
  520. default:
  521. return c.mConn.Send(channelID, msg), nil
  522. }
  523. }
  524. // TrySendMessage implements Connection.
  525. func (c *mConnConnection) TrySendMessage(channelID byte, msg []byte) (bool, error) {
  526. // We don't check chError here, to preserve old MConnection behavior.
  527. select {
  528. case <-c.chClose:
  529. return false, io.EOF
  530. default:
  531. return c.mConn.TrySend(channelID, msg), nil
  532. }
  533. }
  534. // ReceiveMessage implements Connection.
  535. func (c *mConnConnection) ReceiveMessage() (byte, []byte, error) {
  536. select {
  537. case err := <-c.chError:
  538. return 0, nil, err
  539. case <-c.chClose:
  540. return 0, nil, io.EOF
  541. case msg := <-c.chReceive:
  542. return msg.channelID, msg.payload, nil
  543. }
  544. }
  545. // NodeInfo implements Connection.
  546. func (c *mConnConnection) NodeInfo() NodeInfo {
  547. return c.peerInfo
  548. }
  549. // PubKey implements Connection.
  550. func (c *mConnConnection) PubKey() crypto.PubKey {
  551. return c.secretConn.RemotePubKey()
  552. }
  553. // LocalEndpoint implements Connection.
  554. func (c *mConnConnection) LocalEndpoint() Endpoint {
  555. // FIXME: For compatibility with existing P2P tests we need to
  556. // handle non-TCP connections. This should be removed.
  557. endpoint := Endpoint{
  558. Protocol: MConnProtocol,
  559. PeerID: c.transport.nodeInfo.ID(),
  560. }
  561. if addr, ok := c.secretConn.LocalAddr().(*net.TCPAddr); ok {
  562. endpoint.IP = addr.IP
  563. endpoint.Port = uint16(addr.Port)
  564. }
  565. return endpoint
  566. }
  567. // RemoteEndpoint implements Connection.
  568. func (c *mConnConnection) RemoteEndpoint() Endpoint {
  569. // FIXME: For compatibility with existing P2P tests we need to
  570. // handle non-TCP connections. This should be removed.
  571. endpoint := Endpoint{
  572. Protocol: MConnProtocol,
  573. PeerID: c.peerInfo.ID(),
  574. }
  575. if addr, ok := c.secretConn.RemoteAddr().(*net.TCPAddr); ok {
  576. endpoint.IP = addr.IP
  577. endpoint.Port = uint16(addr.Port)
  578. }
  579. return endpoint
  580. }
  581. // Status implements Connection.
  582. func (c *mConnConnection) Status() conn.ConnectionStatus {
  583. return c.mConn.Status()
  584. }
  585. // Close implements Connection.
  586. func (c *mConnConnection) Close() error {
  587. c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr())
  588. var err error
  589. c.closeOnce.Do(func() {
  590. err = c.mConn.Stop()
  591. close(c.chClose)
  592. })
  593. return err
  594. }
  595. // FlushClose implements Connection.
  596. func (c *mConnConnection) FlushClose() error {
  597. c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr())
  598. c.closeOnce.Do(func() {
  599. c.mConn.FlushStop()
  600. close(c.chClose)
  601. })
  602. return nil
  603. }