You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

741 lines
20 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package conn
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. flow "github.com/tendermint/tmlibs/flowrate"
  14. "github.com/tendermint/tmlibs/log"
  15. )
  // Tunables for the multiplex connection.
  const (
  	// numBatchMsgPackets bounds how many msgPackets one call to
  	// sendSomeMsgPackets attempts to write.
  	numBatchMsgPackets = 10

  	// Sizes handed to bufio.NewReaderSize / bufio.NewWriterSize.
  	minReadBufferSize  = 1024
  	minWriteBufferSize = 65536

  	// Periods of the channel-stats timer and the ping timer.
  	updateStats = 2 * time.Second
  	pingTimeout = 40 * time.Second
  )

  // Defaults. Some of these are also written in the user config
  // (flushThrottle, sendRate, recvRate).
  // TODO: remove values present in config
  const (
  	defaultFlushThrottle       = 100 * time.Millisecond
  	defaultSendQueueCapacity   = 1
  	defaultRecvBufferCapacity  = 4 * 1024
  	defaultRecvMessageCapacity = 21 * 1024 * 1024 // 21MB (22020096 bytes)

  	defaultSendRate int64 = 500 * 1024 // 500KB/s (512000 bytes/s)
  	defaultRecvRate int64 = 500 * 1024 // 500KB/s (512000 bytes/s)

  	defaultSendTimeout = 10 * time.Second
  )
  // Callback signatures supplied by the owner of an MConnection.
  type (
  	// receiveCbFunc is called by recvRoutine with the channel id and the
  	// bytes of a fully reassembled message.
  	receiveCbFunc func(chID byte, msgBytes []byte)

  	// errorCbFunc is called by stopForError with the value that caused the
  	// connection to be stopped.
  	errorCbFunc func(interface{})
  )
  35. /*
  36. Each peer has one `MConnection` (multiplex connection) instance.
  37. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  38. several messages along a single channel of communication.
  39. Each `MConnection` handles message transmission on multiple abstract communication
  40. `Channel`s. Each channel has a globally unique byte id.
  41. The byte id and the relative priorities of each `Channel` are configured upon
  42. initialization of the connection.
  43. There are two methods for sending messages:
  44. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  45. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  46. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  47. for the channel with the given id byte `chID`, or until the request times out.
  48. The message `msg` is serialized using the `tendermint/wire` submodule's
  49. `WriteBinary()` reflection routine.
  50. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  51. queue is full.
  52. Inbound message bytes are handled with an onReceive callback function.
  53. */
  54. type MConnection struct {
  55. cmn.BaseService
  56. conn net.Conn
  57. bufReader *bufio.Reader
  58. bufWriter *bufio.Writer
  59. sendMonitor *flow.Monitor
  60. recvMonitor *flow.Monitor
  61. send chan struct{}
  62. pong chan struct{}
  63. channels []*Channel
  64. channelsIdx map[byte]*Channel
  65. onReceive receiveCbFunc
  66. onError errorCbFunc
  67. errored uint32
  68. config *MConnConfig
  69. quit chan struct{}
  70. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  71. pingTimer *cmn.RepeatTimer // send pings periodically
  72. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  73. created time.Time // time of creation
  74. }
  // MConnConfig holds the tunable parameters of an MConnection.
  type MConnConfig struct {
  	// SendRate and RecvRate are the rates handed to the send/receive flow
  	// monitors when throttling (see sendSomeMsgPackets and recvRoutine).
  	SendRate int64 `mapstructure:"send_rate"`
  	RecvRate int64 `mapstructure:"recv_rate"`

  	// MaxMsgPacketPayloadSize caps the payload bytes carried by one msgPacket.
  	MaxMsgPacketPayloadSize int

  	// FlushThrottle is the interval given to the flush throttle timer.
  	FlushThrottle time.Duration
  }
  82. func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
  83. return cfg.MaxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  84. }
  85. // DefaultMConnConfig returns the default config.
  86. func DefaultMConnConfig() *MConnConfig {
  87. return &MConnConfig{
  88. SendRate: defaultSendRate,
  89. RecvRate: defaultRecvRate,
  90. MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
  91. FlushThrottle: defaultFlushThrottle,
  92. }
  93. }
  94. // NewMConnection wraps net.Conn and creates multiplex connection
  95. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  96. return NewMConnectionWithConfig(
  97. conn,
  98. chDescs,
  99. onReceive,
  100. onError,
  101. DefaultMConnConfig())
  102. }
  103. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  104. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  105. mconn := &MConnection{
  106. conn: conn,
  107. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  108. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  109. sendMonitor: flow.New(0, 0),
  110. recvMonitor: flow.New(0, 0),
  111. send: make(chan struct{}, 1),
  112. pong: make(chan struct{}),
  113. onReceive: onReceive,
  114. onError: onError,
  115. config: config,
  116. }
  117. // Create channels
  118. var channelsIdx = map[byte]*Channel{}
  119. var channels = []*Channel{}
  120. for _, desc := range chDescs {
  121. channel := newChannel(mconn, *desc)
  122. channelsIdx[channel.desc.ID] = channel
  123. channels = append(channels, channel)
  124. }
  125. mconn.channels = channels
  126. mconn.channelsIdx = channelsIdx
  127. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  128. return mconn
  129. }
  130. func (c *MConnection) SetLogger(l log.Logger) {
  131. c.BaseService.SetLogger(l)
  132. for _, ch := range c.channels {
  133. ch.SetLogger(l)
  134. }
  135. }
  136. // OnStart implements BaseService
  137. func (c *MConnection) OnStart() error {
  138. if err := c.BaseService.OnStart(); err != nil {
  139. return err
  140. }
  141. c.quit = make(chan struct{})
  142. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  143. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  144. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  145. go c.sendRoutine()
  146. go c.recvRoutine()
  147. return nil
  148. }
  149. // OnStop implements BaseService
  150. func (c *MConnection) OnStop() {
  151. c.BaseService.OnStop()
  152. c.flushTimer.Stop()
  153. c.pingTimer.Stop()
  154. c.chStatsTimer.Stop()
  155. if c.quit != nil {
  156. close(c.quit)
  157. }
  158. c.conn.Close() // nolint: errcheck
  159. // We can't close pong safely here because
  160. // recvRoutine may write to it after we've stopped.
  161. // Though it doesn't need to get closed at all,
  162. // we close it @ recvRoutine.
  163. }
  164. func (c *MConnection) String() string {
  165. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  166. }
  167. func (c *MConnection) flush() {
  168. c.Logger.Debug("Flush", "conn", c)
  169. err := c.bufWriter.Flush()
  170. if err != nil {
  171. c.Logger.Error("MConnection flush failed", "err", err)
  172. }
  173. }
  174. // Catch panics, usually caused by remote disconnects.
  175. func (c *MConnection) _recover() {
  176. if r := recover(); r != nil {
  177. stack := debug.Stack()
  178. err := cmn.StackError{r, stack}
  179. c.stopForError(err)
  180. }
  181. }
  182. func (c *MConnection) stopForError(r interface{}) {
  183. c.Stop()
  184. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  185. if c.onError != nil {
  186. c.onError(r)
  187. }
  188. }
  189. }
  190. // Queues a message to be sent to channel.
  191. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  192. if !c.IsRunning() {
  193. return false
  194. }
  195. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  196. // Send message to channel.
  197. channel, ok := c.channelsIdx[chID]
  198. if !ok {
  199. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  200. return false
  201. }
  202. success := channel.sendBytes(wire.BinaryBytes(msg))
  203. if success {
  204. // Wake up sendRoutine if necessary
  205. select {
  206. case c.send <- struct{}{}:
  207. default:
  208. }
  209. } else {
  210. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  211. }
  212. return success
  213. }
  214. // Queues a message to be sent to channel.
  215. // Nonblocking, returns true if successful.
  216. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  217. if !c.IsRunning() {
  218. return false
  219. }
  220. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  221. // Send message to channel.
  222. channel, ok := c.channelsIdx[chID]
  223. if !ok {
  224. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  225. return false
  226. }
  227. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  228. if ok {
  229. // Wake up sendRoutine if necessary
  230. select {
  231. case c.send <- struct{}{}:
  232. default:
  233. }
  234. }
  235. return ok
  236. }
  237. // CanSend returns true if you can send more data onto the chID, false
  238. // otherwise. Use only as a heuristic.
  239. func (c *MConnection) CanSend(chID byte) bool {
  240. if !c.IsRunning() {
  241. return false
  242. }
  243. channel, ok := c.channelsIdx[chID]
  244. if !ok {
  245. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  246. return false
  247. }
  248. return channel.canSend()
  249. }
  250. // sendRoutine polls for packets to send from channels.
  251. func (c *MConnection) sendRoutine() {
  252. defer c._recover()
  253. FOR_LOOP:
  254. for {
  255. var n int
  256. var err error
  257. select {
  258. case <-c.flushTimer.Ch:
  259. // NOTE: flushTimer.Set() must be called every time
  260. // something is written to .bufWriter.
  261. c.flush()
  262. case <-c.chStatsTimer.Chan():
  263. for _, channel := range c.channels {
  264. channel.updateStats()
  265. }
  266. case <-c.pingTimer.Chan():
  267. c.Logger.Debug("Send Ping")
  268. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  269. c.sendMonitor.Update(int(n))
  270. c.flush()
  271. case <-c.pong:
  272. c.Logger.Debug("Send Pong")
  273. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  274. c.sendMonitor.Update(int(n))
  275. c.flush()
  276. case <-c.quit:
  277. break FOR_LOOP
  278. case <-c.send:
  279. // Send some msgPackets
  280. eof := c.sendSomeMsgPackets()
  281. if !eof {
  282. // Keep sendRoutine awake.
  283. select {
  284. case c.send <- struct{}{}:
  285. default:
  286. }
  287. }
  288. }
  289. if !c.IsRunning() {
  290. break FOR_LOOP
  291. }
  292. if err != nil {
  293. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  294. c.stopForError(err)
  295. break FOR_LOOP
  296. }
  297. }
  298. // Cleanup
  299. }
  300. // Returns true if messages from channels were exhausted.
  301. // Blocks in accordance to .sendMonitor throttling.
  302. func (c *MConnection) sendSomeMsgPackets() bool {
  303. // Block until .sendMonitor says we can write.
  304. // Once we're ready we send more than we asked for,
  305. // but amortized it should even out.
  306. c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  307. // Now send some msgPackets.
  308. for i := 0; i < numBatchMsgPackets; i++ {
  309. if c.sendMsgPacket() {
  310. return true
  311. }
  312. }
  313. return false
  314. }
  315. // Returns true if messages from channels were exhausted.
  316. func (c *MConnection) sendMsgPacket() bool {
  317. // Choose a channel to create a msgPacket from.
  318. // The chosen channel will be the one whose recentlySent/priority is the least.
  319. var leastRatio float32 = math.MaxFloat32
  320. var leastChannel *Channel
  321. for _, channel := range c.channels {
  322. // If nothing to send, skip this channel
  323. if !channel.isSendPending() {
  324. continue
  325. }
  326. // Get ratio, and keep track of lowest ratio.
  327. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  328. if ratio < leastRatio {
  329. leastRatio = ratio
  330. leastChannel = channel
  331. }
  332. }
  333. // Nothing to send?
  334. if leastChannel == nil {
  335. return true
  336. } else {
  337. // c.Logger.Info("Found a msgPacket to send")
  338. }
  339. // Make & send a msgPacket from this channel
  340. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  341. if err != nil {
  342. c.Logger.Error("Failed to write msgPacket", "err", err)
  343. c.stopForError(err)
  344. return true
  345. }
  346. c.sendMonitor.Update(int(n))
  347. c.flushTimer.Set()
  348. return false
  349. }
  350. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  351. // After a whole message has been assembled, it's pushed to onReceive().
  352. // Blocks depending on how the connection is throttled.
  353. // Otherwise, it never blocks.
  354. func (c *MConnection) recvRoutine() {
  355. defer c._recover()
  356. FOR_LOOP:
  357. for {
  358. // Block until .recvMonitor says we can read.
  359. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
  360. /*
  361. // Peek into bufReader for debugging
  362. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  363. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  364. bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
  365. if err == nil {
  366. return bytes
  367. } else {
  368. log.Warn("Error peeking connection buffer", "err", err)
  369. return nil
  370. }
  371. }})
  372. }
  373. */
  374. // Read packet type
  375. var n int
  376. var err error
  377. pktType := wire.ReadByte(c.bufReader, &n, &err)
  378. c.recvMonitor.Update(int(n))
  379. if err != nil {
  380. if c.IsRunning() {
  381. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  382. c.stopForError(err)
  383. }
  384. break FOR_LOOP
  385. }
  386. // Read more depending on packet type.
  387. switch pktType {
  388. case packetTypePing:
  389. // TODO: prevent abuse, as they cause flush()'s.
  390. c.Logger.Debug("Receive Ping")
  391. select {
  392. case c.pong <- struct{}{}:
  393. default:
  394. // never block
  395. }
  396. case packetTypePong:
  397. // do nothing
  398. c.Logger.Debug("Receive Pong")
  399. case packetTypeMsg:
  400. pkt, n, err := msgPacket{}, int(0), error(nil)
  401. wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
  402. c.recvMonitor.Update(int(n))
  403. if err != nil {
  404. if c.IsRunning() {
  405. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  406. c.stopForError(err)
  407. }
  408. break FOR_LOOP
  409. }
  410. channel, ok := c.channelsIdx[pkt.ChannelID]
  411. if !ok || channel == nil {
  412. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  413. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  414. c.stopForError(err)
  415. break FOR_LOOP
  416. }
  417. msgBytes, err := channel.recvMsgPacket(pkt)
  418. if err != nil {
  419. if c.IsRunning() {
  420. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  421. c.stopForError(err)
  422. }
  423. break FOR_LOOP
  424. }
  425. if msgBytes != nil {
  426. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  427. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  428. c.onReceive(pkt.ChannelID, msgBytes)
  429. }
  430. default:
  431. err := fmt.Errorf("Unknown message type %X", pktType)
  432. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  433. c.stopForError(err)
  434. break FOR_LOOP
  435. }
  436. // TODO: don't bother with this "only ping when we havent heard from them".
  437. // lets just always ping every peer from the sendRoutine every 10s no matter what.
  438. // if they dont pong within pongTimeout, disconnect :)
  439. c.pingTimer.Reset()
  440. }
  441. // Cleanup
  442. close(c.pong)
  443. for range c.pong {
  444. // Drain
  445. }
  446. }
  447. type ConnectionStatus struct {
  448. Duration time.Duration
  449. SendMonitor flow.Status
  450. RecvMonitor flow.Status
  451. Channels []ChannelStatus
  452. }
  // ChannelStatus describes one channel inside a ConnectionStatus.
  type ChannelStatus struct {
  	ID byte // channel id

  	SendQueueCapacity int // capacity of the channel's send queue
  	SendQueueSize     int // the channel's sendQueueSize counter at snapshot time

  	Priority     int   // priority from the channel descriptor
  	RecentlySent int64 // the channel's recentlySent counter at snapshot time
  }
  460. func (c *MConnection) Status() ConnectionStatus {
  461. var status ConnectionStatus
  462. status.Duration = time.Since(c.created)
  463. status.SendMonitor = c.sendMonitor.Status()
  464. status.RecvMonitor = c.recvMonitor.Status()
  465. status.Channels = make([]ChannelStatus, len(c.channels))
  466. for i, channel := range c.channels {
  467. status.Channels[i] = ChannelStatus{
  468. ID: channel.desc.ID,
  469. SendQueueCapacity: cap(channel.sendQueue),
  470. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  471. Priority: channel.desc.Priority,
  472. RecentlySent: channel.recentlySent,
  473. }
  474. }
  475. return status
  476. }
  477. //-----------------------------------------------------------------------------
  // ChannelDescriptor configures a single Channel of an MConnection.
  // Zero capacities are replaced with package defaults by FillDefaults.
  type ChannelDescriptor struct {
  	ID       byte // channel id
  	Priority int  // must be positive; newChannel panics otherwise

  	SendQueueCapacity   int // capacity of the outbound message queue
  	RecvBufferCapacity  int // initial capacity of the reassembly buffer
  	RecvMessageCapacity int // largest reassembled message accepted, in bytes
  }
  485. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  486. if chDesc.SendQueueCapacity == 0 {
  487. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  488. }
  489. if chDesc.RecvBufferCapacity == 0 {
  490. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  491. }
  492. if chDesc.RecvMessageCapacity == 0 {
  493. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  494. }
  495. filled = chDesc
  496. return
  497. }
  498. // TODO: lowercase.
  499. // NOTE: not goroutine-safe.
  500. type Channel struct {
  501. conn *MConnection
  502. desc ChannelDescriptor
  503. sendQueue chan []byte
  504. sendQueueSize int32 // atomic.
  505. recving []byte
  506. sending []byte
  507. recentlySent int64 // exponential moving average
  508. maxMsgPacketPayloadSize int
  509. Logger log.Logger
  510. }
  511. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  512. desc = desc.FillDefaults()
  513. if desc.Priority <= 0 {
  514. cmn.PanicSanity("Channel default priority must be a positive integer")
  515. }
  516. return &Channel{
  517. conn: conn,
  518. desc: desc,
  519. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  520. recving: make([]byte, 0, desc.RecvBufferCapacity),
  521. maxMsgPacketPayloadSize: conn.config.MaxMsgPacketPayloadSize,
  522. }
  523. }
  524. func (ch *Channel) SetLogger(l log.Logger) {
  525. ch.Logger = l
  526. }
  527. // Queues message to send to this channel.
  528. // Goroutine-safe
  529. // Times out (and returns false) after defaultSendTimeout
  530. func (ch *Channel) sendBytes(bytes []byte) bool {
  531. select {
  532. case ch.sendQueue <- bytes:
  533. atomic.AddInt32(&ch.sendQueueSize, 1)
  534. return true
  535. case <-time.After(defaultSendTimeout):
  536. return false
  537. }
  538. }
  539. // Queues message to send to this channel.
  540. // Nonblocking, returns true if successful.
  541. // Goroutine-safe
  542. func (ch *Channel) trySendBytes(bytes []byte) bool {
  543. select {
  544. case ch.sendQueue <- bytes:
  545. atomic.AddInt32(&ch.sendQueueSize, 1)
  546. return true
  547. default:
  548. return false
  549. }
  550. }
  551. // Goroutine-safe
  552. func (ch *Channel) loadSendQueueSize() (size int) {
  553. return int(atomic.LoadInt32(&ch.sendQueueSize))
  554. }
  555. // Goroutine-safe
  556. // Use only as a heuristic.
  557. func (ch *Channel) canSend() bool {
  558. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  559. }
  560. // Returns true if any msgPackets are pending to be sent.
  561. // Call before calling nextMsgPacket()
  562. // Goroutine-safe
  563. func (ch *Channel) isSendPending() bool {
  564. if len(ch.sending) == 0 {
  565. if len(ch.sendQueue) == 0 {
  566. return false
  567. }
  568. ch.sending = <-ch.sendQueue
  569. }
  570. return true
  571. }
  572. // Creates a new msgPacket to send.
  573. // Not goroutine-safe
  574. func (ch *Channel) nextMsgPacket() msgPacket {
  575. packet := msgPacket{}
  576. packet.ChannelID = byte(ch.desc.ID)
  577. maxSize := ch.maxMsgPacketPayloadSize
  578. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  579. if len(ch.sending) <= maxSize {
  580. packet.EOF = byte(0x01)
  581. ch.sending = nil
  582. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  583. } else {
  584. packet.EOF = byte(0x00)
  585. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  586. }
  587. return packet
  588. }
  589. // Writes next msgPacket to w.
  590. // Not goroutine-safe
  591. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  592. packet := ch.nextMsgPacket()
  593. ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  594. writeMsgPacketTo(packet, w, &n, &err)
  595. if err == nil {
  596. ch.recentlySent += int64(n)
  597. }
  598. return
  599. }
  600. func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
  601. wire.WriteByte(packetTypeMsg, w, n, err)
  602. wire.WriteBinary(packet, w, n, err)
  603. }
  604. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  605. // Not goroutine-safe
  606. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  607. ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  608. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  609. return nil, wire.ErrBinaryReadOverflow
  610. }
  611. ch.recving = append(ch.recving, packet.Bytes...)
  612. if packet.EOF == byte(0x01) {
  613. // TODO: document that these returned msgBytes will change under you after Receive finishes.
  614. // TODO: document it in the Reactor interface especially - implementations of a Reactor
  615. // can not keep these bytes around after the Receive completes without copying!
  616. // In general that's fine, because the first thing we do is unmarshal into a msg type and then
  617. // we never use the bytes again
  618. msgBytes := ch.recving
  619. // clear the slice without re-allocating.
  620. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  621. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  622. // at which point the recving slice stops being used and should be garbage collected
  623. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  624. return msgBytes, nil
  625. }
  626. return nil, nil
  627. }
  628. // Call this periodically to update stats for throttling purposes.
  629. // Not goroutine-safe
  630. func (ch *Channel) updateStats() {
  631. // Exponential decay of stats.
  632. // TODO: optimize.
  633. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  634. }
  635. //-----------------------------------------------------------------------------
  // Packet sizing.
  const (
  	defaultMaxMsgPacketPayloadSize = 1024
  	maxMsgPacketOverheadSize       = 10 // It's actually lower but good enough
  )

  // Packet type tags: the first byte of every packet on the wire.
  const (
  	packetTypePing byte = 0x01
  	packetTypePong byte = 0x02
  	packetTypeMsg  byte = 0x03
  )
  // msgPacket is one fragment of a channel message. Messages are chopped into
  // msgPackets so that several channels can be multiplexed over one connection.
  type msgPacket struct {
  	ChannelID byte   // id of the channel the fragment belongs to
  	EOF       byte   // 1 means message ends here.
  	Bytes     []byte // fragment payload
  }
  649. func (p msgPacket) String() string {
  650. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  651. }