You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

915 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "runtime/debug"
  11. "sync/atomic"
  12. "time"
  13. "github.com/gogo/protobuf/proto"
  14. flow "github.com/tendermint/tendermint/libs/flowrate"
  15. "github.com/tendermint/tendermint/libs/log"
  16. tmmath "github.com/tendermint/tendermint/libs/math"
  17. "github.com/tendermint/tendermint/libs/protoio"
  18. "github.com/tendermint/tendermint/libs/service"
  19. tmsync "github.com/tendermint/tendermint/libs/sync"
  20. "github.com/tendermint/tendermint/libs/timer"
  21. tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  22. )
  // Tuning constants shared by MConnection and its channels.
  const (
  	// Default cap on the payload bytes carried by a single PacketMsg.
  	defaultMaxPacketMsgPayloadSize = 1024

  	// Upper bound on the PacketMsgs written by one sendSomePacketMsgs call.
  	numBatchPacketMsgs = 10

  	// Sizes handed to bufio for the connection reader and writer.
  	minReadBufferSize  = 1 << 10
  	minWriteBufferSize = 1 << 16

  	// Period of the ticker that triggers channel stats updates.
  	updateStats = 2 * time.Second

  	// some of these defaults are written in the user config
  	// flushThrottle, sendRate, recvRate
  	// TODO: remove values present in config
  	defaultFlushThrottle = 100 * time.Millisecond

  	defaultSendQueueCapacity   = 1
  	defaultRecvBufferCapacity  = 4096
  	defaultRecvMessageCapacity = 21 * 1024 * 1024 // 21MB

  	defaultSendRate = int64(500 * 1024) // 500KB/s
  	defaultRecvRate = int64(500 * 1024) // 500KB/s

  	defaultSendTimeout  = 10 * time.Second
  	defaultPingInterval = 60 * time.Second
  	defaultPongTimeout  = 45 * time.Second
  )
  // Callback signatures accepted by the MConnection constructors.
  type (
  	// receiveCbFunc receives a channel id together with the bytes of a
  	// fully reassembled message read on that channel.
  	receiveCbFunc func(chID byte, msgBytes []byte)

  	// errorCbFunc receives the value that caused the connection to stop.
  	errorCbFunc func(interface{})
  )
  44. /*
  45. Each peer has one `MConnection` (multiplex connection) instance.
  46. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  47. several messages along a single channel of communication.
  48. Each `MConnection` handles message transmission on multiple abstract communication
  49. `Channel`s. Each channel has a globally unique byte id.
  50. The byte id and the relative priorities of each `Channel` are configured upon
  51. initialization of the connection.
  52. There are two methods for sending messages:
  53. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  54. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  55. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  56. successfully queued for the channel with the given id byte `chID`, or until the
  57. request times out. The message `msg` is serialized using Protobuf.
  58. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  59. channel's queue is full.
  60. Inbound message bytes are handled with an onReceive callback function.
  61. */
  62. type MConnection struct {
  63. service.BaseService
  64. conn net.Conn
  65. bufConnReader *bufio.Reader
  66. bufConnWriter *bufio.Writer
  67. sendMonitor *flow.Monitor
  68. recvMonitor *flow.Monitor
  69. send chan struct{}
  70. pong chan struct{}
  71. channels []*Channel
  72. channelsIdx map[byte]*Channel
  73. onReceive receiveCbFunc
  74. onError errorCbFunc
  75. errored uint32
  76. config MConnConfig
  77. // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
  78. // doneSendRoutine is closed when the sendRoutine actually quits.
  79. quitSendRoutine chan struct{}
  80. doneSendRoutine chan struct{}
  81. // Closing quitRecvRouting will cause the recvRouting to eventually quit.
  82. quitRecvRoutine chan struct{}
  83. // used to ensure FlushStop and OnStop
  84. // are safe to call concurrently.
  85. stopMtx tmsync.Mutex
  86. flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
  87. pingTimer *time.Ticker // send pings periodically
  88. // close conn if pong is not received in pongTimeout
  89. pongTimer *time.Timer
  90. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  91. chStatsTimer *time.Ticker // update channel stats periodically
  92. created time.Time // time of creation
  93. _maxPacketMsgSize int
  94. }
  // MConnConfig holds the tunable parameters of an MConnection.
  type MConnConfig struct {
  	// Rates passed to the send and receive flow monitors when throttling.
  	SendRate int64 `mapstructure:"send_rate"`
  	RecvRate int64 `mapstructure:"recv_rate"`

  	// Maximum payload size
  	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

  	// Interval to flush writes (throttled)
  	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

  	// Interval to send pings
  	PingInterval time.Duration `mapstructure:"ping_interval"`

  	// Maximum wait time for pongs
  	PongTimeout time.Duration `mapstructure:"pong_timeout"`
  }
  108. // DefaultMConnConfig returns the default config.
  109. func DefaultMConnConfig() MConnConfig {
  110. return MConnConfig{
  111. SendRate: defaultSendRate,
  112. RecvRate: defaultRecvRate,
  113. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  114. FlushThrottle: defaultFlushThrottle,
  115. PingInterval: defaultPingInterval,
  116. PongTimeout: defaultPongTimeout,
  117. }
  118. }
  119. // NewMConnection wraps net.Conn and creates multiplex connection
  120. func NewMConnection(
  121. conn net.Conn,
  122. chDescs []*ChannelDescriptor,
  123. onReceive receiveCbFunc,
  124. onError errorCbFunc,
  125. ) *MConnection {
  126. return NewMConnectionWithConfig(
  127. conn,
  128. chDescs,
  129. onReceive,
  130. onError,
  131. DefaultMConnConfig())
  132. }
  133. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  134. func NewMConnectionWithConfig(
  135. conn net.Conn,
  136. chDescs []*ChannelDescriptor,
  137. onReceive receiveCbFunc,
  138. onError errorCbFunc,
  139. config MConnConfig,
  140. ) *MConnection {
  141. if config.PongTimeout >= config.PingInterval {
  142. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  143. }
  144. mconn := &MConnection{
  145. conn: conn,
  146. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  147. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  148. sendMonitor: flow.New(0, 0),
  149. recvMonitor: flow.New(0, 0),
  150. send: make(chan struct{}, 1),
  151. pong: make(chan struct{}, 1),
  152. onReceive: onReceive,
  153. onError: onError,
  154. config: config,
  155. created: time.Now(),
  156. }
  157. // Create channels
  158. var channelsIdx = map[byte]*Channel{}
  159. var channels = []*Channel{}
  160. for _, desc := range chDescs {
  161. channel := newChannel(mconn, *desc)
  162. channelsIdx[channel.desc.ID] = channel
  163. channels = append(channels, channel)
  164. }
  165. mconn.channels = channels
  166. mconn.channelsIdx = channelsIdx
  167. mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)
  168. // maxPacketMsgSize() is a bit heavy, so call just once
  169. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  170. return mconn
  171. }
  172. func (c *MConnection) SetLogger(l log.Logger) {
  173. c.BaseService.SetLogger(l)
  174. for _, ch := range c.channels {
  175. ch.SetLogger(l)
  176. }
  177. }
  178. // OnStart implements BaseService
  179. func (c *MConnection) OnStart() error {
  180. if err := c.BaseService.OnStart(); err != nil {
  181. return err
  182. }
  183. c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
  184. c.pingTimer = time.NewTicker(c.config.PingInterval)
  185. c.pongTimeoutCh = make(chan bool, 1)
  186. c.chStatsTimer = time.NewTicker(updateStats)
  187. c.quitSendRoutine = make(chan struct{})
  188. c.doneSendRoutine = make(chan struct{})
  189. c.quitRecvRoutine = make(chan struct{})
  190. go c.sendRoutine()
  191. go c.recvRoutine()
  192. return nil
  193. }
  194. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  195. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  196. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  197. func (c *MConnection) stopServices() (alreadyStopped bool) {
  198. c.stopMtx.Lock()
  199. defer c.stopMtx.Unlock()
  200. select {
  201. case <-c.quitSendRoutine:
  202. // already quit
  203. return true
  204. default:
  205. }
  206. select {
  207. case <-c.quitRecvRoutine:
  208. // already quit
  209. return true
  210. default:
  211. }
  212. c.BaseService.OnStop()
  213. c.flushTimer.Stop()
  214. c.pingTimer.Stop()
  215. c.chStatsTimer.Stop()
  216. // inform the recvRouting that we are shutting down
  217. close(c.quitRecvRoutine)
  218. close(c.quitSendRoutine)
  219. return false
  220. }
  221. // FlushStop replicates the logic of OnStop.
  222. // It additionally ensures that all successful
  223. // .Send() calls will get flushed before closing
  224. // the connection.
  225. func (c *MConnection) FlushStop() {
  226. if c.stopServices() {
  227. return
  228. }
  229. // this block is unique to FlushStop
  230. {
  231. // wait until the sendRoutine exits
  232. // so we dont race on calling sendSomePacketMsgs
  233. <-c.doneSendRoutine
  234. // Send and flush all pending msgs.
  235. // Since sendRoutine has exited, we can call this
  236. // safely
  237. eof := c.sendSomePacketMsgs()
  238. for !eof {
  239. eof = c.sendSomePacketMsgs()
  240. }
  241. c.flush()
  242. // Now we can close the connection
  243. }
  244. c.conn.Close()
  245. // We can't close pong safely here because
  246. // recvRoutine may write to it after we've stopped.
  247. // Though it doesn't need to get closed at all,
  248. // we close it @ recvRoutine.
  249. // c.Stop()
  250. }
  251. // OnStop implements BaseService
  252. func (c *MConnection) OnStop() {
  253. if c.stopServices() {
  254. return
  255. }
  256. c.conn.Close()
  257. // We can't close pong safely here because
  258. // recvRoutine may write to it after we've stopped.
  259. // Though it doesn't need to get closed at all,
  260. // we close it @ recvRoutine.
  261. }
  262. func (c *MConnection) String() string {
  263. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  264. }
  265. func (c *MConnection) flush() {
  266. c.Logger.Debug("Flush", "conn", c)
  267. err := c.bufConnWriter.Flush()
  268. if err != nil {
  269. c.Logger.Debug("MConnection flush failed", "err", err)
  270. }
  271. }
  272. // Catch panics, usually caused by remote disconnects.
  273. func (c *MConnection) _recover() {
  274. if r := recover(); r != nil {
  275. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  276. c.stopForError(fmt.Errorf("recovered from panic: %v", r))
  277. }
  278. }
  279. func (c *MConnection) stopForError(r interface{}) {
  280. if err := c.Stop(); err != nil {
  281. c.Logger.Error("Error stopping connection", "err", err)
  282. }
  283. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  284. if c.onError != nil {
  285. c.onError(r)
  286. }
  287. }
  288. }
  289. // Queues a message to be sent to channel.
  290. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  291. if !c.IsRunning() {
  292. return false
  293. }
  294. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  295. // Send message to channel.
  296. channel, ok := c.channelsIdx[chID]
  297. if !ok {
  298. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  299. return false
  300. }
  301. success := channel.sendBytes(msgBytes)
  302. if success {
  303. // Wake up sendRoutine if necessary
  304. select {
  305. case c.send <- struct{}{}:
  306. default:
  307. }
  308. } else {
  309. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  310. }
  311. return success
  312. }
  313. // Queues a message to be sent to channel.
  314. // Nonblocking, returns true if successful.
  315. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  316. if !c.IsRunning() {
  317. return false
  318. }
  319. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  320. // Send message to channel.
  321. channel, ok := c.channelsIdx[chID]
  322. if !ok {
  323. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  324. return false
  325. }
  326. ok = channel.trySendBytes(msgBytes)
  327. if ok {
  328. // Wake up sendRoutine if necessary
  329. select {
  330. case c.send <- struct{}{}:
  331. default:
  332. }
  333. }
  334. return ok
  335. }
  336. // CanSend returns true if you can send more data onto the chID, false
  337. // otherwise. Use only as a heuristic.
  338. func (c *MConnection) CanSend(chID byte) bool {
  339. if !c.IsRunning() {
  340. return false
  341. }
  342. channel, ok := c.channelsIdx[chID]
  343. if !ok {
  344. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  345. return false
  346. }
  347. return channel.canSend()
  348. }
  349. // sendRoutine polls for packets to send from channels.
  350. func (c *MConnection) sendRoutine() {
  351. defer c._recover()
  352. protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)
  353. FOR_LOOP:
  354. for {
  355. var _n int
  356. var err error
  357. SELECTION:
  358. select {
  359. case <-c.flushTimer.Ch:
  360. // NOTE: flushTimer.Set() must be called every time
  361. // something is written to .bufConnWriter.
  362. c.flush()
  363. case <-c.chStatsTimer.C:
  364. for _, channel := range c.channels {
  365. channel.updateStats()
  366. }
  367. case <-c.pingTimer.C:
  368. c.Logger.Debug("Send Ping")
  369. _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  370. if err != nil {
  371. c.Logger.Error("Failed to send PacketPing", "err", err)
  372. break SELECTION
  373. }
  374. c.sendMonitor.Update(_n)
  375. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  376. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  377. select {
  378. case c.pongTimeoutCh <- true:
  379. default:
  380. }
  381. })
  382. c.flush()
  383. case timeout := <-c.pongTimeoutCh:
  384. if timeout {
  385. c.Logger.Debug("Pong timeout")
  386. err = errors.New("pong timeout")
  387. } else {
  388. c.stopPongTimer()
  389. }
  390. case <-c.pong:
  391. c.Logger.Debug("Send Pong")
  392. _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  393. if err != nil {
  394. c.Logger.Error("Failed to send PacketPong", "err", err)
  395. break SELECTION
  396. }
  397. c.sendMonitor.Update(_n)
  398. c.flush()
  399. case <-c.quitSendRoutine:
  400. break FOR_LOOP
  401. case <-c.send:
  402. // Send some PacketMsgs
  403. eof := c.sendSomePacketMsgs()
  404. if !eof {
  405. // Keep sendRoutine awake.
  406. select {
  407. case c.send <- struct{}{}:
  408. default:
  409. }
  410. }
  411. }
  412. if !c.IsRunning() {
  413. break FOR_LOOP
  414. }
  415. if err != nil {
  416. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  417. c.stopForError(err)
  418. break FOR_LOOP
  419. }
  420. }
  421. // Cleanup
  422. c.stopPongTimer()
  423. close(c.doneSendRoutine)
  424. }
  425. // Returns true if messages from channels were exhausted.
  426. // Blocks in accordance to .sendMonitor throttling.
  427. func (c *MConnection) sendSomePacketMsgs() bool {
  428. // Block until .sendMonitor says we can write.
  429. // Once we're ready we send more than we asked for,
  430. // but amortized it should even out.
  431. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  432. // Now send some PacketMsgs.
  433. for i := 0; i < numBatchPacketMsgs; i++ {
  434. if c.sendPacketMsg() {
  435. return true
  436. }
  437. }
  438. return false
  439. }
  440. // Returns true if messages from channels were exhausted.
  441. func (c *MConnection) sendPacketMsg() bool {
  442. // Choose a channel to create a PacketMsg from.
  443. // The chosen channel will be the one whose recentlySent/priority is the least.
  444. var leastRatio float32 = math.MaxFloat32
  445. var leastChannel *Channel
  446. for _, channel := range c.channels {
  447. // If nothing to send, skip this channel
  448. if !channel.isSendPending() {
  449. continue
  450. }
  451. // Get ratio, and keep track of lowest ratio.
  452. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  453. if ratio < leastRatio {
  454. leastRatio = ratio
  455. leastChannel = channel
  456. }
  457. }
  458. // Nothing to send?
  459. if leastChannel == nil {
  460. return true
  461. }
  462. // c.Logger.Info("Found a msgPacket to send")
  463. // Make & send a PacketMsg from this channel
  464. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  465. if err != nil {
  466. c.Logger.Error("Failed to write PacketMsg", "err", err)
  467. c.stopForError(err)
  468. return true
  469. }
  470. c.sendMonitor.Update(_n)
  471. c.flushTimer.Set()
  472. return false
  473. }
  474. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  475. // After a whole message has been assembled, it's pushed to onReceive().
  476. // Blocks depending on how the connection is throttled.
  477. // Otherwise, it never blocks.
  478. func (c *MConnection) recvRoutine() {
  479. defer c._recover()
  480. protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)
  481. FOR_LOOP:
  482. for {
  483. // Block until .recvMonitor says we can read.
  484. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  485. // Peek into bufConnReader for debugging
  486. /*
  487. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  488. bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100))
  489. if err == nil {
  490. // return
  491. } else {
  492. c.Logger.Debug("Error peeking connection buffer", "err", err)
  493. // return nil
  494. }
  495. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  496. }
  497. */
  498. // Read packet type
  499. var packet tmp2p.Packet
  500. _n, err := protoReader.ReadMsg(&packet)
  501. c.recvMonitor.Update(_n)
  502. if err != nil {
  503. // stopServices was invoked and we are shutting down
  504. // receiving is excpected to fail since we will close the connection
  505. select {
  506. case <-c.quitRecvRoutine:
  507. break FOR_LOOP
  508. default:
  509. }
  510. if c.IsRunning() {
  511. if err == io.EOF {
  512. c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
  513. } else {
  514. c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  515. }
  516. c.stopForError(err)
  517. }
  518. break FOR_LOOP
  519. }
  520. // Read more depending on packet type.
  521. switch pkt := packet.Sum.(type) {
  522. case *tmp2p.Packet_PacketPing:
  523. // TODO: prevent abuse, as they cause flush()'s.
  524. // https://github.com/tendermint/tendermint/issues/1190
  525. c.Logger.Debug("Receive Ping")
  526. select {
  527. case c.pong <- struct{}{}:
  528. default:
  529. // never block
  530. }
  531. case *tmp2p.Packet_PacketPong:
  532. c.Logger.Debug("Receive Pong")
  533. select {
  534. case c.pongTimeoutCh <- false:
  535. default:
  536. // never block
  537. }
  538. case *tmp2p.Packet_PacketMsg:
  539. channel, ok := c.channelsIdx[byte(pkt.PacketMsg.ChannelID)]
  540. if !ok || channel == nil {
  541. err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
  542. c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
  543. c.stopForError(err)
  544. break FOR_LOOP
  545. }
  546. msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
  547. if err != nil {
  548. if c.IsRunning() {
  549. c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
  550. c.stopForError(err)
  551. }
  552. break FOR_LOOP
  553. }
  554. if msgBytes != nil {
  555. c.Logger.Debug("Received bytes", "chID", pkt.PacketMsg.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  556. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  557. c.onReceive(byte(pkt.PacketMsg.ChannelID), msgBytes)
  558. }
  559. default:
  560. err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
  561. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  562. c.stopForError(err)
  563. break FOR_LOOP
  564. }
  565. }
  566. // Cleanup
  567. close(c.pong)
  568. for range c.pong {
  569. // Drain
  570. }
  571. }
  572. // not goroutine-safe
  573. func (c *MConnection) stopPongTimer() {
  574. if c.pongTimer != nil {
  575. _ = c.pongTimer.Stop()
  576. c.pongTimer = nil
  577. }
  578. }
  579. // maxPacketMsgSize returns a maximum size of PacketMsg
  580. func (c *MConnection) maxPacketMsgSize() int {
  581. bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
  582. ChannelID: 0x01,
  583. EOF: true,
  584. Data: make([]byte, c.config.MaxPacketMsgPayloadSize),
  585. }))
  586. if err != nil {
  587. panic(err)
  588. }
  589. return len(bz)
  590. }
  591. type ConnectionStatus struct {
  592. Duration time.Duration
  593. SendMonitor flow.Status
  594. RecvMonitor flow.Status
  595. Channels []ChannelStatus
  596. }
  // ChannelStatus describes one channel inside a ConnectionStatus.
  type ChannelStatus struct {
  	ID                byte  // channel id
  	SendQueueCapacity int   // capacity of the channel's send queue
  	SendQueueSize     int   // value of the channel's send-queue counter
  	Priority          int   // priority from the channel descriptor
  	RecentlySent      int64 // decaying count of bytes recently written
  }
  604. func (c *MConnection) Status() ConnectionStatus {
  605. var status ConnectionStatus
  606. status.Duration = time.Since(c.created)
  607. status.SendMonitor = c.sendMonitor.Status()
  608. status.RecvMonitor = c.recvMonitor.Status()
  609. status.Channels = make([]ChannelStatus, len(c.channels))
  610. for i, channel := range c.channels {
  611. status.Channels[i] = ChannelStatus{
  612. ID: channel.desc.ID,
  613. SendQueueCapacity: cap(channel.sendQueue),
  614. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  615. Priority: channel.desc.Priority,
  616. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  617. }
  618. }
  619. return status
  620. }
  621. //-----------------------------------------------------------------------------
  // ChannelDescriptor configures a single channel of an MConnection.
  type ChannelDescriptor struct {
  	// ID is the channel's byte id.
  	ID byte
  	// Priority weights the channel when choosing what to send next; it must
  	// be positive.
  	Priority int

  	// Capacities; a zero value is replaced by a default in FillDefaults.
  	SendQueueCapacity   int
  	RecvBufferCapacity  int
  	RecvMessageCapacity int
  }
  629. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  630. if chDesc.SendQueueCapacity == 0 {
  631. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  632. }
  633. if chDesc.RecvBufferCapacity == 0 {
  634. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  635. }
  636. if chDesc.RecvMessageCapacity == 0 {
  637. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  638. }
  639. filled = chDesc
  640. return
  641. }
  642. // TODO: lowercase.
  643. // NOTE: not goroutine-safe.
  644. type Channel struct {
  645. conn *MConnection
  646. desc ChannelDescriptor
  647. sendQueue chan []byte
  648. sendQueueSize int32 // atomic.
  649. recving []byte
  650. sending []byte
  651. recentlySent int64 // exponential moving average
  652. maxPacketMsgPayloadSize int
  653. Logger log.Logger
  654. }
  655. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  656. desc = desc.FillDefaults()
  657. if desc.Priority <= 0 {
  658. panic("Channel default priority must be a positive integer")
  659. }
  660. return &Channel{
  661. conn: conn,
  662. desc: desc,
  663. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  664. recving: make([]byte, 0, desc.RecvBufferCapacity),
  665. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  666. }
  667. }
  668. func (ch *Channel) SetLogger(l log.Logger) {
  669. ch.Logger = l
  670. }
  671. // Queues message to send to this channel.
  672. // Goroutine-safe
  673. // Times out (and returns false) after defaultSendTimeout
  674. func (ch *Channel) sendBytes(bytes []byte) bool {
  675. select {
  676. case ch.sendQueue <- bytes:
  677. atomic.AddInt32(&ch.sendQueueSize, 1)
  678. return true
  679. case <-time.After(defaultSendTimeout):
  680. return false
  681. }
  682. }
  683. // Queues message to send to this channel.
  684. // Nonblocking, returns true if successful.
  685. // Goroutine-safe
  686. func (ch *Channel) trySendBytes(bytes []byte) bool {
  687. select {
  688. case ch.sendQueue <- bytes:
  689. atomic.AddInt32(&ch.sendQueueSize, 1)
  690. return true
  691. default:
  692. return false
  693. }
  694. }
  695. // Goroutine-safe
  696. func (ch *Channel) loadSendQueueSize() (size int) {
  697. return int(atomic.LoadInt32(&ch.sendQueueSize))
  698. }
  699. // Goroutine-safe
  700. // Use only as a heuristic.
  701. func (ch *Channel) canSend() bool {
  702. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  703. }
  704. // Returns true if any PacketMsgs are pending to be sent.
  705. // Call before calling nextPacketMsg()
  706. // Goroutine-safe
  707. func (ch *Channel) isSendPending() bool {
  708. if len(ch.sending) == 0 {
  709. if len(ch.sendQueue) == 0 {
  710. return false
  711. }
  712. ch.sending = <-ch.sendQueue
  713. }
  714. return true
  715. }
  716. // Creates a new PacketMsg to send.
  717. // Not goroutine-safe
  718. func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
  719. packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
  720. maxSize := ch.maxPacketMsgPayloadSize
  721. packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
  722. if len(ch.sending) <= maxSize {
  723. packet.EOF = true
  724. ch.sending = nil
  725. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  726. } else {
  727. packet.EOF = false
  728. ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
  729. }
  730. return packet
  731. }
  732. // Writes next PacketMsg to w and updates c.recentlySent.
  733. // Not goroutine-safe
  734. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
  735. packet := ch.nextPacketMsg()
  736. n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
  737. atomic.AddInt64(&ch.recentlySent, int64(n))
  738. return
  739. }
  740. // Handles incoming PacketMsgs. It returns a message bytes if message is
  741. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  742. // Not goroutine-safe
  743. func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
  744. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  745. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
  746. if recvCap < recvReceived {
  747. return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  748. }
  749. ch.recving = append(ch.recving, packet.Data...)
  750. if packet.EOF {
  751. msgBytes := ch.recving
  752. // clear the slice without re-allocating.
  753. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  754. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  755. // at which point the recving slice stops being used and should be garbage collected
  756. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  757. return msgBytes, nil
  758. }
  759. return nil, nil
  760. }
  761. // Call this periodically to update stats for throttling purposes.
  762. // Not goroutine-safe
  763. func (ch *Channel) updateStats() {
  764. // Exponential decay of stats.
  765. // TODO: optimize.
  766. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  767. }
  768. //----------------------------------------
  769. // Packet
  770. // mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message.
  771. func mustWrapPacket(pb proto.Message) *tmp2p.Packet {
  772. var msg tmp2p.Packet
  773. switch pb := pb.(type) {
  774. case *tmp2p.Packet: // already a packet
  775. msg = *pb
  776. case *tmp2p.PacketPing:
  777. msg = tmp2p.Packet{
  778. Sum: &tmp2p.Packet_PacketPing{
  779. PacketPing: pb,
  780. },
  781. }
  782. case *tmp2p.PacketPong:
  783. msg = tmp2p.Packet{
  784. Sum: &tmp2p.Packet_PacketPong{
  785. PacketPong: pb,
  786. },
  787. }
  788. case *tmp2p.PacketMsg:
  789. msg = tmp2p.Packet{
  790. Sum: &tmp2p.Packet_PacketMsg{
  791. PacketMsg: pb,
  792. },
  793. }
  794. default:
  795. panic(fmt.Errorf("unknown packet type %T", pb))
  796. }
  797. return &msg
  798. }