You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

718 lines
19 KiB

9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
7 years ago
7 years ago
7 years ago
7 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. tmlegacy "github.com/tendermint/go-wire/nowriter/tmlegacy"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. flow "github.com/tendermint/tmlibs/flowrate"
  15. )
// legacy is the shared legacy go-wire encoder; this file uses it to write the
// single packet-type octet (ping, pong, msg) that prefixes every packet.
var legacy = tmlegacy.TMEncoderLegacy{}
const (
	// numBatchMsgPackets caps how many msgPackets sendSomeMsgPackets writes per call.
	numBatchMsgPackets = 10
	// Buffer sizes handed to bufio when wrapping the underlying net.Conn.
	minReadBufferSize  = 1024
	minWriteBufferSize = 65536
	// updateState is the period of the timer that triggers Channel.updateStats.
	updateState = 2 * time.Second
	// pingTimeout is the period of the repeating ping timer.
	pingTimeout = 40 * time.Second

	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond

	// Per-channel defaults applied by ChannelDescriptor.FillDefaults.
	defaultSendQueueCapacity   = 1
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096      // 21MB
	defaultSendRate            = int64(512000) // 500KB/s
	defaultRecvRate            = int64(512000) // 500KB/s
	// defaultSendTimeout bounds how long Channel.sendBytes waits for queue space.
	defaultSendTimeout = 10 * time.Second
)
// receiveCbFunc is called by recvRoutine with the channel id and the bytes of
// a fully reassembled message.
type receiveCbFunc func(chID byte, msgBytes []byte)

// errorCbFunc is called by stopForError, at most once per connection, with the
// error (or recovered panic value) that caused the connection to stop.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (m MConnection) Send(chID byte, msg interface{}) bool {}
	func (m MConnection) TrySend(chID byte, msg interface{}) bool {}

`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chID`, or until the request times out.
The message `msg` is serialized using the `tendermint/go-wire` package's
`BinaryBytes()` reflection routine.

