You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

781 lines
21 KiB

9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. flow "github.com/tendermint/tmlibs/flowrate"
  14. "github.com/tendermint/tmlibs/log"
  15. )
  16. const (
  17. numBatchMsgPackets = 10
  18. minReadBufferSize = 1024
  19. minWriteBufferSize = 65536
  20. updateStats = 2 * time.Second
  21. // some of these defaults are written in the user config
  22. // flushThrottle, sendRate, recvRate
  23. // TODO: remove values present in config
  24. defaultFlushThrottle = 100 * time.Millisecond
  25. defaultSendQueueCapacity = 1
  26. defaultRecvBufferCapacity = 4096
  27. defaultRecvMessageCapacity = 22020096 // 21MB
  28. defaultSendRate = int64(512000) // 500KB/s
  29. defaultRecvRate = int64(512000) // 500KB/s
  30. defaultSendTimeout = 10 * time.Second
  31. defaultPingInterval = 60 * time.Second
  32. defaultPongTimeout = 45 * time.Second
  33. )
  34. type receiveCbFunc func(chID byte, msgBytes []byte)
  35. type errorCbFunc func(interface{})
  36. /*
  37. Each peer has one `MConnection` (multiplex connection) instance.
  38. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  39. several messages along a single channel of communication.
  40. Each `MConnection` handles message transmission on multiple abstract communication
  41. `Channel`s. Each channel has a globally unique byte id.
  42. The byte id and the relative priorities of each `Channel` are configured upon
  43. initialization of the connection.
  44. There are two methods for sending messages:
  45. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  46. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  47. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  48. for the channel with the given id byte `chID`, or until the request times out.
  49. The message `msg` is serialized using the `tendermint/wire` submodule's
  50. `WriteBinary()` reflection routine.
  51. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  52. queue is full.
  53. Inbound message bytes are handled with an onReceive callback function.
  54. */
  55. type MConnection struct {
  56. cmn.BaseService
  57. conn net.Conn
  58. bufReader *bufio.Reader
  59. bufWriter *bufio.Writer
  60. sendMonitor *flow.Monitor
  61. recvMonitor *flow.Monitor
  62. send chan struct{}
  63. pong chan struct{}
  64. channels []*Channel
  65. channelsIdx map[byte]*Channel
  66. onReceive receiveCbFunc
  67. onError errorCbFunc
  68. errored uint32
  69. config *MConnConfig
  70. quit chan struct{}
  71. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  72. pingTimer *cmn.RepeatTimer // send pings periodically
  73. // close conn if pong is not received in pongTimeout
  74. pongTimer *time.Timer
  75. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  76. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  77. created time.Time // time of creation
  78. }
  79. // MConnConfig is a MConnection configuration.
  80. type MConnConfig struct {
  81. SendRate int64 `mapstructure:"send_rate"`
  82. RecvRate int64 `mapstructure:"recv_rate"`
  83. // Maximum payload size
  84. MaxMsgPacketPayloadSize int `mapstructure:"max_msg_packet_payload_size"`
  85. // Interval to flush writes (throttled)
  86. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  87. // Interval to send pings
  88. PingInterval time.Duration `mapstructure:"ping_interval"`
  89. // Maximum wait time for pongs
  90. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  91. }
  92. func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
  93. return cfg.MaxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  94. }
  95. // DefaultMConnConfig returns the default config.
  96. func DefaultMConnConfig() *MConnConfig {
  97. return &MConnConfig{
  98. SendRate: defaultSendRate,
  99. RecvRate: defaultRecvRate,
  100. MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
  101. FlushThrottle: defaultFlushThrottle,
  102. PingInterval: defaultPingInterval,
  103. PongTimeout: defaultPongTimeout,
  104. }
  105. }
  106. // NewMConnection wraps net.Conn and creates multiplex connection
  107. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  108. return NewMConnectionWithConfig(
  109. conn,
  110. chDescs,
  111. onReceive,
  112. onError,
  113. DefaultMConnConfig())
  114. }
  115. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  116. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  117. if config.PongTimeout >= config.PingInterval {
  118. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  119. }
  120. mconn := &MConnection{
  121. conn: conn,
  122. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  123. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  124. sendMonitor: flow.New(0, 0),
  125. recvMonitor: flow.New(0, 0),
  126. send: make(chan struct{}, 1),
  127. pong: make(chan struct{}, 1),
  128. onReceive: onReceive,
  129. onError: onError,
  130. config: config,
  131. }
  132. // Create channels
  133. var channelsIdx = map[byte]*Channel{}
  134. var channels = []*Channel{}
  135. for _, desc := range chDescs {
  136. channel := newChannel(mconn, *desc)
  137. channelsIdx[channel.desc.ID] = channel
  138. channels = append(channels, channel)
  139. }
  140. mconn.channels = channels
  141. mconn.channelsIdx = channelsIdx
  142. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  143. return mconn
  144. }
  145. func (c *MConnection) SetLogger(l log.Logger) {
  146. c.BaseService.SetLogger(l)
  147. for _, ch := range c.channels {
  148. ch.SetLogger(l)
  149. }
  150. }
  151. // OnStart implements BaseService
  152. func (c *MConnection) OnStart() error {
  153. if err := c.BaseService.OnStart(); err != nil {
  154. return err
  155. }
  156. c.quit = make(chan struct{})
  157. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  158. c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
  159. c.pongTimeoutCh = make(chan bool, 1)
  160. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  161. go c.sendRoutine()
  162. go c.recvRoutine()
  163. return nil
  164. }
  165. // OnStop implements BaseService
  166. func (c *MConnection) OnStop() {
  167. c.BaseService.OnStop()
  168. c.flushTimer.Stop()
  169. c.pingTimer.Stop()
  170. c.chStatsTimer.Stop()
  171. if c.quit != nil {
  172. close(c.quit)
  173. }
  174. c.conn.Close() // nolint: errcheck
  175. // We can't close pong safely here because
  176. // recvRoutine may write to it after we've stopped.
  177. // Though it doesn't need to get closed at all,
  178. // we close it @ recvRoutine.
  179. }
  180. func (c *MConnection) String() string {
  181. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  182. }
  183. func (c *MConnection) flush() {
  184. c.Logger.Debug("Flush", "conn", c)
  185. err := c.bufWriter.Flush()
  186. if err != nil {
  187. c.Logger.Error("MConnection flush failed", "err", err)
  188. }
  189. }
  190. // Catch panics, usually caused by remote disconnects.
  191. func (c *MConnection) _recover() {
  192. if r := recover(); r != nil {
  193. err := cmn.ErrorWrap(r, "recovered from panic")
  194. c.stopForError(err)
  195. }
  196. }
  197. func (c *MConnection) stopForError(r interface{}) {
  198. c.Stop()
  199. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  200. if c.onError != nil {
  201. c.onError(r)
  202. }
  203. }
  204. }
  205. // Queues a message to be sent to channel.
  206. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  207. if !c.IsRunning() {
  208. return false
  209. }
  210. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  211. // Send message to channel.
  212. channel, ok := c.channelsIdx[chID]
  213. if !ok {
  214. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  215. return false
  216. }
  217. success := channel.sendBytes(wire.BinaryBytes(msg))
  218. if success {
  219. // Wake up sendRoutine if necessary
  220. select {
  221. case c.send <- struct{}{}:
  222. default:
  223. }
  224. } else {
  225. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  226. }
  227. return success
  228. }
  229. // Queues a message to be sent to channel.
  230. // Nonblocking, returns true if successful.
  231. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  232. if !c.IsRunning() {
  233. return false
  234. }
  235. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  236. // Send message to channel.
  237. channel, ok := c.channelsIdx[chID]
  238. if !ok {
  239. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  240. return false
  241. }
  242. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  243. if ok {
  244. // Wake up sendRoutine if necessary
  245. select {
  246. case c.send <- struct{}{}:
  247. default:
  248. }
  249. }
  250. return ok
  251. }
  252. // CanSend returns true if you can send more data onto the chID, false
  253. // otherwise. Use only as a heuristic.
  254. func (c *MConnection) CanSend(chID byte) bool {
  255. if !c.IsRunning() {
  256. return false
  257. }
  258. channel, ok := c.channelsIdx[chID]
  259. if !ok {
  260. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  261. return false
  262. }
  263. return channel.canSend()
  264. }
  265. // sendRoutine polls for packets to send from channels.
  266. func (c *MConnection) sendRoutine() {
  267. defer c._recover()
  268. FOR_LOOP:
  269. for {
  270. var n int
  271. var err error
  272. select {
  273. case <-c.flushTimer.Ch:
  274. // NOTE: flushTimer.Set() must be called every time
  275. // something is written to .bufWriter.
  276. c.flush()
  277. case <-c.chStatsTimer.Chan():
  278. for _, channel := range c.channels {
  279. channel.updateStats()
  280. }
  281. case <-c.pingTimer.Chan():
  282. c.Logger.Debug("Send Ping")
  283. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  284. c.sendMonitor.Update(int(n))
  285. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  286. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  287. select {
  288. case c.pongTimeoutCh <- true:
  289. default:
  290. }
  291. })
  292. c.flush()
  293. case timeout := <-c.pongTimeoutCh:
  294. if timeout {
  295. c.Logger.Debug("Pong timeout")
  296. err = errors.New("pong timeout")
  297. } else {
  298. c.stopPongTimer()
  299. }
  300. case <-c.pong:
  301. c.Logger.Debug("Send Pong")
  302. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  303. c.sendMonitor.Update(int(n))
  304. c.flush()
  305. case <-c.quit:
  306. break FOR_LOOP
  307. case <-c.send:
  308. // Send some msgPackets
  309. eof := c.sendSomeMsgPackets()
  310. if !eof {
  311. // Keep sendRoutine awake.
  312. select {
  313. case c.send <- struct{}{}:
  314. default:
  315. }
  316. }
  317. }
  318. if !c.IsRunning() {
  319. break FOR_LOOP
  320. }
  321. if err != nil {
  322. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  323. c.stopForError(err)
  324. break FOR_LOOP
  325. }
  326. }
  327. // Cleanup
  328. c.stopPongTimer()
  329. }
  330. // Returns true if messages from channels were exhausted.
  331. // Blocks in accordance to .sendMonitor throttling.
  332. func (c *MConnection) sendSomeMsgPackets() bool {
  333. // Block until .sendMonitor says we can write.
  334. // Once we're ready we send more than we asked for,
  335. // but amortized it should even out.
  336. c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  337. // Now send some msgPackets.
  338. for i := 0; i < numBatchMsgPackets; i++ {
  339. if c.sendMsgPacket() {
  340. return true
  341. }
  342. }
  343. return false
  344. }
  345. // Returns true if messages from channels were exhausted.
  346. func (c *MConnection) sendMsgPacket() bool {
  347. // Choose a channel to create a msgPacket from.
  348. // The chosen channel will be the one whose recentlySent/priority is the least.
  349. var leastRatio float32 = math.MaxFloat32
  350. var leastChannel *Channel
  351. for _, channel := range c.channels {
  352. // If nothing to send, skip this channel
  353. if !channel.isSendPending() {
  354. continue
  355. }
  356. // Get ratio, and keep track of lowest ratio.
  357. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  358. if ratio < leastRatio {
  359. leastRatio = ratio
  360. leastChannel = channel
  361. }
  362. }
  363. // Nothing to send?
  364. if leastChannel == nil {
  365. return true
  366. }
  367. // c.Logger.Info("Found a msgPacket to send")
  368. // Make & send a msgPacket from this channel
  369. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  370. if err != nil {
  371. c.Logger.Error("Failed to write msgPacket", "err", err)
  372. c.stopForError(err)
  373. return true
  374. }
  375. c.sendMonitor.Update(int(n))
  376. c.flushTimer.Set()
  377. return false
  378. }
  379. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  380. // After a whole message has been assembled, it's pushed to onReceive().
  381. // Blocks depending on how the connection is throttled.
  382. // Otherwise, it never blocks.
  383. func (c *MConnection) recvRoutine() {
  384. defer c._recover()
  385. FOR_LOOP:
  386. for {
  387. // Block until .recvMonitor says we can read.
  388. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
  389. /*
  390. // Peek into bufReader for debugging
  391. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  392. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  393. bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
  394. if err == nil {
  395. return bytes
  396. } else {
  397. log.Warn("Error peeking connection buffer", "err", err)
  398. return nil
  399. }
  400. }})
  401. }
  402. */
  403. // Read packet type
  404. var n int
  405. var err error
  406. pktType := wire.ReadByte(c.bufReader, &n, &err)
  407. c.recvMonitor.Update(int(n))
  408. if err != nil {
  409. if c.IsRunning() {
  410. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  411. c.stopForError(err)
  412. }
  413. break FOR_LOOP
  414. }
  415. // Read more depending on packet type.
  416. switch pktType {
  417. case packetTypePing:
  418. // TODO: prevent abuse, as they cause flush()'s.
  419. // https://github.com/tendermint/tendermint/issues/1190
  420. c.Logger.Debug("Receive Ping")
  421. select {
  422. case c.pong <- struct{}{}:
  423. default:
  424. // never block
  425. }
  426. case packetTypePong:
  427. c.Logger.Debug("Receive Pong")
  428. select {
  429. case c.pongTimeoutCh <- false:
  430. default:
  431. // never block
  432. }
  433. case packetTypeMsg:
  434. pkt, n, err := msgPacket{}, int(0), error(nil)
  435. wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
  436. c.recvMonitor.Update(int(n))
  437. if err != nil {
  438. if c.IsRunning() {
  439. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  440. c.stopForError(err)
  441. }
  442. break FOR_LOOP
  443. }
  444. channel, ok := c.channelsIdx[pkt.ChannelID]
  445. if !ok || channel == nil {
  446. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  447. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  448. c.stopForError(err)
  449. break FOR_LOOP
  450. }
  451. msgBytes, err := channel.recvMsgPacket(pkt)
  452. if err != nil {
  453. if c.IsRunning() {
  454. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  455. c.stopForError(err)
  456. }
  457. break FOR_LOOP
  458. }
  459. if msgBytes != nil {
  460. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  461. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  462. c.onReceive(pkt.ChannelID, msgBytes)
  463. }
  464. default:
  465. err := fmt.Errorf("Unknown message type %X", pktType)
  466. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  467. c.stopForError(err)
  468. break FOR_LOOP
  469. }
  470. }
  471. // Cleanup
  472. close(c.pong)
  473. for range c.pong {
  474. // Drain
  475. }
  476. }
  477. // not goroutine-safe
  478. func (c *MConnection) stopPongTimer() {
  479. if c.pongTimer != nil {
  480. if !c.pongTimer.Stop() {
  481. <-c.pongTimer.C
  482. }
  483. c.pongTimer = nil
  484. }
  485. }
  486. type ConnectionStatus struct {
  487. Duration time.Duration
  488. SendMonitor flow.Status
  489. RecvMonitor flow.Status
  490. Channels []ChannelStatus
  491. }
  492. type ChannelStatus struct {
  493. ID byte
  494. SendQueueCapacity int
  495. SendQueueSize int
  496. Priority int
  497. RecentlySent int64
  498. }
  499. func (c *MConnection) Status() ConnectionStatus {
  500. var status ConnectionStatus
  501. status.Duration = time.Since(c.created)
  502. status.SendMonitor = c.sendMonitor.Status()
  503. status.RecvMonitor = c.recvMonitor.Status()
  504. status.Channels = make([]ChannelStatus, len(c.channels))
  505. for i, channel := range c.channels {
  506. status.Channels[i] = ChannelStatus{
  507. ID: channel.desc.ID,
  508. SendQueueCapacity: cap(channel.sendQueue),
  509. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  510. Priority: channel.desc.Priority,
  511. RecentlySent: channel.recentlySent,
  512. }
  513. }
  514. return status
  515. }
  516. //-----------------------------------------------------------------------------
  517. type ChannelDescriptor struct {
  518. ID byte
  519. Priority int
  520. SendQueueCapacity int
  521. RecvBufferCapacity int
  522. RecvMessageCapacity int
  523. }
  524. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  525. if chDesc.SendQueueCapacity == 0 {
  526. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  527. }
  528. if chDesc.RecvBufferCapacity == 0 {
  529. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  530. }
  531. if chDesc.RecvMessageCapacity == 0 {
  532. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  533. }
  534. filled = chDesc
  535. return
  536. }
  537. // TODO: lowercase.
  538. // NOTE: not goroutine-safe.
  539. type Channel struct {
  540. conn *MConnection
  541. desc ChannelDescriptor
  542. sendQueue chan []byte
  543. sendQueueSize int32 // atomic.
  544. recving []byte
  545. sending []byte
  546. recentlySent int64 // exponential moving average
  547. maxMsgPacketPayloadSize int
  548. Logger log.Logger
  549. }
  550. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  551. desc = desc.FillDefaults()
  552. if desc.Priority <= 0 {
  553. cmn.PanicSanity("Channel default priority must be a positive integer")
  554. }
  555. return &Channel{
  556. conn: conn,
  557. desc: desc,
  558. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  559. recving: make([]byte, 0, desc.RecvBufferCapacity),
  560. maxMsgPacketPayloadSize: conn.config.MaxMsgPacketPayloadSize,
  561. }
  562. }
  563. func (ch *Channel) SetLogger(l log.Logger) {
  564. ch.Logger = l
  565. }
  566. // Queues message to send to this channel.
  567. // Goroutine-safe
  568. // Times out (and returns false) after defaultSendTimeout
  569. func (ch *Channel) sendBytes(bytes []byte) bool {
  570. select {
  571. case ch.sendQueue <- bytes:
  572. atomic.AddInt32(&ch.sendQueueSize, 1)
  573. return true
  574. case <-time.After(defaultSendTimeout):
  575. return false
  576. }
  577. }
  578. // Queues message to send to this channel.
  579. // Nonblocking, returns true if successful.
  580. // Goroutine-safe
  581. func (ch *Channel) trySendBytes(bytes []byte) bool {
  582. select {
  583. case ch.sendQueue <- bytes:
  584. atomic.AddInt32(&ch.sendQueueSize, 1)
  585. return true
  586. default:
  587. return false
  588. }
  589. }
  590. // Goroutine-safe
  591. func (ch *Channel) loadSendQueueSize() (size int) {
  592. return int(atomic.LoadInt32(&ch.sendQueueSize))
  593. }
  594. // Goroutine-safe
  595. // Use only as a heuristic.
  596. func (ch *Channel) canSend() bool {
  597. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  598. }
  599. // Returns true if any msgPackets are pending to be sent.
  600. // Call before calling nextMsgPacket()
  601. // Goroutine-safe
  602. func (ch *Channel) isSendPending() bool {
  603. if len(ch.sending) == 0 {
  604. if len(ch.sendQueue) == 0 {
  605. return false
  606. }
  607. ch.sending = <-ch.sendQueue
  608. }
  609. return true
  610. }
  611. // Creates a new msgPacket to send.
  612. // Not goroutine-safe
  613. func (ch *Channel) nextMsgPacket() msgPacket {
  614. packet := msgPacket{}
  615. packet.ChannelID = byte(ch.desc.ID)
  616. maxSize := ch.maxMsgPacketPayloadSize
  617. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  618. if len(ch.sending) <= maxSize {
  619. packet.EOF = byte(0x01)
  620. ch.sending = nil
  621. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  622. } else {
  623. packet.EOF = byte(0x00)
  624. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  625. }
  626. return packet
  627. }
  628. // Writes next msgPacket to w.
  629. // Not goroutine-safe
  630. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  631. packet := ch.nextMsgPacket()
  632. ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  633. writeMsgPacketTo(packet, w, &n, &err)
  634. if err == nil {
  635. ch.recentlySent += int64(n)
  636. }
  637. return
  638. }
  639. func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
  640. wire.WriteByte(packetTypeMsg, w, n, err)
  641. wire.WriteBinary(packet, w, n, err)
  642. }
  643. // Handles incoming msgPackets. It returns a message bytes if message is
  644. // complete. NOTE message bytes may change on next call to recvMsgPacket.
  645. // Not goroutine-safe
  646. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  647. ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  648. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  649. return nil, wire.ErrBinaryReadOverflow
  650. }
  651. ch.recving = append(ch.recving, packet.Bytes...)
  652. if packet.EOF == byte(0x01) {
  653. msgBytes := ch.recving
  654. // clear the slice without re-allocating.
  655. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  656. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  657. // at which point the recving slice stops being used and should be garbage collected
  658. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  659. return msgBytes, nil
  660. }
  661. return nil, nil
  662. }
  663. // Call this periodically to update stats for throttling purposes.
  664. // Not goroutine-safe
  665. func (ch *Channel) updateStats() {
  666. // Exponential decay of stats.
  667. // TODO: optimize.
  668. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  669. }
  670. //-----------------------------------------------------------------------------
  671. const (
  672. defaultMaxMsgPacketPayloadSize = 1024
  673. maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
  674. packetTypePing = byte(0x01)
  675. packetTypePong = byte(0x02)
  676. packetTypeMsg = byte(0x03)
  677. )
  678. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  679. type msgPacket struct {
  680. ChannelID byte
  681. EOF byte // 1 means message ends here.
  682. Bytes []byte
  683. }
  684. func (p msgPacket) String() string {
  685. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  686. }