You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

730 lines
20 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. tmlegacy "github.com/tendermint/go-wire/nowriter/tmlegacy"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. flow "github.com/tendermint/tmlibs/flowrate"
  15. "github.com/tendermint/tmlibs/log"
  16. )
// legacy is the go-wire legacy encoder; this file uses its WriteOctet method
// to write the one-byte packet type that precedes every packet.
var legacy = tmlegacy.TMEncoderLegacy{}
const (
	// numBatchMsgPackets is the maximum number of msgPackets written by a
	// single sendSomeMsgPackets call.
	numBatchMsgPackets = 10
	// minReadBufferSize and minWriteBufferSize are the sizes given to the
	// bufio reader and writer that wrap the connection.
	minReadBufferSize  = 1024
	minWriteBufferSize = 65536
	// updateStats is the interval of the channel-stats repeat timer.
	updateStats = 2 * time.Second
	// pingTimeout is the interval of the ping repeat timer.
	pingTimeout = 40 * time.Second

	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond

	// Values applied by ChannelDescriptor.FillDefaults when a field is zero.
	defaultSendQueueCapacity   = 1
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096 // 21MB

	defaultSendRate = int64(512000) // 500KB/s
	defaultRecvRate = int64(512000) // 500KB/s
	// defaultSendTimeout is how long Channel.sendBytes waits for queue space
	// before giving up.
	defaultSendTimeout = 10 * time.Second
)
// receiveCbFunc is invoked by recvRoutine with the channel ID and the bytes of
// a fully reassembled message.
type receiveCbFunc func(chID byte, msgBytes []byte)

// errorCbFunc is invoked by stopForError with the value that caused the
// connection to stop; the errored flag ensures it is called at most once.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (c *MConnection) Send(chID byte, msg interface{}) bool {}
	func (c *MConnection) TrySend(chID byte, msg interface{}) bool {}

`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chID`, or until the request times out.
The message `msg` is serialized using the `tendermint/go-wire` package's
`BinaryBytes()` reflection routine.

