You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

695 lines
18 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. flow "github.com/tendermint/tmlibs/flowrate"
  14. )
// Tunables for MConnection buffering, timers and per-channel defaults.
const (
	// numBatchMsgPackets is the maximum number of msgPackets that
	// sendSomeMsgPackets writes per call.
	numBatchMsgPackets = 10
	// Buffer sizes handed to bufio.NewReaderSize / bufio.NewWriterSize.
	minReadBufferSize = 1024
	minWriteBufferSize = 65536
	// updateState is the period of chStatsTimer, which triggers updateStats on
	// every channel.
	updateState = 2 * time.Second
	// pingTimeout is the period of pingTimer; sendRoutine writes a ping when it fires.
	pingTimeout = 40 * time.Second
	// flushThrottle used here as a default.
	// overwritten by the user config.
	// TODO: remove
	flushThrottle = 100 * time.Millisecond
	// Defaults used by ChannelDescriptor.FillDefaults and DefaultMConnConfig.
	defaultSendQueueCapacity = 1
	defaultSendRate = int64(512000) // 500KB/s
	defaultRecvBufferCapacity = 4096
	defaultRecvMessageCapacity = 22020096 // 21MB
	defaultRecvRate = int64(512000) // 500KB/s
	// defaultSendTimeout bounds how long Channel.sendBytes waits for queue space.
	defaultSendTimeout = 10 * time.Second
)
// receiveCbFunc is called by recvRoutine with the channel id and the bytes of
// each completely reassembled message.
type receiveCbFunc func(chID byte, msgBytes []byte)
// errorCbFunc is called by stopForError, at most once per connection, with the
// value that caused the connection to be stopped.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (c *MConnection) Send(chID byte, msg interface{}) bool {}
	func (c *MConnection) TrySend(chID byte, msg interface{}) bool {}

`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chID`, or until the request times out.
The message `msg` is serialized using go-wire's `BinaryBytes()` routine.

