p2p: implement new Transport interface (#5791)

This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog.

The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack.

The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely.

There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
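As a rough, hypothetical illustration of the direction described above (this sketch is for this page only; the actual `Transport` and `MConnTransport` definitions live in the `p2p` package introduced by #5791 and their method sets are not reproduced here), a transport abstraction decoupled from the peer might look like:

	// Sketch only: names and signatures are illustrative, not the real p2p.Transport API.
	package p2psketch

	import (
		"context"
		"net"
	)

	// Connection is a placeholder for the per-peer connection a transport hands out;
	// in the refactor, the MConnection machinery in this file would live behind it.
	type Connection interface {
		SendMessage(chID byte, msg []byte) (ok bool, err error)
		ReceiveMessage() (chID byte, msg []byte, err error)
		Close() error
	}

	// Transport dials and accepts connections; an MConnection-backed implementation
	// would satisfy an interface of this general shape.
	type Transport interface {
		Dial(ctx context.Context, addr net.Addr) (Connection, error)
		Accept(ctx context.Context) (Connection, error)
		Close() error
	}
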
package conn

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"reflect"
	"runtime/debug"
	"sync/atomic"
	"time"

	"github.com/gogo/protobuf/proto"

	flow "github.com/tendermint/tendermint/libs/flowrate"
	"github.com/tendermint/tendermint/libs/log"
	tmmath "github.com/tendermint/tendermint/libs/math"
	"github.com/tendermint/tendermint/libs/protoio"
	"github.com/tendermint/tendermint/libs/service"
	tmsync "github.com/tendermint/tendermint/libs/sync"
	"github.com/tendermint/tendermint/libs/timer"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

const (
	defaultMaxPacketMsgPayloadSize = 1024

	numBatchPacketMsgs = 10
	minReadBufferSize  = 1024
	minWriteBufferSize = 65536
	updateStats        = 2 * time.Second

	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond

	defaultSendQueueCapacity   = 1
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096      // 21MB
	defaultSendRate            = int64(512000) // 500KB/s
	defaultRecvRate            = int64(512000) // 500KB/s
	defaultSendTimeout         = 10 * time.Second
	defaultPingInterval        = 60 * time.Second
	defaultPongTimeout         = 45 * time.Second
)
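
// receiveCbFunc is the callback invoked with the channel ID and the raw
// message bytes once a complete message has been received.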
type receiveCbFunc func(chID byte, msgBytes []byte)
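
// errorCbFunc is the callback invoked with the error (or recovered panic
// value) that caused the connection to stop.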
type errorCbFunc func(interface{})

/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
	func (m MConnection) TrySend(chID byte, msgBytes []byte) bool {}

`Send(chID, msgBytes)` is a blocking call that waits until `msg` is
successfully queued for the channel with the given id byte `chID`, or until the
request times out. The message `msg` is serialized using Protobuf.

`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
channel's queue is full.

Inbound message bytes are handled with an onReceive callback function.
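
A minimal usage sketch (the channel descriptor, payload, and callbacks below
are illustrative; `conn` is assumed to be an already-established net.Conn):

	chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1}}
	onReceive := func(chID byte, msgBytes []byte) {
		// handle a fully reassembled message
	}
	onError := func(err interface{}) {
		// handle connection failure
	}
	mconn := NewMConnection(conn, chDescs, onReceive, onError)
	if err := mconn.Start(); err != nil {
		// handle start error
	}
	ok := mconn.Send(0x01, []byte("payload"))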
*/
type MConnection struct {
	service.BaseService

	conn          net.Conn
	bufConnReader *bufio.Reader
	bufConnWriter *bufio.Writer
	sendMonitor   *flow.Monitor
	recvMonitor   *flow.Monitor
	send          chan struct{}
	pong          chan struct{}
	channels      []*Channel
	channelsIdx   map[byte]*Channel
	onReceive     receiveCbFunc
	onError       errorCbFunc
	errored       uint32
	config        MConnConfig

	// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
	// doneSendRoutine is closed when the sendRoutine actually quits.
	quitSendRoutine chan struct{}
	doneSendRoutine chan struct{}

	// Closing quitRecvRoutine will cause the recvRoutine to eventually quit.
	quitRecvRoutine chan struct{}

	// used to ensure FlushStop and OnStop
	// are safe to call concurrently.
	stopMtx tmsync.Mutex

	flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer  *time.Ticker         // send pings periodically

	// close conn if pong is not received in pongTimeout
	pongTimer     *time.Timer
	pongTimeoutCh chan bool // true - timeout, false - peer sent pong

	chStatsTimer *time.Ticker // update channel stats periodically

	created time.Time // time of creation

	_maxPacketMsgSize int
}

// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// Maximum payload size
	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

	// Interval to flush writes (throttled)
	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

	// Interval to send pings
	PingInterval time.Duration `mapstructure:"ping_interval"`

	// Maximum wait time for pongs
	PongTimeout time.Duration `mapstructure:"pong_timeout"`
}

// DefaultMConnConfig returns the default config.
func DefaultMConnConfig() MConnConfig {
	return MConnConfig{
		SendRate:                defaultSendRate,
		RecvRate:                defaultRecvRate,
		MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
		FlushThrottle:           defaultFlushThrottle,
		PingInterval:            defaultPingInterval,
		PongTimeout:             defaultPongTimeout,
	}
}

// NewMConnection wraps net.Conn and creates a multiplex connection.
func NewMConnection(
	conn net.Conn,
	chDescs []*ChannelDescriptor,
	onReceive receiveCbFunc,
	onError errorCbFunc,
) *MConnection {
	return NewMConnectionWithConfig(
		conn,
		chDescs,
		onReceive,
		onError,
		DefaultMConnConfig())
}

// NewMConnectionWithConfig wraps net.Conn and creates a multiplex connection with the given config.
func NewMConnectionWithConfig(
	conn net.Conn,
	chDescs []*ChannelDescriptor,
	onReceive receiveCbFunc,
	onError errorCbFunc,
	config MConnConfig,
) *MConnection {
	if config.PongTimeout >= config.PingInterval {
		panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
	}

	mconn := &MConnection{
		conn:          conn,
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
		sendMonitor:   flow.New(0, 0),
		recvMonitor:   flow.New(0, 0),
		send:          make(chan struct{}, 1),
		pong:          make(chan struct{}, 1),
		onReceive:     onReceive,
		onError:       onError,
		config:        config,
		created:       time.Now(),
	}

	// Create channels
	var channelsIdx = map[byte]*Channel{}
	var channels = []*Channel{}

	for _, desc := range chDescs {
		channel := newChannel(mconn, *desc)
		channelsIdx[channel.desc.ID] = channel
		channels = append(channels, channel)
	}
	mconn.channels = channels
	mconn.channelsIdx = channelsIdx

	mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)

	// maxPacketMsgSize() is a bit heavy, so call just once
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

	return mconn
}
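
// SetLogger sets the logger on the connection and propagates it to all channels.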
func (c *MConnection) SetLogger(l log.Logger) {
	c.BaseService.SetLogger(l)
	for _, ch := range c.channels {
		ch.SetLogger(l)
	}
}

// OnStart implements BaseService
func (c *MConnection) OnStart() error {
	if err := c.BaseService.OnStart(); err != nil {
		return err
	}
	c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
	c.pingTimer = time.NewTicker(c.config.PingInterval)
	c.pongTimeoutCh = make(chan bool, 1)
	c.chStatsTimer = time.NewTicker(updateStats)
	c.quitSendRoutine = make(chan struct{})
	c.doneSendRoutine = make(chan struct{})
	c.quitRecvRoutine = make(chan struct{})
	go c.sendRoutine()
	go c.recvRoutine()
	return nil
}

// stopServices stops the BaseService and timers and closes the quitSendRoutine.
// If the quitSendRoutine was already closed, it returns true; otherwise it returns false.
// It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
func (c *MConnection) stopServices() (alreadyStopped bool) {
	c.stopMtx.Lock()
	defer c.stopMtx.Unlock()

	select {
	case <-c.quitSendRoutine:
		// already quit
		return true
	default:
	}

	select {
	case <-c.quitRecvRoutine:
		// already quit
		return true
	default:
	}

	c.BaseService.OnStop()
	c.flushTimer.Stop()
	c.pingTimer.Stop()
	c.chStatsTimer.Stop()

	// inform the recvRoutine that we are shutting down
	close(c.quitRecvRoutine)
	close(c.quitSendRoutine)
	return false
}

// FlushStop replicates the logic of OnStop.
// It additionally ensures that all successful
// .Send() calls will get flushed before closing
// the connection.
func (c *MConnection) FlushStop() {
	if c.stopServices() {
		return
	}

	// this block is unique to FlushStop
	{
		// wait until the sendRoutine exits
		// so we don't race on calling sendSomePacketMsgs
		<-c.doneSendRoutine

		// Send and flush all pending msgs.
		// Since sendRoutine has exited, we can call this
		// safely
		eof := c.sendSomePacketMsgs()
		for !eof {
			eof = c.sendSomePacketMsgs()
		}
		c.flush()

		// Now we can close the connection
	}

	c.conn.Close()

	// We can't close pong safely here because
	// recvRoutine may write to it after we've stopped.
	// Though it doesn't need to get closed at all,
	// we close it @ recvRoutine.

	// c.Stop()
}

// OnStop implements BaseService
func (c *MConnection) OnStop() {
	if c.stopServices() {
		return
	}

	c.conn.Close()

	// We can't close pong safely here because
	// recvRoutine may write to it after we've stopped.
	// Though it doesn't need to get closed at all,
	// we close it @ recvRoutine.
}
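
// String returns a string representation of the connection that includes the remote address.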
func (c *MConnection) String() string {
	return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
}
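
// flush writes any buffered data to the underlying connection.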
func (c *MConnection) flush() {
	c.Logger.Debug("Flush", "conn", c)
	err := c.bufConnWriter.Flush()
	if err != nil {
		c.Logger.Debug("MConnection flush failed", "err", err)
	}
}

// Catch panics, usually caused by remote disconnects.
func (c *MConnection) _recover() {
	if r := recover(); r != nil {
		c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
		c.stopForError(fmt.Errorf("recovered from panic: %v", r))
	}
}
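
// stopForError stops the connection and invokes the onError callback at most once.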
func (c *MConnection) stopForError(r interface{}) {
	if err := c.Stop(); err != nil {
		c.Logger.Error("Error stopping connection", "err", err)
	}
	if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
		if c.onError != nil {
			c.onError(r)
		}
	}
}

// Queues a message to be sent to channel.
func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
	if !c.IsRunning() {
		return false
	}

	c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))

	// Send message to channel.
	channel, ok := c.channelsIdx[chID]
	if !ok {
		c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
		return false
	}

	success := channel.sendBytes(msgBytes)
	if success {
		// Wake up sendRoutine if necessary
		select {
		case c.send <- struct{}{}:
		default:
		}
	} else {
		c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
	}
	return success
}

// Queues a message to be sent to channel.
// Nonblocking, returns true if successful.
func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
	if !c.IsRunning() {
		return false
	}

	c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))

	// Send message to channel.
	channel, ok := c.channelsIdx[chID]
	if !ok {
		c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
		return false
	}

	ok = channel.trySendBytes(msgBytes)
	if ok {
		// Wake up sendRoutine if necessary
		select {
		case c.send <- struct{}{}:
		default:
		}
	}

	return ok
}

// CanSend returns true if you can send more data onto the chID, false
// otherwise. Use only as a heuristic.
func (c *MConnection) CanSend(chID byte) bool {
	if !c.IsRunning() {
		return false
	}

	channel, ok := c.channelsIdx[chID]
	if !ok {
		c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
		return false
	}
	return channel.canSend()
}

// sendRoutine polls for packets to send from channels.
func (c *MConnection) sendRoutine() {
	defer c._recover()

	protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)

FOR_LOOP:
	for {
		var _n int
		var err error
	SELECTION:
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufConnWriter.
			c.flush()
		case <-c.chStatsTimer.C:
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.C:
			c.Logger.Debug("Send Ping")
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
			if err != nil {
				c.Logger.Error("Failed to send PacketPing", "err", err)
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
				select {
				case c.pongTimeoutCh <- true:
				default:
				}
			})
			c.flush()
		case timeout := <-c.pongTimeoutCh:
			if timeout {
				c.Logger.Debug("Pong timeout")
				err = errors.New("pong timeout")
			} else {
				c.stopPongTimer()
			}
		case <-c.pong:
			c.Logger.Debug("Send Pong")
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
			if err != nil {
				c.Logger.Error("Failed to send PacketPong", "err", err)
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.flush()
		case <-c.quitSendRoutine:
			break FOR_LOOP
		case <-c.send:
			// Send some PacketMsgs
			eof := c.sendSomePacketMsgs()
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	c.stopPongTimer()
	close(c.doneSendRoutine)
}

// Returns true if messages from channels were exhausted.
// Blocks in accordance to .sendMonitor throttling.
func (c *MConnection) sendSomePacketMsgs() bool {
	// Block until .sendMonitor says we can write.
	// Once we're ready we send more than we asked for,
	// but amortized it should even out.
	c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)

	// Now send some PacketMsgs.
	for i := 0; i < numBatchPacketMsgs; i++ {
		if c.sendPacketMsg() {
			return true
		}
	}
	return false
}

// Returns true if messages from channels were exhausted.
func (c *MConnection) sendPacketMsg() bool {
	// Choose a channel to create a PacketMsg from.
	// The chosen channel will be the one whose recentlySent/priority is the least.
	var leastRatio float32 = math.MaxFloat32
	var leastChannel *Channel
	for _, channel := range c.channels {
		// If nothing to send, skip this channel
		if !channel.isSendPending() {
			continue
		}
		// Get ratio, and keep track of lowest ratio.
		ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
		if ratio < leastRatio {
			leastRatio = ratio
			leastChannel = channel
		}
	}

	// Nothing to send?
	if leastChannel == nil {
		return true
	}
	// c.Logger.Info("Found a msgPacket to send")

	// Make & send a PacketMsg from this channel
	_n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
	if err != nil {
		c.Logger.Error("Failed to write PacketMsg", "err", err)
		c.stopForError(err)
		return true
	}
	c.sendMonitor.Update(_n)
	c.flushTimer.Set()
	return false
}

// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() {
	defer c._recover()

	protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)

FOR_LOOP:
	for {
		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)

		// Peek into bufConnReader for debugging
		/*
			if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
				bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100))
				if err == nil {
					// return
				} else {
					c.Logger.Debug("Error peeking connection buffer", "err", err)
					// return nil
				}
				c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
			}
		*/

		// Read packet type
		var packet tmp2p.Packet

		err := protoReader.ReadMsg(&packet)
		if err != nil {
			// stopServices was invoked and we are shutting down;
			// receiving is expected to fail since we will close the connection
			select {
			case <-c.quitRecvRoutine:
				break FOR_LOOP
			default:
			}

			if c.IsRunning() {
				if err == io.EOF {
					c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
				} else {
					c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				}
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		// Read more depending on packet type.
		switch pkt := packet.Sum.(type) {
		case *tmp2p.Packet_PacketPing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			c.Logger.Debug("Receive Ping")
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}
		case *tmp2p.Packet_PacketPong:
			c.Logger.Debug("Receive Pong")
			select {
			case c.pongTimeoutCh <- false:
			default:
				// never block
			}
		case *tmp2p.Packet_PacketMsg:
			channel, ok := c.channelsIdx[byte(pkt.PacketMsg.ChannelID)]
			if !ok || channel == nil {
				err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
				c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			if msgBytes != nil {
				c.Logger.Debug("Received bytes", "chID", pkt.PacketMsg.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
				// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
				c.onReceive(byte(pkt.PacketMsg.ChannelID), msgBytes)
			}
		default:
			err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.pong)
	for range c.pong {
		// Drain
	}
}

// not goroutine-safe
func (c *MConnection) stopPongTimer() {
	if c.pongTimer != nil {
		_ = c.pongTimer.Stop()
		c.pongTimer = nil
	}
}

// maxPacketMsgSize returns the maximum size of a PacketMsg
func (c *MConnection) maxPacketMsgSize() int {
	bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
		ChannelID: 0x01,
		EOF:       true,
		Data:      make([]byte, c.config.MaxPacketMsgPayloadSize),
	}))
	if err != nil {
		panic(err)
	}
	return len(bz)
}
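
// ConnectionStatus reports the connection's age, send/receive flow-monitor
// statistics, and per-channel status.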
type ConnectionStatus struct {
	Duration    time.Duration
	SendMonitor flow.Status
	RecvMonitor flow.Status
	Channels    []ChannelStatus
}
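
// ChannelStatus reports per-channel queue usage, priority, and recent send activity.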
type ChannelStatus struct {
	ID                byte
	SendQueueCapacity int
	SendQueueSize     int
	Priority          int
	RecentlySent      int64
}
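
// Status returns a snapshot of the connection's current statistics.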
func (c *MConnection) Status() ConnectionStatus {
	var status ConnectionStatus
	status.Duration = time.Since(c.created)
	status.SendMonitor = c.sendMonitor.Status()
	status.RecvMonitor = c.recvMonitor.Status()
	status.Channels = make([]ChannelStatus, len(c.channels))
	for i, channel := range c.channels {
		status.Channels[i] = ChannelStatus{
			ID:                channel.desc.ID,
			SendQueueCapacity: cap(channel.sendQueue),
			SendQueueSize:     int(atomic.LoadInt32(&channel.sendQueueSize)),
			Priority:          channel.desc.Priority,
			RecentlySent:      atomic.LoadInt64(&channel.recentlySent),
		}
	}
	return status
}

//-----------------------------------------------------------------------------
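
// ChannelDescriptor describes a channel's ID, send priority, and queue/buffer capacities.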
type ChannelDescriptor struct {
	ID                  byte
	Priority            int
	SendQueueCapacity   int
	RecvBufferCapacity  int
	RecvMessageCapacity int
}
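
// FillDefaults returns a copy of the descriptor with any zero-valued capacities
// replaced by the package defaults.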
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
	if chDesc.SendQueueCapacity == 0 {
		chDesc.SendQueueCapacity = defaultSendQueueCapacity
	}
	if chDesc.RecvBufferCapacity == 0 {
		chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
	}
	if chDesc.RecvMessageCapacity == 0 {
		chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
	}
	filled = chDesc
	return
}

// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte
	sendQueueSize int32 // atomic.
	recving       []byte
	sending       []byte
	recentlySent  int64 // exponential moving average

	maxPacketMsgPayloadSize int

	Logger log.Logger
}
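
// newChannel creates a Channel for the given connection and descriptor.
// The descriptor's priority must be positive.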
func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
	desc = desc.FillDefaults()
	if desc.Priority <= 0 {
		panic("Channel default priority must be a positive integer")
	}
	return &Channel{
		conn:                    conn,
		desc:                    desc,
		sendQueue:               make(chan []byte, desc.SendQueueCapacity),
		recving:                 make([]byte, 0, desc.RecvBufferCapacity),
		maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
	}
}
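
// SetLogger sets the channel's logger.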
func (ch *Channel) SetLogger(l log.Logger) {
	ch.Logger = l
}

// Queues message to send to this channel.
// Goroutine-safe
// Times out (and returns false) after defaultSendTimeout
func (ch *Channel) sendBytes(bytes []byte) bool {
	select {
	case ch.sendQueue <- bytes:
		atomic.AddInt32(&ch.sendQueueSize, 1)
		return true
	case <-time.After(defaultSendTimeout):
		return false
	}
}

// Queues message to send to this channel.
// Nonblocking, returns true if successful.
// Goroutine-safe
func (ch *Channel) trySendBytes(bytes []byte) bool {
	select {
	case ch.sendQueue <- bytes:
		atomic.AddInt32(&ch.sendQueueSize, 1)
		return true
	default:
		return false
	}
}

// Goroutine-safe
func (ch *Channel) loadSendQueueSize() (size int) {
	return int(atomic.LoadInt32(&ch.sendQueueSize))
}

// Goroutine-safe
// Use only as a heuristic.
func (ch *Channel) canSend() bool {
	return ch.loadSendQueueSize() < defaultSendQueueCapacity
}

// Returns true if any PacketMsgs are pending to be sent.
// Call before calling nextPacketMsg()
// Goroutine-safe
func (ch *Channel) isSendPending() bool {
	if len(ch.sending) == 0 {
		if len(ch.sendQueue) == 0 {
			return false
		}
		ch.sending = <-ch.sendQueue
	}
	return true
}

// Creates a new PacketMsg to send.
// Not goroutine-safe
func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
	packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
	maxSize := ch.maxPacketMsgPayloadSize
	packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
	if len(ch.sending) <= maxSize {
		packet.EOF = true
		ch.sending = nil
		atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
	} else {
		packet.EOF = false
		ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
	}
	return packet
}

// Writes next PacketMsg to w and updates c.recentlySent.
// Not goroutine-safe
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
	packet := ch.nextPacketMsg()
	n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
	atomic.AddInt64(&ch.recentlySent, int64(n))
	return
}

// Handles incoming PacketMsgs. It returns the message bytes if the message is
// complete, which are owned by the caller and will not be modified.
// Not goroutine-safe
func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
	ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
	var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
	if recvCap < recvReceived {
		return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
	}
	ch.recving = append(ch.recving, packet.Data...)
	if packet.EOF {
		msgBytes := ch.recving
		ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)
		return msgBytes, nil
	}
	return nil, nil
}

// Call this periodically to update stats for throttling purposes.
// Not goroutine-safe
func (ch *Channel) updateStats() {
	// Exponential decay of stats.
	// TODO: optimize.
	atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
}

//----------------------------------------
// Packet

// mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message.
func mustWrapPacket(pb proto.Message) *tmp2p.Packet {
	var msg tmp2p.Packet

	switch pb := pb.(type) {
	case *tmp2p.Packet: // already a packet
		msg = *pb
	case *tmp2p.PacketPing:
		msg = tmp2p.Packet{
			Sum: &tmp2p.Packet_PacketPing{
				PacketPing: pb,
			},
		}
	case *tmp2p.PacketPong:
		msg = tmp2p.Packet{
			Sum: &tmp2p.Packet_PacketPong{
				PacketPong: pb,
			},
		}
	case *tmp2p.PacketMsg:
		msg = tmp2p.Packet{
			Sum: &tmp2p.Packet_PacketMsg{
				PacketMsg: pb,
			},
		}
	default:
		panic(fmt.Errorf("unknown packet type %T", pb))
	}

	return &msg
}