`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService

	conn        net.Conn      // underlying network connection
	bufReader   *bufio.Reader // buffered reader over conn, used by recvRoutine
	bufWriter   *bufio.Writer // buffered writer over conn, used by sendRoutine
	sendMonitor *flow.Monitor // tracks and rate-limits bytes written
	recvMonitor *flow.Monitor // tracks and rate-limits bytes read
	send        chan struct{} // wakes sendRoutine when a channel has data queued
	pong        chan struct{} // recvRoutine asks sendRoutine to write a pong
	channels    []*Channel    // all channels, in descriptor order
	channelsIdx map[byte]*Channel // channels keyed by channel ID
	onReceive   receiveCbFunc // called with each complete inbound message
	onError     errorCbFunc   // called once when the connection stops for an error
	errored     uint32        // set to 1 (via compare-and-swap) once onError is claimed
	config      *MConnConfig

	quit         chan struct{}      // closed by OnStop to end sendRoutine
	flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer    *cmn.RepeatTimer   // send pings periodically
	chStatsTimer *cmn.RepeatTimer   // update channel stats periodically

	created time.Time // time of creation; Status reports time.Since(created)
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate and RecvRate are the rates handed to the send and receive flow
	// monitors' Limit calls; both are read with atomic.LoadInt64.
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// maxMsgPacketPayloadSize caps the payload bytes carried by one msgPacket.
	maxMsgPacketPayloadSize int

	// flushThrottle is the duration given to the flush throttle timer.
	flushThrottle time.Duration
}
  84. func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
  85. return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  86. }
  87. // DefaultMConnConfig returns the default config.
  88. func DefaultMConnConfig() *MConnConfig {
  89. return &MConnConfig{
  90. SendRate: defaultSendRate,
  91. RecvRate: defaultRecvRate,
  92. maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
  93. flushThrottle: defaultFlushThrottle,
  94. }
  95. }
  96. // NewMConnection wraps net.Conn and creates multiplex connection
  97. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  98. return NewMConnectionWithConfig(
  99. conn,
  100. chDescs,
  101. onReceive,
  102. onError,
  103. DefaultMConnConfig())
  104. }
  105. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  106. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  107. mconn := &MConnection{
  108. conn: conn,
  109. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  110. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  111. sendMonitor: flow.New(0, 0),
  112. recvMonitor: flow.New(0, 0),
  113. send: make(chan struct{}, 1),
  114. pong: make(chan struct{}),
  115. onReceive: onReceive,
  116. onError: onError,
  117. config: config,
  118. }
  119. // Create channels
  120. var channelsIdx = map[byte]*Channel{}
  121. var channels = []*Channel{}
  122. for _, desc := range chDescs {
  123. channel := newChannel(mconn, *desc)
  124. channelsIdx[channel.desc.ID] = channel
  125. channels = append(channels, channel)
  126. }
  127. mconn.channels = channels
  128. mconn.channelsIdx = channelsIdx
  129. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  130. return mconn
  131. }
  132. func (c *MConnection) SetLogger(l log.Logger) {
  133. c.BaseService.SetLogger(l)
  134. for _, ch := range c.channels {
  135. ch.SetLogger(l)
  136. }
  137. }
  138. // OnStart implements BaseService
  139. func (c *MConnection) OnStart() error {
  140. if err := c.BaseService.OnStart(); err != nil {
  141. return err
  142. }
  143. c.quit = make(chan struct{})
  144. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
  145. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  146. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  147. go c.sendRoutine()
  148. go c.recvRoutine()
  149. return nil
  150. }
  151. // OnStop implements BaseService
  152. func (c *MConnection) OnStop() {
  153. c.BaseService.OnStop()
  154. c.flushTimer.Stop()
  155. c.pingTimer.Stop()
  156. c.chStatsTimer.Stop()
  157. if c.quit != nil {
  158. close(c.quit)
  159. }
  160. c.conn.Close() // nolint: errcheck
  161. // We can't close pong safely here because
  162. // recvRoutine may write to it after we've stopped.
  163. // Though it doesn't need to get closed at all,
  164. // we close it @ recvRoutine.
  165. // close(c.pong)
  166. }
  167. func (c *MConnection) String() string {
  168. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  169. }
  170. func (c *MConnection) flush() {
  171. c.Logger.Debug("Flush", "conn", c)
  172. err := c.bufWriter.Flush()
  173. if err != nil {
  174. c.Logger.Error("MConnection flush failed", "err", err)
  175. }
  176. }
  177. // Catch panics, usually caused by remote disconnects.
  178. func (c *MConnection) _recover() {
  179. if r := recover(); r != nil {
  180. stack := debug.Stack()
  181. err := cmn.StackError{r, stack}
  182. c.stopForError(err)
  183. }
  184. }
  185. func (c *MConnection) stopForError(r interface{}) {
  186. c.Stop()
  187. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  188. if c.onError != nil {
  189. c.onError(r)
  190. }
  191. }
  192. }
  193. // Queues a message to be sent to channel.
  194. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  195. if !c.IsRunning() {
  196. return false
  197. }
  198. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  199. // Send message to channel.
  200. channel, ok := c.channelsIdx[chID]
  201. if !ok {
  202. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  203. return false
  204. }
  205. success := channel.sendBytes(wire.BinaryBytes(msg))
  206. if success {
  207. // Wake up sendRoutine if necessary
  208. select {
  209. case c.send <- struct{}{}:
  210. default:
  211. }
  212. } else {
  213. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  214. }
  215. return success
  216. }
  217. // Queues a message to be sent to channel.
  218. // Nonblocking, returns true if successful.
  219. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  220. if !c.IsRunning() {
  221. return false
  222. }
  223. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  224. // Send message to channel.
  225. channel, ok := c.channelsIdx[chID]
  226. if !ok {
  227. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  228. return false
  229. }
  230. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  231. if ok {
  232. // Wake up sendRoutine if necessary
  233. select {
  234. case c.send <- struct{}{}:
  235. default:
  236. }
  237. }
  238. return ok
  239. }
  240. // CanSend returns true if you can send more data onto the chID, false
  241. // otherwise. Use only as a heuristic.
  242. func (c *MConnection) CanSend(chID byte) bool {
  243. if !c.IsRunning() {
  244. return false
  245. }
  246. channel, ok := c.channelsIdx[chID]
  247. if !ok {
  248. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  249. return false
  250. }
  251. return channel.canSend()
  252. }
// sendRoutine polls for packets to send from channels.
//
// It is started as a goroutine by OnStart and services, in one select loop:
// throttled flushes, periodic channel-stat decay, periodic pings, pong
// requests from recvRoutine, the quit signal, and wake-ups on c.send.
// The loop ends when quit is closed, when the service is no longer running,
// or when writing a ping/pong records an error (in which case the connection
// is stopped via stopForError). Panics are caught by the deferred _recover.
func (c *MConnection) sendRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// n and err are reset every iteration; only the ping and pong cases
		// below write to them.
		var n int
		var err error
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufWriter.
			c.flush()
		case <-c.chStatsTimer.Chan():
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.Chan():
			c.Logger.Debug("Send Ping")
			legacy.WriteOctet(packetTypePing, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			// Pings are flushed immediately rather than via the throttle timer.
			c.flush()
		case <-c.pong:
			c.Logger.Debug("Send Pong")
			legacy.WriteOctet(packetTypePong, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			c.flush()
		case <-c.quit:
			break FOR_LOOP
		case <-c.send:
			// Send some msgPackets
			eof := c.sendSomeMsgPackets()
			if !eof {
				// Keep sendRoutine awake.
				// (non-blocking: c.send is buffered, a pending signal is enough)
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		// The connection may have been stopped while the case above ran.
		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
}
  303. // Returns true if messages from channels were exhausted.
  304. // Blocks in accordance to .sendMonitor throttling.
  305. func (c *MConnection) sendSomeMsgPackets() bool {
  306. // Block until .sendMonitor says we can write.
  307. // Once we're ready we send more than we asked for,
  308. // but amortized it should even out.
  309. c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  310. // Now send some msgPackets.
  311. for i := 0; i < numBatchMsgPackets; i++ {
  312. if c.sendMsgPacket() {
  313. return true
  314. }
  315. }
  316. return false
  317. }
  318. // Returns true if messages from channels were exhausted.
  319. func (c *MConnection) sendMsgPacket() bool {
  320. // Choose a channel to create a msgPacket from.
  321. // The chosen channel will be the one whose recentlySent/priority is the least.
  322. var leastRatio float32 = math.MaxFloat32
  323. var leastChannel *Channel
  324. for _, channel := range c.channels {
  325. // If nothing to send, skip this channel
  326. if !channel.isSendPending() {
  327. continue
  328. }
  329. // Get ratio, and keep track of lowest ratio.
  330. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  331. if ratio < leastRatio {
  332. leastRatio = ratio
  333. leastChannel = channel
  334. }
  335. }
  336. // Nothing to send?
  337. if leastChannel == nil {
  338. return true
  339. } else {
  340. // c.Logger.Info("Found a msgPacket to send")
  341. }
  342. // Make & send a msgPacket from this channel
  343. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  344. if err != nil {
  345. c.Logger.Error("Failed to write msgPacket", "err", err)
  346. c.stopForError(err)
  347. return true
  348. }
  349. c.sendMonitor.Update(int(n))
  350. c.flushTimer.Set()
  351. return false
  352. }
  353. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  354. // After a whole message has been assembled, it's pushed to onReceive().
  355. // Blocks depending on how the connection is throttled.
  356. func (c *MConnection) recvRoutine() {
  357. defer c._recover()
  358. FOR_LOOP:
  359. for {
  360. // Block until .recvMonitor says we can read.
  361. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
  362. /*
  363. // Peek into bufReader for debugging
  364. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  365. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  366. bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
  367. if err == nil {
  368. return bytes
  369. } else {
  370. log.Warn("Error peeking connection buffer", "err", err)
  371. return nil
  372. }
  373. }})
  374. }
  375. */
  376. // Read packet type
  377. var n int
  378. var err error
  379. pktType := wire.ReadByte(c.bufReader, &n, &err)
  380. c.recvMonitor.Update(int(n))
  381. if err != nil {
  382. if c.IsRunning() {
  383. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  384. c.stopForError(err)
  385. }
  386. break FOR_LOOP
  387. }
  388. // Read more depending on packet type.
  389. switch pktType {
  390. case packetTypePing:
  391. // TODO: prevent abuse, as they cause flush()'s.
  392. c.Logger.Debug("Receive Ping")
  393. c.pong <- struct{}{}
  394. case packetTypePong:
  395. // do nothing
  396. c.Logger.Debug("Receive Pong")
  397. case packetTypeMsg:
  398. pkt, n, err := msgPacket{}, int(0), error(nil)
  399. wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
  400. c.recvMonitor.Update(int(n))
  401. if err != nil {
  402. if c.IsRunning() {
  403. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  404. c.stopForError(err)
  405. }
  406. break FOR_LOOP
  407. }
  408. channel, ok := c.channelsIdx[pkt.ChannelID]
  409. if !ok || channel == nil {
  410. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  411. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  412. c.stopForError(err)
  413. }
  414. msgBytes, err := channel.recvMsgPacket(pkt)
  415. if err != nil {
  416. if c.IsRunning() {
  417. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  418. c.stopForError(err)
  419. }
  420. break FOR_LOOP
  421. }
  422. if msgBytes != nil {
  423. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  424. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  425. c.onReceive(pkt.ChannelID, msgBytes)
  426. }
  427. default:
  428. err := fmt.Errorf("Unknown message type %X", pktType)
  429. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  430. c.stopForError(err)
  431. }
  432. // TODO: shouldn't this go in the sendRoutine?
  433. // Better to send a ping packet when *we* haven't sent anything for a while.
  434. c.pingTimer.Reset()
  435. }
  436. // Cleanup
  437. close(c.pong)
  438. for range c.pong {
  439. // Drain
  440. }
  441. }
// ConnectionStatus is the snapshot returned by MConnection.Status.
type ConnectionStatus struct {
	Duration    time.Duration   // time elapsed since the connection's created timestamp
	SendMonitor flow.Status     // status of the send flow monitor
	RecvMonitor flow.Status     // status of the receive flow monitor
	Channels    []ChannelStatus // one entry per channel, in channel order
}
// ChannelStatus is the per-channel part of a ConnectionStatus.
type ChannelStatus struct {
	ID                byte  // channel ID from the descriptor
	SendQueueCapacity int   // capacity of the channel's send queue
	SendQueueSize     int   // value of the channel's sendQueueSize counter
	Priority          int   // priority from the descriptor
	RecentlySent      int64 // the channel's recentlySent counter
}
  455. func (c *MConnection) Status() ConnectionStatus {
  456. var status ConnectionStatus
  457. status.Duration = time.Since(c.created)
  458. status.SendMonitor = c.sendMonitor.Status()
  459. status.RecvMonitor = c.recvMonitor.Status()
  460. status.Channels = make([]ChannelStatus, len(c.channels))
  461. for i, channel := range c.channels {
  462. status.Channels[i] = ChannelStatus{
  463. ID: channel.desc.ID,
  464. SendQueueCapacity: cap(channel.sendQueue),
  465. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  466. Priority: channel.desc.Priority,
  467. RecentlySent: channel.recentlySent,
  468. }
  469. }
  470. return status
  471. }
//-----------------------------------------------------------------------------

// ChannelDescriptor configures one Channel of an MConnection.
type ChannelDescriptor struct {
	ID       byte // channel ID, used as the key in the connection's channel index
	Priority int  // must be positive (newChannel panics otherwise); divides recentlySent when choosing the next channel to send from

	// The capacities below are replaced by package defaults in FillDefaults
	// when left at zero.
	SendQueueCapacity   int // buffer size of the channel's send queue
	RecvBufferCapacity  int // initial capacity of the receive buffer
	RecvMessageCapacity int // maximum number of bytes of one reassembled message
}
  480. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  481. if chDesc.SendQueueCapacity == 0 {
  482. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  483. }
  484. if chDesc.RecvBufferCapacity == 0 {
  485. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  486. }
  487. if chDesc.RecvMessageCapacity == 0 {
  488. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  489. }
  490. filled = chDesc
  491. return
  492. }
// Channel is one logical message stream multiplexed over an MConnection.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection      // owning connection
	desc          ChannelDescriptor // descriptor with defaults filled in
	sendQueue     chan []byte       // whole messages waiting to be sent
	sendQueueSize int32             // atomic.
	recving       []byte            // payload bytes of the inbound message being reassembled
	sending       []byte            // unsent remainder of the outbound message being packetized
	recentlySent  int64             // exponential moving average

	maxMsgPacketPayloadSize int // payload limit per msgPacket, copied from the connection config

	Logger log.Logger
}
  506. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  507. desc = desc.FillDefaults()
  508. if desc.Priority <= 0 {
  509. cmn.PanicSanity("Channel default priority must be a positive integer")
  510. }
  511. return &Channel{
  512. conn: conn,
  513. desc: desc,
  514. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  515. recving: make([]byte, 0, desc.RecvBufferCapacity),
  516. maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize,
  517. }
  518. }
// SetLogger sets the logger this channel uses for its packet debug output.
func (ch *Channel) SetLogger(l log.Logger) {
	ch.Logger = l
}
  522. // Queues message to send to this channel.
  523. // Goroutine-safe
  524. // Times out (and returns false) after defaultSendTimeout
  525. func (ch *Channel) sendBytes(bytes []byte) bool {
  526. select {
  527. case ch.sendQueue <- bytes:
  528. atomic.AddInt32(&ch.sendQueueSize, 1)
  529. return true
  530. case <-time.After(defaultSendTimeout):
  531. return false
  532. }
  533. }
  534. // Queues message to send to this channel.
  535. // Nonblocking, returns true if successful.
  536. // Goroutine-safe
  537. func (ch *Channel) trySendBytes(bytes []byte) bool {
  538. select {
  539. case ch.sendQueue <- bytes:
  540. atomic.AddInt32(&ch.sendQueueSize, 1)
  541. return true
  542. default:
  543. return false
  544. }
  545. }
  546. // Goroutine-safe
  547. func (ch *Channel) loadSendQueueSize() (size int) {
  548. return int(atomic.LoadInt32(&ch.sendQueueSize))
  549. }
  550. // Goroutine-safe
  551. // Use only as a heuristic.
  552. func (ch *Channel) canSend() bool {
  553. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  554. }
  555. // Returns true if any msgPackets are pending to be sent.
  556. // Call before calling nextMsgPacket()
  557. // Goroutine-safe
  558. func (ch *Channel) isSendPending() bool {
  559. if len(ch.sending) == 0 {
  560. if len(ch.sendQueue) == 0 {
  561. return false
  562. }
  563. ch.sending = <-ch.sendQueue
  564. }
  565. return true
  566. }
  567. // Creates a new msgPacket to send.
  568. // Not goroutine-safe
  569. func (ch *Channel) nextMsgPacket() msgPacket {
  570. packet := msgPacket{}
  571. packet.ChannelID = byte(ch.desc.ID)
  572. maxSize := ch.maxMsgPacketPayloadSize
  573. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  574. if len(ch.sending) <= maxSize {
  575. packet.EOF = byte(0x01)
  576. ch.sending = nil
  577. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  578. } else {
  579. packet.EOF = byte(0x00)
  580. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  581. }
  582. return packet
  583. }
  584. // Writes next msgPacket to w.
  585. // Not goroutine-safe
  586. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  587. packet := ch.nextMsgPacket()
  588. ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  589. writeMsgPacketTo(packet, w, &n, &err)
  590. if err == nil {
  591. ch.recentlySent += int64(n)
  592. }
  593. return
  594. }
// writeMsgPacketTo writes the packetTypeMsg type byte followed by the
// wire-encoded packet to w. The byte count and any error are reported through
// the n and err pointers, which are passed on to both write calls.
func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
	legacy.WriteOctet(packetTypeMsg, w, n, err)
	wire.WriteBinary(packet, w, n, err)
}
  599. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  600. // Not goroutine-safe
  601. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  602. ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  603. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  604. return nil, wire.ErrBinaryReadOverflow
  605. }
  606. ch.recving = append(ch.recving, packet.Bytes...)
  607. if packet.EOF == byte(0x01) {
  608. msgBytes := ch.recving
  609. // clear the slice without re-allocating.
  610. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  611. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  612. // at which point the recving slice stops being used and should be garbage collected
  613. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  614. return msgBytes, nil
  615. }
  616. return nil, nil
  617. }
  618. // Call this periodically to update stats for throttling purposes.
  619. // Not goroutine-safe
  620. func (ch *Channel) updateStats() {
  621. // Exponential decay of stats.
  622. // TODO: optimize.
  623. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  624. }
//-----------------------------------------------------------------------------

const (
	// defaultMaxMsgPacketPayloadSize is the default payload limit of one
	// msgPacket, used by DefaultMConnConfig.
	defaultMaxMsgPacketPayloadSize = 1024

	// maxMsgPacketOverheadSize is added to the payload limit to obtain the
	// total packet size budget.
	maxMsgPacketOverheadSize = 10 // It's actually lower but good enough

	// Packet type bytes written before every packet.
	packetTypePing = byte(0x01)
	packetTypePong = byte(0x02)
	packetTypeMsg  = byte(0x03)
)
  633. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  634. type msgPacket struct {
  635. ChannelID byte
  636. EOF byte // 1 means message ends here.
  637. Bytes []byte
  638. }
  639. func (p msgPacket) String() string {
  640. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  641. }