`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService
	conn net.Conn
	bufReader *bufio.Reader // buffered view of conn used by recvRoutine
	bufWriter *bufio.Writer // buffered view of conn used by sendRoutine
	sendMonitor *flow.Monitor // tracks/limits bytes written
	recvMonitor *flow.Monitor // tracks/limits bytes read
	send chan struct{} // capacity-1 wake-up signal for sendRoutine
	pong chan struct{} // recvRoutine asks sendRoutine to answer a ping
	channels []*Channel // channels in descriptor order
	channelsIdx map[byte]*Channel // channels keyed by channel id
	onReceive receiveCbFunc
	onError errorCbFunc
	errored uint32 // set to 1 (atomically) by the first stopForError call
	config *MConnConfig
	quit chan struct{} // created in OnStart, closed in OnStop
	flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer *cmn.RepeatTimer // send pings periodically
	chStatsTimer *cmn.RepeatTimer // update channel stats periodically
	LocalAddress *NetAddress
	RemoteAddress *NetAddress
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate is the rate passed to sendMonitor.Limit before each batch of
	// outgoing packets (presumably bytes per second; the default is
	// annotated 500KB/s).
	SendRate int64 `mapstructure:"send_rate"`
	// RecvRate is the rate passed to recvMonitor.Limit before each read
	// (presumably bytes per second; the default is annotated 500KB/s).
	RecvRate int64 `mapstructure:"recv_rate"`
	// flushThrottle is the duration given to the "flush" throttle timer in OnStart.
	flushThrottle time.Duration
}
  81. // DefaultMConnConfig returns the default config.
  82. func DefaultMConnConfig() *MConnConfig {
  83. return &MConnConfig{
  84. SendRate: defaultSendRate,
  85. RecvRate: defaultRecvRate,
  86. flushThrottle: flushThrottle,
  87. }
  88. }
  89. // NewMConnection wraps net.Conn and creates multiplex connection
  90. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  91. return NewMConnectionWithConfig(
  92. conn,
  93. chDescs,
  94. onReceive,
  95. onError,
  96. DefaultMConnConfig())
  97. }
  98. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  99. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  100. mconn := &MConnection{
  101. conn: conn,
  102. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  103. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  104. sendMonitor: flow.New(0, 0),
  105. recvMonitor: flow.New(0, 0),
  106. send: make(chan struct{}, 1),
  107. pong: make(chan struct{}),
  108. onReceive: onReceive,
  109. onError: onError,
  110. config: config,
  111. LocalAddress: NewNetAddress(conn.LocalAddr()),
  112. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  113. }
  114. // Create channels
  115. var channelsIdx = map[byte]*Channel{}
  116. var channels = []*Channel{}
  117. for _, desc := range chDescs {
  118. descCopy := *desc // copy the desc else unsafe access across connections
  119. channel := newChannel(mconn, &descCopy)
  120. channelsIdx[channel.id] = channel
  121. channels = append(channels, channel)
  122. }
  123. mconn.channels = channels
  124. mconn.channelsIdx = channelsIdx
  125. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  126. return mconn
  127. }
  128. // OnStart implements BaseService
  129. func (c *MConnection) OnStart() error {
  130. c.BaseService.OnStart()
  131. c.quit = make(chan struct{})
  132. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
  133. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  134. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
  135. go c.sendRoutine()
  136. go c.recvRoutine()
  137. return nil
  138. }
// OnStop implements BaseService. It stops the flush, ping and channel-stats
// timers, closes the quit channel (which makes sendRoutine exit) and closes
// the underlying connection.
func (c *MConnection) OnStop() {
	c.BaseService.OnStop()
	c.flushTimer.Stop()
	c.pingTimer.Stop()
	c.chStatsTimer.Stop()
	// quit is only non-nil once OnStart has run.
	if c.quit != nil {
		close(c.quit)
	}
	c.conn.Close()
	// We can't close pong safely here because
	// recvRoutine may write to it after we've stopped.
	// Though it doesn't need to get closed at all,
	// we close it @ recvRoutine.
	// close(c.pong)
}
  155. func (c *MConnection) String() string {
  156. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  157. }
  158. func (c *MConnection) flush() {
  159. c.Logger.Debug("Flush", "conn", c)
  160. err := c.bufWriter.Flush()
  161. if err != nil {
  162. c.Logger.Error("MConnection flush failed", "err", err)
  163. }
  164. }
  165. // Catch panics, usually caused by remote disconnects.
  166. func (c *MConnection) _recover() {
  167. if r := recover(); r != nil {
  168. stack := debug.Stack()
  169. err := cmn.StackError{r, stack}
  170. c.stopForError(err)
  171. }
  172. }
  173. func (c *MConnection) stopForError(r interface{}) {
  174. c.Stop()
  175. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  176. if c.onError != nil {
  177. c.onError(r)
  178. }
  179. }
  180. }
  181. // Queues a message to be sent to channel.
  182. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  183. if !c.IsRunning() {
  184. return false
  185. }
  186. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  187. // Send message to channel.
  188. channel, ok := c.channelsIdx[chID]
  189. if !ok {
  190. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  191. return false
  192. }
  193. success := channel.sendBytes(wire.BinaryBytes(msg))
  194. if success {
  195. // Wake up sendRoutine if necessary
  196. select {
  197. case c.send <- struct{}{}:
  198. default:
  199. }
  200. } else {
  201. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  202. }
  203. return success
  204. }
  205. // Queues a message to be sent to channel.
  206. // Nonblocking, returns true if successful.
  207. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  208. if !c.IsRunning() {
  209. return false
  210. }
  211. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  212. // Send message to channel.
  213. channel, ok := c.channelsIdx[chID]
  214. if !ok {
  215. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  216. return false
  217. }
  218. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  219. if ok {
  220. // Wake up sendRoutine if necessary
  221. select {
  222. case c.send <- struct{}{}:
  223. default:
  224. }
  225. }
  226. return ok
  227. }
  228. // CanSend returns true if you can send more data onto the chID, false
  229. // otherwise. Use only as a heuristic.
  230. func (c *MConnection) CanSend(chID byte) bool {
  231. if !c.IsRunning() {
  232. return false
  233. }
  234. channel, ok := c.channelsIdx[chID]
  235. if !ok {
  236. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  237. return false
  238. }
  239. return channel.canSend()
  240. }
  241. // sendRoutine polls for packets to send from channels.
  242. func (c *MConnection) sendRoutine() {
  243. defer c._recover()
  244. FOR_LOOP:
  245. for {
  246. var n int
  247. var err error
  248. select {
  249. case <-c.flushTimer.Ch:
  250. // NOTE: flushTimer.Set() must be called every time
  251. // something is written to .bufWriter.
  252. c.flush()
  253. case <-c.chStatsTimer.Ch:
  254. for _, channel := range c.channels {
  255. channel.updateStats()
  256. }
  257. case <-c.pingTimer.Ch:
  258. c.Logger.Debug("Send Ping")
  259. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  260. c.sendMonitor.Update(int(n))
  261. c.flush()
  262. case <-c.pong:
  263. c.Logger.Debug("Send Pong")
  264. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  265. c.sendMonitor.Update(int(n))
  266. c.flush()
  267. case <-c.quit:
  268. break FOR_LOOP
  269. case <-c.send:
  270. // Send some msgPackets
  271. eof := c.sendSomeMsgPackets()
  272. if !eof {
  273. // Keep sendRoutine awake.
  274. select {
  275. case c.send <- struct{}{}:
  276. default:
  277. }
  278. }
  279. }
  280. if !c.IsRunning() {
  281. break FOR_LOOP
  282. }
  283. if err != nil {
  284. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  285. c.stopForError(err)
  286. break FOR_LOOP
  287. }
  288. }
  289. // Cleanup
  290. }
  291. // Returns true if messages from channels were exhausted.
  292. // Blocks in accordance to .sendMonitor throttling.
  293. func (c *MConnection) sendSomeMsgPackets() bool {
  294. // Block until .sendMonitor says we can write.
  295. // Once we're ready we send more than we asked for,
  296. // but amortized it should even out.
  297. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
  298. // Now send some msgPackets.
  299. for i := 0; i < numBatchMsgPackets; i++ {
  300. if c.sendMsgPacket() {
  301. return true
  302. }
  303. }
  304. return false
  305. }
  306. // Returns true if messages from channels were exhausted.
  307. func (c *MConnection) sendMsgPacket() bool {
  308. // Choose a channel to create a msgPacket from.
  309. // The chosen channel will be the one whose recentlySent/priority is the least.
  310. var leastRatio float32 = math.MaxFloat32
  311. var leastChannel *Channel
  312. for _, channel := range c.channels {
  313. // If nothing to send, skip this channel
  314. if !channel.isSendPending() {
  315. continue
  316. }
  317. // Get ratio, and keep track of lowest ratio.
  318. ratio := float32(channel.recentlySent) / float32(channel.priority)
  319. if ratio < leastRatio {
  320. leastRatio = ratio
  321. leastChannel = channel
  322. }
  323. }
  324. // Nothing to send?
  325. if leastChannel == nil {
  326. return true
  327. } else {
  328. // c.Logger.Info("Found a msgPacket to send")
  329. }
  330. // Make & send a msgPacket from this channel
  331. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  332. if err != nil {
  333. c.Logger.Error("Failed to write msgPacket", "err", err)
  334. c.stopForError(err)
  335. return true
  336. }
  337. c.sendMonitor.Update(int(n))
  338. c.flushTimer.Set()
  339. return false
  340. }
  341. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  342. // After a whole message has been assembled, it's pushed to onReceive().
  343. // Blocks depending on how the connection is throttled.
  344. func (c *MConnection) recvRoutine() {
  345. defer c._recover()
  346. FOR_LOOP:
  347. for {
  348. // Block until .recvMonitor says we can read.
  349. c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
  350. /*
  351. // Peek into bufReader for debugging
  352. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  353. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  354. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  355. if err == nil {
  356. return bytes
  357. } else {
  358. log.Warn("Error peeking connection buffer", "err", err)
  359. return nil
  360. }
  361. }})
  362. }
  363. */
  364. // Read packet type
  365. var n int
  366. var err error
  367. pktType := wire.ReadByte(c.bufReader, &n, &err)
  368. c.recvMonitor.Update(int(n))
  369. if err != nil {
  370. if c.IsRunning() {
  371. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  372. c.stopForError(err)
  373. }
  374. break FOR_LOOP
  375. }
  376. // Read more depending on packet type.
  377. switch pktType {
  378. case packetTypePing:
  379. // TODO: prevent abuse, as they cause flush()'s.
  380. c.Logger.Debug("Receive Ping")
  381. c.pong <- struct{}{}
  382. case packetTypePong:
  383. // do nothing
  384. c.Logger.Debug("Receive Pong")
  385. case packetTypeMsg:
  386. pkt, n, err := msgPacket{}, int(0), error(nil)
  387. wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
  388. c.recvMonitor.Update(int(n))
  389. if err != nil {
  390. if c.IsRunning() {
  391. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  392. c.stopForError(err)
  393. }
  394. break FOR_LOOP
  395. }
  396. channel, ok := c.channelsIdx[pkt.ChannelID]
  397. if !ok || channel == nil {
  398. cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
  399. }
  400. msgBytes, err := channel.recvMsgPacket(pkt)
  401. if err != nil {
  402. if c.IsRunning() {
  403. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  404. c.stopForError(err)
  405. }
  406. break FOR_LOOP
  407. }
  408. if msgBytes != nil {
  409. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  410. c.onReceive(pkt.ChannelID, msgBytes)
  411. }
  412. default:
  413. cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
  414. }
  415. // TODO: shouldn't this go in the sendRoutine?
  416. // Better to send a ping packet when *we* haven't sent anything for a while.
  417. c.pingTimer.Reset()
  418. }
  419. // Cleanup
  420. close(c.pong)
  421. for range c.pong {
  422. // Drain
  423. }
  424. }
// ConnectionStatus is the snapshot returned by MConnection.Status: the status
// of the send and receive flow monitors plus one entry per channel.
type ConnectionStatus struct {
	SendMonitor flow.Status
	RecvMonitor flow.Status
	Channels []ChannelStatus
}
// ChannelStatus describes a single channel inside a ConnectionStatus.
type ChannelStatus struct {
	ID byte // channel id
	SendQueueCapacity int // capacity of the channel's send queue
	SendQueueSize int // value of the channel's sendQueueSize counter
	Priority int // channel priority
	RecentlySent int64 // channel's recentlySent counter (decayed periodically by updateStats)
}
  437. func (c *MConnection) Status() ConnectionStatus {
  438. var status ConnectionStatus
  439. status.SendMonitor = c.sendMonitor.Status()
  440. status.RecvMonitor = c.recvMonitor.Status()
  441. status.Channels = make([]ChannelStatus, len(c.channels))
  442. for i, channel := range c.channels {
  443. status.Channels[i] = ChannelStatus{
  444. ID: channel.id,
  445. SendQueueCapacity: cap(channel.sendQueue),
  446. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  447. Priority: channel.priority,
  448. RecentlySent: channel.recentlySent,
  449. }
  450. }
  451. return status
  452. }
  453. //-----------------------------------------------------------------------------
// ChannelDescriptor configures one channel of an MConnection. Zero-valued
// capacities are replaced by package defaults in FillDefaults; Priority must
// be positive or newChannel panics.
type ChannelDescriptor struct {
	ID byte // channel id
	Priority int // relative weight used when choosing which channel sends next
	SendQueueCapacity int // buffer size of the channel's send queue
	RecvBufferCapacity int // initial capacity of the reassembly buffer
	RecvMessageCapacity int // maximum size of a reassembled inbound message
}
  461. func (chDesc *ChannelDescriptor) FillDefaults() {
  462. if chDesc.SendQueueCapacity == 0 {
  463. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  464. }
  465. if chDesc.RecvBufferCapacity == 0 {
  466. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  467. }
  468. if chDesc.RecvMessageCapacity == 0 {
  469. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  470. }
  471. }
// Channel holds the per-channel send queue and reassembly state of an
// MConnection.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn *MConnection
	desc *ChannelDescriptor
	id byte
	sendQueue chan []byte // serialized messages waiting to be sent
	sendQueueSize int32 // atomic.
	recving []byte // bytes of the inbound message assembled so far
	sending []byte // unsent remainder of the outbound message in progress
	priority int
	recentlySent int64 // exponential moving average
}
  485. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  486. desc.FillDefaults()
  487. if desc.Priority <= 0 {
  488. cmn.PanicSanity("Channel default priority must be a postive integer")
  489. }
  490. return &Channel{
  491. conn: conn,
  492. desc: desc,
  493. id: desc.ID,
  494. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  495. recving: make([]byte, 0, desc.RecvBufferCapacity),
  496. priority: desc.Priority,
  497. }
  498. }
  499. // Queues message to send to this channel.
  500. // Goroutine-safe
  501. // Times out (and returns false) after defaultSendTimeout
  502. func (ch *Channel) sendBytes(bytes []byte) bool {
  503. select {
  504. case ch.sendQueue <- bytes:
  505. atomic.AddInt32(&ch.sendQueueSize, 1)
  506. return true
  507. case <-time.After(defaultSendTimeout):
  508. return false
  509. }
  510. }
  511. // Queues message to send to this channel.
  512. // Nonblocking, returns true if successful.
  513. // Goroutine-safe
  514. func (ch *Channel) trySendBytes(bytes []byte) bool {
  515. select {
  516. case ch.sendQueue <- bytes:
  517. atomic.AddInt32(&ch.sendQueueSize, 1)
  518. return true
  519. default:
  520. return false
  521. }
  522. }
  523. // Goroutine-safe
  524. func (ch *Channel) loadSendQueueSize() (size int) {
  525. return int(atomic.LoadInt32(&ch.sendQueueSize))
  526. }
  527. // Goroutine-safe
  528. // Use only as a heuristic.
  529. func (ch *Channel) canSend() bool {
  530. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  531. }
  532. // Returns true if any msgPackets are pending to be sent.
  533. // Call before calling nextMsgPacket()
  534. // Goroutine-safe
  535. func (ch *Channel) isSendPending() bool {
  536. if len(ch.sending) == 0 {
  537. if len(ch.sendQueue) == 0 {
  538. return false
  539. }
  540. ch.sending = <-ch.sendQueue
  541. }
  542. return true
  543. }
  544. // Creates a new msgPacket to send.
  545. // Not goroutine-safe
  546. func (ch *Channel) nextMsgPacket() msgPacket {
  547. packet := msgPacket{}
  548. packet.ChannelID = byte(ch.id)
  549. packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
  550. if len(ch.sending) <= maxMsgPacketPayloadSize {
  551. packet.EOF = byte(0x01)
  552. ch.sending = nil
  553. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  554. } else {
  555. packet.EOF = byte(0x00)
  556. ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
  557. }
  558. return packet
  559. }
  560. // Writes next msgPacket to w.
  561. // Not goroutine-safe
  562. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  563. packet := ch.nextMsgPacket()
  564. // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  565. wire.WriteByte(packetTypeMsg, w, &n, &err)
  566. wire.WriteBinary(packet, w, &n, &err)
  567. if err == nil {
  568. ch.recentlySent += int64(n)
  569. }
  570. return
  571. }
  572. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  573. // Not goroutine-safe
  574. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  575. // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  576. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  577. return nil, wire.ErrBinaryReadOverflow
  578. }
  579. ch.recving = append(ch.recving, packet.Bytes...)
  580. if packet.EOF == byte(0x01) {
  581. msgBytes := ch.recving
  582. // clear the slice without re-allocating.
  583. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  584. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  585. // at which point the recving slice stops being used and should be garbage collected
  586. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  587. return msgBytes, nil
  588. }
  589. return nil, nil
  590. }
  591. // Call this periodically to update stats for throttling purposes.
  592. // Not goroutine-safe
  593. func (ch *Channel) updateStats() {
  594. // Exponential decay of stats.
  595. // TODO: optimize.
  596. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  597. }
  598. //-----------------------------------------------------------------------------
// Packet size limits and the type byte that precedes every packet on the wire.
const (
	// maxMsgPacketPayloadSize caps the message bytes carried by one msgPacket.
	maxMsgPacketPayloadSize = 1024
	maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
	// maxMsgPacketTotalSize is the limit used when reading a msgPacket and
	// the chunk size requested from the flow monitors.
	maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
	packetTypePing = byte(0x01) // keep-alive request
	packetTypePong = byte(0x02) // reply to a ping
	packetTypeMsg = byte(0x03) // a msgPacket follows
)
  607. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  608. type msgPacket struct {
  609. ChannelID byte
  610. EOF byte // 1 means message ends here.
  611. Bytes []byte
  612. }
  613. func (p msgPacket) String() string {
  614. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  615. }