You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

716 lines
19 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. flow "github.com/tendermint/tmlibs/flowrate"
  14. )
  15. const (
  16. numBatchMsgPackets = 10
  17. minReadBufferSize = 1024
  18. minWriteBufferSize = 65536
  19. updateState = 2 * time.Second
  20. pingTimeout = 40 * time.Second
  21. // some of these defaults are written in the user config
  22. // flushThrottle, sendRate, recvRate
  23. // TODO: remove values present in config
  24. defaultFlushThrottle = 100 * time.Millisecond
  25. defaultSendQueueCapacity = 1
  26. defaultRecvBufferCapacity = 4096
  27. defaultRecvMessageCapacity = 22020096 // 21MB
  28. defaultSendRate = int64(512000) // 500KB/s
  29. defaultRecvRate = int64(512000) // 500KB/s
  30. defaultSendTimeout = 10 * time.Second
  31. )
  32. type receiveCbFunc func(chID byte, msgBytes []byte)
  33. type errorCbFunc func(interface{})
  34. /*
  35. Each peer has one `MConnection` (multiplex connection) instance.
  36. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  37. several messages along a single channel of communication.
  38. Each `MConnection` handles message transmission on multiple abstract communication
  39. `Channel`s. Each channel has a globally unique byte id.
  40. The byte id and the relative priorities of each `Channel` are configured upon
  41. initialization of the connection.
  42. There are two methods for sending messages:
  43. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  44. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  45. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  46. for the channel with the given id byte `chID`, or until the request times out.
  47. The message `msg` is serialized using the `tendermint/wire` submodule's
  48. `WriteBinary()` reflection routine.
  49. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  50. queue is full.
  51. Inbound message bytes are handled with an onReceive callback function.
  52. */
  53. type MConnection struct {
  54. cmn.BaseService
  55. conn net.Conn
  56. bufReader *bufio.Reader
  57. bufWriter *bufio.Writer
  58. sendMonitor *flow.Monitor
  59. recvMonitor *flow.Monitor
  60. send chan struct{}
  61. pong chan struct{}
  62. channels []*Channel
  63. channelsIdx map[byte]*Channel
  64. onReceive receiveCbFunc
  65. onError errorCbFunc
  66. errored uint32
  67. config *MConnConfig
  68. quit chan struct{}
  69. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  70. pingTimer *cmn.RepeatTimer // send pings periodically
  71. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  72. LocalAddress *NetAddress
  73. RemoteAddress *NetAddress
  74. }
  75. // MConnConfig is a MConnection configuration.
  76. type MConnConfig struct {
  77. SendRate int64 `mapstructure:"send_rate"`
  78. RecvRate int64 `mapstructure:"recv_rate"`
  79. maxMsgPacketPayloadSize int
  80. flushThrottle time.Duration
  81. }
  82. func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
  83. return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  84. }
  85. // DefaultMConnConfig returns the default config.
  86. func DefaultMConnConfig() *MConnConfig {
  87. return &MConnConfig{
  88. SendRate: defaultSendRate,
  89. RecvRate: defaultRecvRate,
  90. maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
  91. flushThrottle: defaultFlushThrottle,
  92. }
  93. }
  94. // NewMConnection wraps net.Conn and creates multiplex connection
  95. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  96. return NewMConnectionWithConfig(
  97. conn,
  98. chDescs,
  99. onReceive,
  100. onError,
  101. DefaultMConnConfig())
  102. }
  103. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  104. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  105. mconn := &MConnection{
  106. conn: conn,
  107. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  108. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  109. sendMonitor: flow.New(0, 0),
  110. recvMonitor: flow.New(0, 0),
  111. send: make(chan struct{}, 1),
  112. pong: make(chan struct{}),
  113. onReceive: onReceive,
  114. onError: onError,
  115. config: config,
  116. LocalAddress: NewNetAddress(conn.LocalAddr()),
  117. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  118. }
  119. // Create channels
  120. var channelsIdx = map[byte]*Channel{}
  121. var channels = []*Channel{}
  122. for _, desc := range chDescs {
  123. descCopy := *desc // copy the desc else unsafe access across connections
  124. channel := newChannel(mconn, &descCopy)
  125. channelsIdx[channel.id] = channel
  126. channels = append(channels, channel)
  127. }
  128. mconn.channels = channels
  129. mconn.channelsIdx = channelsIdx
  130. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  131. return mconn
  132. }
  133. // OnStart implements BaseService
  134. func (c *MConnection) OnStart() error {
  135. c.BaseService.OnStart()
  136. c.quit = make(chan struct{})
  137. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
  138. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  139. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
  140. go c.sendRoutine()
  141. go c.recvRoutine()
  142. return nil
  143. }
  144. // OnStop implements BaseService
  145. func (c *MConnection) OnStop() {
  146. c.BaseService.OnStop()
  147. c.flushTimer.Stop()
  148. c.pingTimer.Stop()
  149. c.chStatsTimer.Stop()
  150. if c.quit != nil {
  151. close(c.quit)
  152. }
  153. c.conn.Close()
  154. // We can't close pong safely here because
  155. // recvRoutine may write to it after we've stopped.
  156. // Though it doesn't need to get closed at all,
  157. // we close it @ recvRoutine.
  158. // close(c.pong)
  159. }
  160. func (c *MConnection) String() string {
  161. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  162. }
  163. func (c *MConnection) flush() {
  164. c.Logger.Debug("Flush", "conn", c)
  165. err := c.bufWriter.Flush()
  166. if err != nil {
  167. c.Logger.Error("MConnection flush failed", "err", err)
  168. }
  169. }
  170. // Catch panics, usually caused by remote disconnects.
  171. func (c *MConnection) _recover() {
  172. if r := recover(); r != nil {
  173. stack := debug.Stack()
  174. err := cmn.StackError{r, stack}
  175. c.stopForError(err)
  176. }
  177. }
  178. func (c *MConnection) stopForError(r interface{}) {
  179. c.Stop()
  180. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  181. if c.onError != nil {
  182. c.onError(r)
  183. }
  184. }
  185. }
  186. // Queues a message to be sent to channel.
  187. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  188. if !c.IsRunning() {
  189. return false
  190. }
  191. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  192. // Send message to channel.
  193. channel, ok := c.channelsIdx[chID]
  194. if !ok {
  195. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  196. return false
  197. }
  198. success := channel.sendBytes(wire.BinaryBytes(msg))
  199. if success {
  200. // Wake up sendRoutine if necessary
  201. select {
  202. case c.send <- struct{}{}:
  203. default:
  204. }
  205. } else {
  206. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  207. }
  208. return success
  209. }
  210. // Queues a message to be sent to channel.
  211. // Nonblocking, returns true if successful.
  212. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  213. if !c.IsRunning() {
  214. return false
  215. }
  216. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  217. // Send message to channel.
  218. channel, ok := c.channelsIdx[chID]
  219. if !ok {
  220. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  221. return false
  222. }
  223. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  224. if ok {
  225. // Wake up sendRoutine if necessary
  226. select {
  227. case c.send <- struct{}{}:
  228. default:
  229. }
  230. }
  231. return ok
  232. }
  233. // CanSend returns true if you can send more data onto the chID, false
  234. // otherwise. Use only as a heuristic.
  235. func (c *MConnection) CanSend(chID byte) bool {
  236. if !c.IsRunning() {
  237. return false
  238. }
  239. channel, ok := c.channelsIdx[chID]
  240. if !ok {
  241. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  242. return false
  243. }
  244. return channel.canSend()
  245. }
  246. // sendRoutine polls for packets to send from channels.
  247. func (c *MConnection) sendRoutine() {
  248. defer c._recover()
  249. FOR_LOOP:
  250. for {
  251. var n int
  252. var err error
  253. select {
  254. case <-c.flushTimer.Ch:
  255. // NOTE: flushTimer.Set() must be called every time
  256. // something is written to .bufWriter.
  257. c.flush()
  258. case <-c.chStatsTimer.Ch:
  259. for _, channel := range c.channels {
  260. channel.updateStats()
  261. }
  262. case <-c.pingTimer.Ch:
  263. c.Logger.Debug("Send Ping")
  264. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  265. c.sendMonitor.Update(int(n))
  266. c.flush()
  267. case <-c.pong:
  268. c.Logger.Debug("Send Pong")
  269. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  270. c.sendMonitor.Update(int(n))
  271. c.flush()
  272. case <-c.quit:
  273. break FOR_LOOP
  274. case <-c.send:
  275. // Send some msgPackets
  276. eof := c.sendSomeMsgPackets()
  277. if !eof {
  278. // Keep sendRoutine awake.
  279. select {
  280. case c.send <- struct{}{}:
  281. default:
  282. }
  283. }
  284. }
  285. if !c.IsRunning() {
  286. break FOR_LOOP
  287. }
  288. if err != nil {
  289. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  290. c.stopForError(err)
  291. break FOR_LOOP
  292. }
  293. }
  294. // Cleanup
  295. }
  296. // Returns true if messages from channels were exhausted.
  297. // Blocks in accordance to .sendMonitor throttling.
  298. func (c *MConnection) sendSomeMsgPackets() bool {
  299. // Block until .sendMonitor says we can write.
  300. // Once we're ready we send more than we asked for,
  301. // but amortized it should even out.
  302. c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  303. // Now send some msgPackets.
  304. for i := 0; i < numBatchMsgPackets; i++ {
  305. if c.sendMsgPacket() {
  306. return true
  307. }
  308. }
  309. return false
  310. }
  311. // Returns true if messages from channels were exhausted.
  312. func (c *MConnection) sendMsgPacket() bool {
  313. // Choose a channel to create a msgPacket from.
  314. // The chosen channel will be the one whose recentlySent/priority is the least.
  315. var leastRatio float32 = math.MaxFloat32
  316. var leastChannel *Channel
  317. for _, channel := range c.channels {
  318. // If nothing to send, skip this channel
  319. if !channel.isSendPending() {
  320. continue
  321. }
  322. // Get ratio, and keep track of lowest ratio.
  323. ratio := float32(channel.recentlySent) / float32(channel.priority)
  324. if ratio < leastRatio {
  325. leastRatio = ratio
  326. leastChannel = channel
  327. }
  328. }
  329. // Nothing to send?
  330. if leastChannel == nil {
  331. return true
  332. } else {
  333. // c.Logger.Info("Found a msgPacket to send")
  334. }
  335. // Make & send a msgPacket from this channel
  336. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  337. if err != nil {
  338. c.Logger.Error("Failed to write msgPacket", "err", err)
  339. c.stopForError(err)
  340. return true
  341. }
  342. c.sendMonitor.Update(int(n))
  343. c.flushTimer.Set()
  344. return false
  345. }
  346. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  347. // After a whole message has been assembled, it's pushed to onReceive().
  348. // Blocks depending on how the connection is throttled.
  349. func (c *MConnection) recvRoutine() {
  350. defer c._recover()
  351. FOR_LOOP:
  352. for {
  353. // Block until .recvMonitor says we can read.
  354. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
  355. /*
  356. // Peek into bufReader for debugging
  357. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  358. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  359. bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
  360. if err == nil {
  361. return bytes
  362. } else {
  363. log.Warn("Error peeking connection buffer", "err", err)
  364. return nil
  365. }
  366. }})
  367. }
  368. */
  369. // Read packet type
  370. var n int
  371. var err error
  372. pktType := wire.ReadByte(c.bufReader, &n, &err)
  373. c.recvMonitor.Update(int(n))
  374. if err != nil {
  375. if c.IsRunning() {
  376. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  377. c.stopForError(err)
  378. }
  379. break FOR_LOOP
  380. }
  381. // Read more depending on packet type.
  382. switch pktType {
  383. case packetTypePing:
  384. // TODO: prevent abuse, as they cause flush()'s.
  385. c.Logger.Debug("Receive Ping")
  386. c.pong <- struct{}{}
  387. case packetTypePong:
  388. // do nothing
  389. c.Logger.Debug("Receive Pong")
  390. case packetTypeMsg:
  391. pkt, n, err := msgPacket{}, int(0), error(nil)
  392. wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
  393. c.recvMonitor.Update(int(n))
  394. if err != nil {
  395. if c.IsRunning() {
  396. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  397. c.stopForError(err)
  398. }
  399. break FOR_LOOP
  400. }
  401. channel, ok := c.channelsIdx[pkt.ChannelID]
  402. if !ok || channel == nil {
  403. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  404. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  405. c.stopForError(err)
  406. }
  407. msgBytes, err := channel.recvMsgPacket(pkt)
  408. if err != nil {
  409. if c.IsRunning() {
  410. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  411. c.stopForError(err)
  412. }
  413. break FOR_LOOP
  414. }
  415. if msgBytes != nil {
  416. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  417. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  418. c.onReceive(pkt.ChannelID, msgBytes)
  419. }
  420. default:
  421. err := fmt.Errorf("Unknown message type %X", pktType)
  422. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  423. c.stopForError(err)
  424. }
  425. // TODO: shouldn't this go in the sendRoutine?
  426. // Better to send a ping packet when *we* haven't sent anything for a while.
  427. c.pingTimer.Reset()
  428. }
  429. // Cleanup
  430. close(c.pong)
  431. for range c.pong {
  432. // Drain
  433. }
  434. }
  435. type ConnectionStatus struct {
  436. SendMonitor flow.Status
  437. RecvMonitor flow.Status
  438. Channels []ChannelStatus
  439. }
  440. type ChannelStatus struct {
  441. ID byte
  442. SendQueueCapacity int
  443. SendQueueSize int
  444. Priority int
  445. RecentlySent int64
  446. }
  447. func (c *MConnection) Status() ConnectionStatus {
  448. var status ConnectionStatus
  449. status.SendMonitor = c.sendMonitor.Status()
  450. status.RecvMonitor = c.recvMonitor.Status()
  451. status.Channels = make([]ChannelStatus, len(c.channels))
  452. for i, channel := range c.channels {
  453. status.Channels[i] = ChannelStatus{
  454. ID: channel.id,
  455. SendQueueCapacity: cap(channel.sendQueue),
  456. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  457. Priority: channel.priority,
  458. RecentlySent: channel.recentlySent,
  459. }
  460. }
  461. return status
  462. }
  463. //-----------------------------------------------------------------------------
  464. type ChannelDescriptor struct {
  465. ID byte
  466. Priority int
  467. SendQueueCapacity int
  468. RecvBufferCapacity int
  469. RecvMessageCapacity int
  470. }
  471. func (chDesc *ChannelDescriptor) FillDefaults() {
  472. if chDesc.SendQueueCapacity == 0 {
  473. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  474. }
  475. if chDesc.RecvBufferCapacity == 0 {
  476. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  477. }
  478. if chDesc.RecvMessageCapacity == 0 {
  479. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  480. }
  481. }
  482. // TODO: lowercase.
  483. // NOTE: not goroutine-safe.
  484. type Channel struct {
  485. conn *MConnection
  486. desc *ChannelDescriptor
  487. id byte
  488. sendQueue chan []byte
  489. sendQueueSize int32 // atomic.
  490. recving []byte
  491. sending []byte
  492. priority int
  493. recentlySent int64 // exponential moving average
  494. maxMsgPacketPayloadSize int
  495. }
  496. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  497. desc.FillDefaults()
  498. if desc.Priority <= 0 {
  499. cmn.PanicSanity("Channel default priority must be a postive integer")
  500. }
  501. return &Channel{
  502. conn: conn,
  503. desc: desc,
  504. id: desc.ID,
  505. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  506. recving: make([]byte, 0, desc.RecvBufferCapacity),
  507. priority: desc.Priority,
  508. maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize,
  509. }
  510. }
  511. // Queues message to send to this channel.
  512. // Goroutine-safe
  513. // Times out (and returns false) after defaultSendTimeout
  514. func (ch *Channel) sendBytes(bytes []byte) bool {
  515. select {
  516. case ch.sendQueue <- bytes:
  517. atomic.AddInt32(&ch.sendQueueSize, 1)
  518. return true
  519. case <-time.After(defaultSendTimeout):
  520. return false
  521. }
  522. }
  523. // Queues message to send to this channel.
  524. // Nonblocking, returns true if successful.
  525. // Goroutine-safe
  526. func (ch *Channel) trySendBytes(bytes []byte) bool {
  527. select {
  528. case ch.sendQueue <- bytes:
  529. atomic.AddInt32(&ch.sendQueueSize, 1)
  530. return true
  531. default:
  532. return false
  533. }
  534. }
  535. // Goroutine-safe
  536. func (ch *Channel) loadSendQueueSize() (size int) {
  537. return int(atomic.LoadInt32(&ch.sendQueueSize))
  538. }
  539. // Goroutine-safe
  540. // Use only as a heuristic.
  541. func (ch *Channel) canSend() bool {
  542. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  543. }
  544. // Returns true if any msgPackets are pending to be sent.
  545. // Call before calling nextMsgPacket()
  546. // Goroutine-safe
  547. func (ch *Channel) isSendPending() bool {
  548. if len(ch.sending) == 0 {
  549. if len(ch.sendQueue) == 0 {
  550. return false
  551. }
  552. ch.sending = <-ch.sendQueue
  553. }
  554. return true
  555. }
  556. // Creates a new msgPacket to send.
  557. // Not goroutine-safe
  558. func (ch *Channel) nextMsgPacket() msgPacket {
  559. packet := msgPacket{}
  560. packet.ChannelID = byte(ch.id)
  561. maxSize := ch.maxMsgPacketPayloadSize
  562. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  563. if len(ch.sending) <= maxSize {
  564. packet.EOF = byte(0x01)
  565. ch.sending = nil
  566. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  567. } else {
  568. packet.EOF = byte(0x00)
  569. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  570. }
  571. return packet
  572. }
  573. // Writes next msgPacket to w.
  574. // Not goroutine-safe
  575. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  576. packet := ch.nextMsgPacket()
  577. // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  578. writeMsgPacketTo(packet, w, &n, &err)
  579. if err == nil {
  580. ch.recentlySent += int64(n)
  581. }
  582. return
  583. }
  584. func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
  585. wire.WriteByte(packetTypeMsg, w, n, err)
  586. wire.WriteBinary(packet, w, n, err)
  587. }
  588. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  589. // Not goroutine-safe
  590. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  591. // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  592. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  593. return nil, wire.ErrBinaryReadOverflow
  594. }
  595. ch.recving = append(ch.recving, packet.Bytes...)
  596. if packet.EOF == byte(0x01) {
  597. msgBytes := ch.recving
  598. // clear the slice without re-allocating.
  599. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  600. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  601. // at which point the recving slice stops being used and should be garbage collected
  602. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  603. return msgBytes, nil
  604. }
  605. return nil, nil
  606. }
  607. // Call this periodically to update stats for throttling purposes.
  608. // Not goroutine-safe
  609. func (ch *Channel) updateStats() {
  610. // Exponential decay of stats.
  611. // TODO: optimize.
  612. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  613. }
  614. //-----------------------------------------------------------------------------
  615. const (
  616. defaultMaxMsgPacketPayloadSize = 1024
  617. maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
  618. packetTypePing = byte(0x01)
  619. packetTypePong = byte(0x02)
  620. packetTypeMsg = byte(0x03)
  621. )
  622. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  623. type msgPacket struct {
  624. ChannelID byte
  625. EOF byte // 1 means message ends here.
  626. Bytes []byte
  627. }
  628. func (p msgPacket) String() string {
  629. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  630. }