You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

686 lines
18 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. flow "github.com/tendermint/tmlibs/flowrate"
  14. )
  // Tunables shared by every MConnection in this package.
  const (
  	// Upper bound on the number of msgPackets written per wake-up of the send loop.
  	numBatchMsgPackets = 10

  	// Buffer sizes handed to bufio for the connection's reader and writer.
  	minReadBufferSize  = 1024
  	minWriteBufferSize = 65536

  	// Timer periods: channel-stats refresh, ping repetition, and flush throttling.
  	updateState   = 2 * time.Second
  	pingTimeout   = 40 * time.Second
  	flushThrottle = 100 * time.Millisecond

  	// Fallback values for channel descriptors and connection configs.
  	defaultSendQueueCapacity         = 1
  	defaultSendRate            int64 = 500 * 1024 // 512000 bytes/s (500KB/s)
  	defaultRecvBufferCapacity        = 4096
  	defaultRecvMessageCapacity       = 21 * 1024 * 1024 // 22020096 bytes (21MB)
  	defaultRecvRate            int64 = 500 * 1024       // 512000 bytes/s (500KB/s)

  	// How long a blocking channel send waits for queue space before giving up.
  	defaultSendTimeout = 10 * time.Second
  )
  type (
  	// receiveCbFunc is called with a channel id and the bytes of one
  	// fully reassembled message received on that channel.
  	receiveCbFunc func(chID byte, msgBytes []byte)

  	// errorCbFunc is called with the value that caused the connection to stop.
  	errorCbFunc func(interface{})
  )
  31. /*
  32. Each peer has one `MConnection` (multiplex connection) instance.
  33. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  34. several messages along a single channel of communication.
  35. Each `MConnection` handles message transmission on multiple abstract communication
  36. `Channel`s. Each channel has a globally unique byte id.
  37. The byte id and the relative priorities of each `Channel` are configured upon
  38. initialization of the connection.
  39. There are two methods for sending messages:
  40. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  41. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  42. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  43. for the channel with the given id byte `chID`, or until the request times out.
  44. The message `msg` is serialized using the `tendermint/wire` submodule's
  45. `WriteBinary()` reflection routine.
  46. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  47. queue is full.
  48. Inbound message bytes are handled with an onReceive callback function.
  49. */
  50. type MConnection struct {
  51. cmn.BaseService
  52. conn net.Conn
  53. bufReader *bufio.Reader
  54. bufWriter *bufio.Writer
  55. sendMonitor *flow.Monitor
  56. recvMonitor *flow.Monitor
  57. send chan struct{}
  58. pong chan struct{}
  59. channels []*Channel
  60. channelsIdx map[byte]*Channel
  61. onReceive receiveCbFunc
  62. onError errorCbFunc
  63. errored uint32
  64. config *MConnConfig
  65. quit chan struct{}
  66. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  67. pingTimer *cmn.RepeatTimer // send pings periodically
  68. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  69. LocalAddress *NetAddress
  70. RemoteAddress *NetAddress
  71. }
  // MConnConfig holds the tunable settings of an MConnection.
  type MConnConfig struct {
  	// SendRate is the rate passed to the send-side flow limiter.
  	SendRate int64 `mapstructure:"send_rate"`

  	// RecvRate is the rate passed to the receive-side flow limiter.
  	RecvRate int64 `mapstructure:"recv_rate"`
  }
  77. // DefaultMConnConfig returns the default config.
  78. func DefaultMConnConfig() *MConnConfig {
  79. return &MConnConfig{
  80. SendRate: defaultSendRate,
  81. RecvRate: defaultRecvRate,
  82. }
  83. }
  84. // NewMConnection wraps net.Conn and creates multiplex connection
  85. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  86. return NewMConnectionWithConfig(
  87. conn,
  88. chDescs,
  89. onReceive,
  90. onError,
  91. DefaultMConnConfig())
  92. }
  93. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  94. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  95. mconn := &MConnection{
  96. conn: conn,
  97. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  98. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  99. sendMonitor: flow.New(0, 0),
  100. recvMonitor: flow.New(0, 0),
  101. send: make(chan struct{}, 1),
  102. pong: make(chan struct{}),
  103. onReceive: onReceive,
  104. onError: onError,
  105. config: config,
  106. LocalAddress: NewNetAddress(conn.LocalAddr()),
  107. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  108. }
  109. // Create channels
  110. var channelsIdx = map[byte]*Channel{}
  111. var channels = []*Channel{}
  112. for _, desc := range chDescs {
  113. descCopy := *desc // copy the desc else unsafe access across connections
  114. channel := newChannel(mconn, &descCopy)
  115. channelsIdx[channel.id] = channel
  116. channels = append(channels, channel)
  117. }
  118. mconn.channels = channels
  119. mconn.channelsIdx = channelsIdx
  120. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  121. return mconn
  122. }
  123. func (c *MConnection) OnStart() error {
  124. c.BaseService.OnStart()
  125. c.quit = make(chan struct{})
  126. c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
  127. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  128. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
  129. go c.sendRoutine()
  130. go c.recvRoutine()
  131. return nil
  132. }
  133. func (c *MConnection) OnStop() {
  134. c.BaseService.OnStop()
  135. c.flushTimer.Stop()
  136. c.pingTimer.Stop()
  137. c.chStatsTimer.Stop()
  138. if c.quit != nil {
  139. close(c.quit)
  140. }
  141. c.conn.Close()
  142. // We can't close pong safely here because
  143. // recvRoutine may write to it after we've stopped.
  144. // Though it doesn't need to get closed at all,
  145. // we close it @ recvRoutine.
  146. // close(c.pong)
  147. }
  148. func (c *MConnection) String() string {
  149. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  150. }
  151. func (c *MConnection) flush() {
  152. c.Logger.Debug("Flush", "conn", c)
  153. err := c.bufWriter.Flush()
  154. if err != nil {
  155. c.Logger.Error("MConnection flush failed", "error", err)
  156. }
  157. }
  158. // Catch panics, usually caused by remote disconnects.
  159. func (c *MConnection) _recover() {
  160. if r := recover(); r != nil {
  161. stack := debug.Stack()
  162. err := cmn.StackError{r, stack}
  163. c.stopForError(err)
  164. }
  165. }
  166. func (c *MConnection) stopForError(r interface{}) {
  167. c.Stop()
  168. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  169. if c.onError != nil {
  170. c.onError(r)
  171. }
  172. }
  173. }
  174. // Queues a message to be sent to channel.
  175. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  176. if !c.IsRunning() {
  177. return false
  178. }
  179. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  180. // Send message to channel.
  181. channel, ok := c.channelsIdx[chID]
  182. if !ok {
  183. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  184. return false
  185. }
  186. success := channel.sendBytes(wire.BinaryBytes(msg))
  187. if success {
  188. // Wake up sendRoutine if necessary
  189. select {
  190. case c.send <- struct{}{}:
  191. default:
  192. }
  193. } else {
  194. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  195. }
  196. return success
  197. }
  198. // Queues a message to be sent to channel.
  199. // Nonblocking, returns true if successful.
  200. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  201. if !c.IsRunning() {
  202. return false
  203. }
  204. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  205. // Send message to channel.
  206. channel, ok := c.channelsIdx[chID]
  207. if !ok {
  208. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  209. return false
  210. }
  211. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  212. if ok {
  213. // Wake up sendRoutine if necessary
  214. select {
  215. case c.send <- struct{}{}:
  216. default:
  217. }
  218. }
  219. return ok
  220. }
  221. // CanSend returns true if you can send more data onto the chID, false
  222. // otherwise. Use only as a heuristic.
  223. func (c *MConnection) CanSend(chID byte) bool {
  224. if !c.IsRunning() {
  225. return false
  226. }
  227. channel, ok := c.channelsIdx[chID]
  228. if !ok {
  229. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  230. return false
  231. }
  232. return channel.canSend()
  233. }
  234. // sendRoutine polls for packets to send from channels.
  235. func (c *MConnection) sendRoutine() {
  236. defer c._recover()
  237. FOR_LOOP:
  238. for {
  239. var n int
  240. var err error
  241. select {
  242. case <-c.flushTimer.Ch:
  243. // NOTE: flushTimer.Set() must be called every time
  244. // something is written to .bufWriter.
  245. c.flush()
  246. case <-c.chStatsTimer.Ch:
  247. for _, channel := range c.channels {
  248. channel.updateStats()
  249. }
  250. case <-c.pingTimer.Ch:
  251. c.Logger.Debug("Send Ping")
  252. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  253. c.sendMonitor.Update(int(n))
  254. c.flush()
  255. case <-c.pong:
  256. c.Logger.Debug("Send Pong")
  257. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  258. c.sendMonitor.Update(int(n))
  259. c.flush()
  260. case <-c.quit:
  261. break FOR_LOOP
  262. case <-c.send:
  263. // Send some msgPackets
  264. eof := c.sendSomeMsgPackets()
  265. if !eof {
  266. // Keep sendRoutine awake.
  267. select {
  268. case c.send <- struct{}{}:
  269. default:
  270. }
  271. }
  272. }
  273. if !c.IsRunning() {
  274. break FOR_LOOP
  275. }
  276. if err != nil {
  277. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "error", err)
  278. c.stopForError(err)
  279. break FOR_LOOP
  280. }
  281. }
  282. // Cleanup
  283. }
  284. // Returns true if messages from channels were exhausted.
  285. // Blocks in accordance to .sendMonitor throttling.
  286. func (c *MConnection) sendSomeMsgPackets() bool {
  287. // Block until .sendMonitor says we can write.
  288. // Once we're ready we send more than we asked for,
  289. // but amortized it should even out.
  290. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
  291. // Now send some msgPackets.
  292. for i := 0; i < numBatchMsgPackets; i++ {
  293. if c.sendMsgPacket() {
  294. return true
  295. }
  296. }
  297. return false
  298. }
  299. // Returns true if messages from channels were exhausted.
  300. func (c *MConnection) sendMsgPacket() bool {
  301. // Choose a channel to create a msgPacket from.
  302. // The chosen channel will be the one whose recentlySent/priority is the least.
  303. var leastRatio float32 = math.MaxFloat32
  304. var leastChannel *Channel
  305. for _, channel := range c.channels {
  306. // If nothing to send, skip this channel
  307. if !channel.isSendPending() {
  308. continue
  309. }
  310. // Get ratio, and keep track of lowest ratio.
  311. ratio := float32(channel.recentlySent) / float32(channel.priority)
  312. if ratio < leastRatio {
  313. leastRatio = ratio
  314. leastChannel = channel
  315. }
  316. }
  317. // Nothing to send?
  318. if leastChannel == nil {
  319. return true
  320. } else {
  321. // c.Logger.Info("Found a msgPacket to send")
  322. }
  323. // Make & send a msgPacket from this channel
  324. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  325. if err != nil {
  326. c.Logger.Error("Failed to write msgPacket", "error", err)
  327. c.stopForError(err)
  328. return true
  329. }
  330. c.sendMonitor.Update(int(n))
  331. c.flushTimer.Set()
  332. return false
  333. }
  334. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  335. // After a whole message has been assembled, it's pushed to onReceive().
  336. // Blocks depending on how the connection is throttled.
  337. func (c *MConnection) recvRoutine() {
  338. defer c._recover()
  339. FOR_LOOP:
  340. for {
  341. // Block until .recvMonitor says we can read.
  342. c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
  343. /*
  344. // Peek into bufReader for debugging
  345. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  346. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  347. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  348. if err == nil {
  349. return bytes
  350. } else {
  351. log.Warn("Error peeking connection buffer", "error", err)
  352. return nil
  353. }
  354. }})
  355. }
  356. */
  357. // Read packet type
  358. var n int
  359. var err error
  360. pktType := wire.ReadByte(c.bufReader, &n, &err)
  361. c.recvMonitor.Update(int(n))
  362. if err != nil {
  363. if c.IsRunning() {
  364. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
  365. c.stopForError(err)
  366. }
  367. break FOR_LOOP
  368. }
  369. // Read more depending on packet type.
  370. switch pktType {
  371. case packetTypePing:
  372. // TODO: prevent abuse, as they cause flush()'s.
  373. c.Logger.Debug("Receive Ping")
  374. c.pong <- struct{}{}
  375. case packetTypePong:
  376. // do nothing
  377. c.Logger.Debug("Receive Pong")
  378. case packetTypeMsg:
  379. pkt, n, err := msgPacket{}, int(0), error(nil)
  380. wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
  381. c.recvMonitor.Update(int(n))
  382. if err != nil {
  383. if c.IsRunning() {
  384. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
  385. c.stopForError(err)
  386. }
  387. break FOR_LOOP
  388. }
  389. channel, ok := c.channelsIdx[pkt.ChannelID]
  390. if !ok || channel == nil {
  391. cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
  392. }
  393. msgBytes, err := channel.recvMsgPacket(pkt)
  394. if err != nil {
  395. if c.IsRunning() {
  396. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
  397. c.stopForError(err)
  398. }
  399. break FOR_LOOP
  400. }
  401. if msgBytes != nil {
  402. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  403. c.onReceive(pkt.ChannelID, msgBytes)
  404. }
  405. default:
  406. cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
  407. }
  408. // TODO: shouldn't this go in the sendRoutine?
  409. // Better to send a ping packet when *we* haven't sent anything for a while.
  410. c.pingTimer.Reset()
  411. }
  412. // Cleanup
  413. close(c.pong)
  414. for _ = range c.pong {
  415. // Drain
  416. }
  417. }
  418. type ConnectionStatus struct {
  419. SendMonitor flow.Status
  420. RecvMonitor flow.Status
  421. Channels []ChannelStatus
  422. }
  // ChannelStatus is a snapshot of one channel's send-side bookkeeping.
  type ChannelStatus struct {
  	ID                byte  // channel id
  	SendQueueCapacity int   // capacity of the channel's send queue
  	SendQueueSize     int   // value of the channel's send-queue counter
  	Priority          int   // channel priority
  	RecentlySent      int64 // the channel's decaying sent-bytes counter
  }
  430. func (c *MConnection) Status() ConnectionStatus {
  431. var status ConnectionStatus
  432. status.SendMonitor = c.sendMonitor.Status()
  433. status.RecvMonitor = c.recvMonitor.Status()
  434. status.Channels = make([]ChannelStatus, len(c.channels))
  435. for i, channel := range c.channels {
  436. status.Channels[i] = ChannelStatus{
  437. ID: channel.id,
  438. SendQueueCapacity: cap(channel.sendQueue),
  439. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  440. Priority: channel.priority,
  441. RecentlySent: channel.recentlySent,
  442. }
  443. }
  444. return status
  445. }
  446. //-----------------------------------------------------------------------------
  // ChannelDescriptor describes how one channel of a connection is set up.
  // Capacities left at zero are replaced by package defaults in FillDefaults.
  type ChannelDescriptor struct {
  	ID       byte // channel id
  	Priority int  // must be positive; newChannel panics otherwise

  	SendQueueCapacity   int // capacity of the send queue
  	RecvBufferCapacity  int // initial capacity of the receive buffer
  	RecvMessageCapacity int // largest reassembled message accepted, in bytes
  }
  454. func (chDesc *ChannelDescriptor) FillDefaults() {
  455. if chDesc.SendQueueCapacity == 0 {
  456. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  457. }
  458. if chDesc.RecvBufferCapacity == 0 {
  459. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  460. }
  461. if chDesc.RecvMessageCapacity == 0 {
  462. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  463. }
  464. }
  465. // TODO: lowercase.
  466. // NOTE: not goroutine-safe.
  467. type Channel struct {
  468. conn *MConnection
  469. desc *ChannelDescriptor
  470. id byte
  471. sendQueue chan []byte
  472. sendQueueSize int32 // atomic.
  473. recving []byte
  474. sending []byte
  475. priority int
  476. recentlySent int64 // exponential moving average
  477. }
  478. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  479. desc.FillDefaults()
  480. if desc.Priority <= 0 {
  481. cmn.PanicSanity("Channel default priority must be a postive integer")
  482. }
  483. return &Channel{
  484. conn: conn,
  485. desc: desc,
  486. id: desc.ID,
  487. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  488. recving: make([]byte, 0, desc.RecvBufferCapacity),
  489. priority: desc.Priority,
  490. }
  491. }
  492. // Queues message to send to this channel.
  493. // Goroutine-safe
  494. // Times out (and returns false) after defaultSendTimeout
  495. func (ch *Channel) sendBytes(bytes []byte) bool {
  496. select {
  497. case ch.sendQueue <- bytes:
  498. atomic.AddInt32(&ch.sendQueueSize, 1)
  499. return true
  500. case <-time.After(defaultSendTimeout):
  501. return false
  502. }
  503. }
  504. // Queues message to send to this channel.
  505. // Nonblocking, returns true if successful.
  506. // Goroutine-safe
  507. func (ch *Channel) trySendBytes(bytes []byte) bool {
  508. select {
  509. case ch.sendQueue <- bytes:
  510. atomic.AddInt32(&ch.sendQueueSize, 1)
  511. return true
  512. default:
  513. return false
  514. }
  515. }
  516. // Goroutine-safe
  517. func (ch *Channel) loadSendQueueSize() (size int) {
  518. return int(atomic.LoadInt32(&ch.sendQueueSize))
  519. }
  520. // Goroutine-safe
  521. // Use only as a heuristic.
  522. func (ch *Channel) canSend() bool {
  523. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  524. }
  525. // Returns true if any msgPackets are pending to be sent.
  526. // Call before calling nextMsgPacket()
  527. // Goroutine-safe
  528. func (ch *Channel) isSendPending() bool {
  529. if len(ch.sending) == 0 {
  530. if len(ch.sendQueue) == 0 {
  531. return false
  532. }
  533. ch.sending = <-ch.sendQueue
  534. }
  535. return true
  536. }
  537. // Creates a new msgPacket to send.
  538. // Not goroutine-safe
  539. func (ch *Channel) nextMsgPacket() msgPacket {
  540. packet := msgPacket{}
  541. packet.ChannelID = byte(ch.id)
  542. packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
  543. if len(ch.sending) <= maxMsgPacketPayloadSize {
  544. packet.EOF = byte(0x01)
  545. ch.sending = nil
  546. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  547. } else {
  548. packet.EOF = byte(0x00)
  549. ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
  550. }
  551. return packet
  552. }
  553. // Writes next msgPacket to w.
  554. // Not goroutine-safe
  555. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  556. packet := ch.nextMsgPacket()
  557. // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  558. wire.WriteByte(packetTypeMsg, w, &n, &err)
  559. wire.WriteBinary(packet, w, &n, &err)
  560. if err == nil {
  561. ch.recentlySent += int64(n)
  562. }
  563. return
  564. }
  565. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  566. // Not goroutine-safe
  567. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  568. // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  569. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  570. return nil, wire.ErrBinaryReadOverflow
  571. }
  572. ch.recving = append(ch.recving, packet.Bytes...)
  573. if packet.EOF == byte(0x01) {
  574. msgBytes := ch.recving
  575. // clear the slice without re-allocating.
  576. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  577. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  578. // at which point the recving slice stops being used and should be garbage collected
  579. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  580. return msgBytes, nil
  581. }
  582. return nil, nil
  583. }
  584. // Call this periodically to update stats for throttling purposes.
  585. // Not goroutine-safe
  586. func (ch *Channel) updateStats() {
  587. // Exponential decay of stats.
  588. // TODO: optimize.
  589. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  590. }
  591. //-----------------------------------------------------------------------------
  // Packet size limits and the one-byte packet-type tags used on the wire.
  const (
  	maxMsgPacketPayloadSize  = 1024
  	maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
  	maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize

  	packetTypePing byte = 0x01
  	packetTypePong byte = 0x02
  	packetTypeMsg  byte = 0x03
  )
  // msgPacket is the unit of multiplexing: messages in channels are chopped
  // into smaller msgPackets before being written to the connection.
  type msgPacket struct {
  	ChannelID byte   // id of the channel the payload belongs to
  	EOF       byte   // 1 means message ends here.
  	Bytes     []byte // payload fragment
  }
  606. func (p msgPacket) String() string {
  607. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  608. }