`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService

	conn        net.Conn
	bufReader   *bufio.Reader // buffered view of conn used by recvRoutine
	bufWriter   *bufio.Writer // buffered view of conn used by sendRoutine
	sendMonitor *flow.Monitor // tracks/throttles outbound bytes
	recvMonitor *flow.Monitor // tracks/throttles inbound bytes
	send        chan struct{} // wakes sendRoutine when a channel has data queued
	pong        chan struct{} // recvRoutine -> sendRoutine: a ping arrived, reply with a pong
	channels    []*Channel
	channelsIdx map[byte]*Channel // channels indexed by channel id
	onReceive   receiveCbFunc
	onError     errorCbFunc
	errored     uint32 // set to 1 (via atomic CAS) once onError has been dispatched
	config      *MConnConfig

	quit         chan struct{}      // created in OnStart, closed in OnStop
	flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer    *cmn.RepeatTimer   // send pings periodically
	chStatsTimer *cmn.RepeatTimer   // update channel stats periodically

	LocalAddress  *NetAddress
	RemoteAddress *NetAddress
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate and RecvRate are the rates passed to the send/recv flow monitors'
	// Limit calls; both are read with atomic.LoadInt64.
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// maxMsgPacketPayloadSize is the largest payload carried by one msgPacket.
	maxMsgPacketPayloadSize int

	// flushThrottle is the duration given to the flush ThrottleTimer.
	flushThrottle time.Duration
}
  84. func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
  85. return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  86. }
  87. // DefaultMConnConfig returns the default config.
  88. func DefaultMConnConfig() *MConnConfig {
  89. return &MConnConfig{
  90. SendRate: defaultSendRate,
  91. RecvRate: defaultRecvRate,
  92. maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
  93. flushThrottle: defaultFlushThrottle,
  94. }
  95. }
  96. // NewMConnection wraps net.Conn and creates multiplex connection
  97. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  98. return NewMConnectionWithConfig(
  99. conn,
  100. chDescs,
  101. onReceive,
  102. onError,
  103. DefaultMConnConfig())
  104. }
  105. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  106. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  107. mconn := &MConnection{
  108. conn: conn,
  109. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  110. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  111. sendMonitor: flow.New(0, 0),
  112. recvMonitor: flow.New(0, 0),
  113. send: make(chan struct{}, 1),
  114. pong: make(chan struct{}),
  115. onReceive: onReceive,
  116. onError: onError,
  117. config: config,
  118. LocalAddress: NewNetAddress(conn.LocalAddr()),
  119. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  120. }
  121. // Create channels
  122. var channelsIdx = map[byte]*Channel{}
  123. var channels = []*Channel{}
  124. for _, desc := range chDescs {
  125. channel := newChannel(mconn, *desc)
  126. channelsIdx[channel.desc.ID] = channel
  127. channels = append(channels, channel)
  128. }
  129. mconn.channels = channels
  130. mconn.channelsIdx = channelsIdx
  131. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  132. return mconn
  133. }
  134. // OnStart implements BaseService
  135. func (c *MConnection) OnStart() error {
  136. if err := c.BaseService.OnStart(); err != nil {
  137. return err
  138. }
  139. c.quit = make(chan struct{})
  140. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
  141. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  142. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
  143. go c.sendRoutine()
  144. go c.recvRoutine()
  145. return nil
  146. }
  147. // OnStop implements BaseService
  148. func (c *MConnection) OnStop() {
  149. c.BaseService.OnStop()
  150. c.flushTimer.Stop()
  151. c.pingTimer.Stop()
  152. c.chStatsTimer.Stop()
  153. if c.quit != nil {
  154. close(c.quit)
  155. }
  156. c.conn.Close() // nolint: errcheck
  157. // We can't close pong safely here because
  158. // recvRoutine may write to it after we've stopped.
  159. // Though it doesn't need to get closed at all,
  160. // we close it @ recvRoutine.
  161. // close(c.pong)
  162. }
  163. func (c *MConnection) String() string {
  164. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  165. }
  166. func (c *MConnection) flush() {
  167. c.Logger.Debug("Flush", "conn", c)
  168. err := c.bufWriter.Flush()
  169. if err != nil {
  170. c.Logger.Error("MConnection flush failed", "err", err)
  171. }
  172. }
  173. // Catch panics, usually caused by remote disconnects.
  174. func (c *MConnection) _recover() {
  175. if r := recover(); r != nil {
  176. stack := debug.Stack()
  177. err := cmn.StackError{r, stack}
  178. c.stopForError(err)
  179. }
  180. }
  181. func (c *MConnection) stopForError(r interface{}) {
  182. c.Stop()
  183. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  184. if c.onError != nil {
  185. c.onError(r)
  186. }
  187. }
  188. }
  189. // Queues a message to be sent to channel.
  190. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  191. if !c.IsRunning() {
  192. return false
  193. }
  194. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  195. // Send message to channel.
  196. channel, ok := c.channelsIdx[chID]
  197. if !ok {
  198. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  199. return false
  200. }
  201. success := channel.sendBytes(wire.BinaryBytes(msg))
  202. if success {
  203. // Wake up sendRoutine if necessary
  204. select {
  205. case c.send <- struct{}{}:
  206. default:
  207. }
  208. } else {
  209. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  210. }
  211. return success
  212. }
  213. // Queues a message to be sent to channel.
  214. // Nonblocking, returns true if successful.
  215. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  216. if !c.IsRunning() {
  217. return false
  218. }
  219. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  220. // Send message to channel.
  221. channel, ok := c.channelsIdx[chID]
  222. if !ok {
  223. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  224. return false
  225. }
  226. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  227. if ok {
  228. // Wake up sendRoutine if necessary
  229. select {
  230. case c.send <- struct{}{}:
  231. default:
  232. }
  233. }
  234. return ok
  235. }
  236. // CanSend returns true if you can send more data onto the chID, false
  237. // otherwise. Use only as a heuristic.
  238. func (c *MConnection) CanSend(chID byte) bool {
  239. if !c.IsRunning() {
  240. return false
  241. }
  242. channel, ok := c.channelsIdx[chID]
  243. if !ok {
  244. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  245. return false
  246. }
  247. return channel.canSend()
  248. }
// sendRoutine polls for packets to send from channels.
// It is the only goroutine that writes to bufWriter. It runs until the quit
// channel is closed, the service is no longer running, or a write of a
// ping/pong octet reports an error.
func (c *MConnection) sendRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// n and err are only populated by the ping/pong writes below;
		// msgPacket write errors are handled inside sendMsgPacket.
		var n int
		var err error
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufWriter.
			c.flush()
		case <-c.chStatsTimer.Ch:
			// Periodically decay each channel's recentlySent statistic.
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.Ch:
			c.Logger.Debug("Send Ping")
			legacy.WriteOctet(packetTypePing, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			c.flush()
		case <-c.pong:
			// recvRoutine signals on c.pong when it has received a ping.
			c.Logger.Debug("Send Pong")
			legacy.WriteOctet(packetTypePong, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			c.flush()
		case <-c.quit:
			break FOR_LOOP
		case <-c.send:
			// Send some msgPackets
			eof := c.sendSomeMsgPackets()
			if !eof {
				// Keep sendRoutine awake.
				// Non-blocking: the signal channel may already hold a wake-up.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
}
  299. // Returns true if messages from channels were exhausted.
  300. // Blocks in accordance to .sendMonitor throttling.
  301. func (c *MConnection) sendSomeMsgPackets() bool {
  302. // Block until .sendMonitor says we can write.
  303. // Once we're ready we send more than we asked for,
  304. // but amortized it should even out.
  305. c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  306. // Now send some msgPackets.
  307. for i := 0; i < numBatchMsgPackets; i++ {
  308. if c.sendMsgPacket() {
  309. return true
  310. }
  311. }
  312. return false
  313. }
  314. // Returns true if messages from channels were exhausted.
  315. func (c *MConnection) sendMsgPacket() bool {
  316. // Choose a channel to create a msgPacket from.
  317. // The chosen channel will be the one whose recentlySent/priority is the least.
  318. var leastRatio float32 = math.MaxFloat32
  319. var leastChannel *Channel
  320. for _, channel := range c.channels {
  321. // If nothing to send, skip this channel
  322. if !channel.isSendPending() {
  323. continue
  324. }
  325. // Get ratio, and keep track of lowest ratio.
  326. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  327. if ratio < leastRatio {
  328. leastRatio = ratio
  329. leastChannel = channel
  330. }
  331. }
  332. // Nothing to send?
  333. if leastChannel == nil {
  334. return true
  335. } else {
  336. // c.Logger.Info("Found a msgPacket to send")
  337. }
  338. // Make & send a msgPacket from this channel
  339. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  340. if err != nil {
  341. c.Logger.Error("Failed to write msgPacket", "err", err)
  342. c.stopForError(err)
  343. return true
  344. }
  345. c.sendMonitor.Update(int(n))
  346. c.flushTimer.Set()
  347. return false
  348. }
  349. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  350. // After a whole message has been assembled, it's pushed to onReceive().
  351. // Blocks depending on how the connection is throttled.
  352. func (c *MConnection) recvRoutine() {
  353. defer c._recover()
  354. FOR_LOOP:
  355. for {
  356. // Block until .recvMonitor says we can read.
  357. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
  358. /*
  359. // Peek into bufReader for debugging
  360. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  361. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  362. bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
  363. if err == nil {
  364. return bytes
  365. } else {
  366. log.Warn("Error peeking connection buffer", "err", err)
  367. return nil
  368. }
  369. }})
  370. }
  371. */
  372. // Read packet type
  373. var n int
  374. var err error
  375. pktType := wire.ReadByte(c.bufReader, &n, &err)
  376. c.recvMonitor.Update(int(n))
  377. if err != nil {
  378. if c.IsRunning() {
  379. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  380. c.stopForError(err)
  381. }
  382. break FOR_LOOP
  383. }
  384. // Read more depending on packet type.
  385. switch pktType {
  386. case packetTypePing:
  387. // TODO: prevent abuse, as they cause flush()'s.
  388. c.Logger.Debug("Receive Ping")
  389. c.pong <- struct{}{}
  390. case packetTypePong:
  391. // do nothing
  392. c.Logger.Debug("Receive Pong")
  393. case packetTypeMsg:
  394. pkt, n, err := msgPacket{}, int(0), error(nil)
  395. wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
  396. c.recvMonitor.Update(int(n))
  397. if err != nil {
  398. if c.IsRunning() {
  399. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  400. c.stopForError(err)
  401. }
  402. break FOR_LOOP
  403. }
  404. channel, ok := c.channelsIdx[pkt.ChannelID]
  405. if !ok || channel == nil {
  406. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  407. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  408. c.stopForError(err)
  409. }
  410. msgBytes, err := channel.recvMsgPacket(pkt)
  411. if err != nil {
  412. if c.IsRunning() {
  413. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  414. c.stopForError(err)
  415. }
  416. break FOR_LOOP
  417. }
  418. if msgBytes != nil {
  419. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  420. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  421. c.onReceive(pkt.ChannelID, msgBytes)
  422. }
  423. default:
  424. err := fmt.Errorf("Unknown message type %X", pktType)
  425. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  426. c.stopForError(err)
  427. }
  428. // TODO: shouldn't this go in the sendRoutine?
  429. // Better to send a ping packet when *we* haven't sent anything for a while.
  430. c.pingTimer.Reset()
  431. }
  432. // Cleanup
  433. close(c.pong)
  434. for range c.pong {
  435. // Drain
  436. }
  437. }
// ConnectionStatus is the snapshot returned by MConnection.Status: the
// send/receive flow monitor statuses plus one entry per channel.
type ConnectionStatus struct {
	SendMonitor flow.Status
	RecvMonitor flow.Status
	Channels    []ChannelStatus
}
// ChannelStatus describes one channel inside a ConnectionStatus.
type ChannelStatus struct {
	ID                byte
	SendQueueCapacity int   // capacity of the channel's send queue
	SendQueueSize     int   // value of the channel's sendQueueSize counter
	Priority          int   // priority from the channel descriptor
	RecentlySent      int64 // the channel's decaying sent-bytes statistic
}
  450. func (c *MConnection) Status() ConnectionStatus {
  451. var status ConnectionStatus
  452. status.SendMonitor = c.sendMonitor.Status()
  453. status.RecvMonitor = c.recvMonitor.Status()
  454. status.Channels = make([]ChannelStatus, len(c.channels))
  455. for i, channel := range c.channels {
  456. status.Channels[i] = ChannelStatus{
  457. ID: channel.desc.ID,
  458. SendQueueCapacity: cap(channel.sendQueue),
  459. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  460. Priority: channel.desc.Priority,
  461. RecentlySent: channel.recentlySent,
  462. }
  463. }
  464. return status
  465. }
//-----------------------------------------------------------------------------

// ChannelDescriptor configures a single Channel of an MConnection.
// Zero-valued capacities are replaced with package defaults by FillDefaults.
type ChannelDescriptor struct {
	ID                  byte
	Priority            int // must be positive; newChannel panics otherwise
	SendQueueCapacity   int // buffer size of the channel's send queue
	RecvBufferCapacity  int // initial capacity of the receive reassembly buffer
	RecvMessageCapacity int // maximum size in bytes of a reassembled message
}
  474. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  475. if chDesc.SendQueueCapacity == 0 {
  476. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  477. }
  478. if chDesc.RecvBufferCapacity == 0 {
  479. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  480. }
  481. if chDesc.RecvMessageCapacity == 0 {
  482. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  483. }
  484. filled = chDesc
  485. return
  486. }
// Channel is one logical message stream multiplexed over an MConnection.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte // whole messages waiting to be packetized
	sendQueueSize int32       // atomic.
	recving       []byte      // bytes of the inbound message being reassembled
	sending       []byte      // unsent remainder of the outbound message being packetized
	recentlySent  int64       // exponential moving average

	maxMsgPacketPayloadSize int // copied from the connection config at creation
}
  499. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  500. desc = desc.FillDefaults()
  501. if desc.Priority <= 0 {
  502. cmn.PanicSanity("Channel default priority must be a positive integer")
  503. }
  504. return &Channel{
  505. conn: conn,
  506. desc: desc,
  507. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  508. recving: make([]byte, 0, desc.RecvBufferCapacity),
  509. maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize,
  510. }
  511. }
  512. // Queues message to send to this channel.
  513. // Goroutine-safe
  514. // Times out (and returns false) after defaultSendTimeout
  515. func (ch *Channel) sendBytes(bytes []byte) bool {
  516. select {
  517. case ch.sendQueue <- bytes:
  518. atomic.AddInt32(&ch.sendQueueSize, 1)
  519. return true
  520. case <-time.After(defaultSendTimeout):
  521. return false
  522. }
  523. }
  524. // Queues message to send to this channel.
  525. // Nonblocking, returns true if successful.
  526. // Goroutine-safe
  527. func (ch *Channel) trySendBytes(bytes []byte) bool {
  528. select {
  529. case ch.sendQueue <- bytes:
  530. atomic.AddInt32(&ch.sendQueueSize, 1)
  531. return true
  532. default:
  533. return false
  534. }
  535. }
  536. // Goroutine-safe
  537. func (ch *Channel) loadSendQueueSize() (size int) {
  538. return int(atomic.LoadInt32(&ch.sendQueueSize))
  539. }
  540. // Goroutine-safe
  541. // Use only as a heuristic.
  542. func (ch *Channel) canSend() bool {
  543. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  544. }
  545. // Returns true if any msgPackets are pending to be sent.
  546. // Call before calling nextMsgPacket()
  547. // Goroutine-safe
  548. func (ch *Channel) isSendPending() bool {
  549. if len(ch.sending) == 0 {
  550. if len(ch.sendQueue) == 0 {
  551. return false
  552. }
  553. ch.sending = <-ch.sendQueue
  554. }
  555. return true
  556. }
  557. // Creates a new msgPacket to send.
  558. // Not goroutine-safe
  559. func (ch *Channel) nextMsgPacket() msgPacket {
  560. packet := msgPacket{}
  561. packet.ChannelID = byte(ch.desc.ID)
  562. maxSize := ch.maxMsgPacketPayloadSize
  563. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  564. if len(ch.sending) <= maxSize {
  565. packet.EOF = byte(0x01)
  566. ch.sending = nil
  567. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  568. } else {
  569. packet.EOF = byte(0x00)
  570. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  571. }
  572. return packet
  573. }
  574. // Writes next msgPacket to w.
  575. // Not goroutine-safe
  576. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  577. packet := ch.nextMsgPacket()
  578. // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  579. writeMsgPacketTo(packet, w, &n, &err)
  580. if err == nil {
  581. ch.recentlySent += int64(n)
  582. }
  583. return
  584. }
// writeMsgPacketTo writes packet to w: first the packetTypeMsg octet, then the
// go-wire binary encoding of the packet. The byte count and any error are
// reported through n and err.
func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
	legacy.WriteOctet(packetTypeMsg, w, n, err)
	wire.WriteBinary(packet, w, n, err)
}
  589. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  590. // Not goroutine-safe
  591. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  592. // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  593. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  594. return nil, wire.ErrBinaryReadOverflow
  595. }
  596. ch.recving = append(ch.recving, packet.Bytes...)
  597. if packet.EOF == byte(0x01) {
  598. msgBytes := ch.recving
  599. // clear the slice without re-allocating.
  600. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  601. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  602. // at which point the recving slice stops being used and should be garbage collected
  603. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  604. return msgBytes, nil
  605. }
  606. return nil, nil
  607. }
  608. // Call this periodically to update stats for throttling purposes.
  609. // Not goroutine-safe
  610. func (ch *Channel) updateStats() {
  611. // Exponential decay of stats.
  612. // TODO: optimize.
  613. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  614. }
//-----------------------------------------------------------------------------

const (
	// defaultMaxMsgPacketPayloadSize is the default payload limit of one msgPacket.
	defaultMaxMsgPacketPayloadSize = 1024

	maxMsgPacketOverheadSize = 10 // It's actually lower but good enough

	// Packet type octets written ahead of every packet on the wire.
	packetTypePing = byte(0x01)
	packetTypePong = byte(0x02)
	packetTypeMsg  = byte(0x03)
)
// Messages in channels are chopped into smaller msgPackets for multiplexing.
type msgPacket struct {
	ChannelID byte   // id of the channel the packet belongs to
	EOF       byte   // 1 means message ends here.
	Bytes     []byte // this packet's slice of the message payload
}
  629. func (p msgPacket) String() string {
  630. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  631. }