You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

784 lines
21 KiB

9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "runtime/debug"
  10. "sync/atomic"
  11. "time"
  12. wire "github.com/tendermint/go-wire"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. flow "github.com/tendermint/tmlibs/flowrate"
  15. "github.com/tendermint/tmlibs/log"
  16. )
// Buffer sizes, batching limits and default tuning values used by MConnection
// and its channels.
const (
	// numBatchMsgPackets caps how many msgPackets sendSomeMsgPackets writes per call.
	numBatchMsgPackets = 10
	// minReadBufferSize and minWriteBufferSize are the sizes handed to the
	// bufio reader/writer that wrap the underlying net.Conn.
	minReadBufferSize  = 1024
	minWriteBufferSize = 65536
	// updateStats is the period of the timer that triggers Channel.updateStats.
	updateStats = 2 * time.Second

	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond

	// Per-channel defaults applied by ChannelDescriptor.FillDefaults when the
	// corresponding descriptor field is zero.
	defaultSendQueueCapacity   = 1
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096 // 21MB

	defaultSendRate = int64(512000) // 500KB/s
	defaultRecvRate = int64(512000) // 500KB/s
	// defaultSendTimeout bounds how long Channel.sendBytes waits for queue space.
	defaultSendTimeout = 10 * time.Second
	// defaultPongTimeout must stay below defaultPingInterval; the constructor
	// panics otherwise.
	defaultPingInterval = 60 * time.Second
	defaultPongTimeout  = 45 * time.Second
)
// receiveCbFunc is called by recvRoutine with the channel id and the bytes of
// a fully reassembled message.
type receiveCbFunc func(chID byte, msgBytes []byte)

// errorCbFunc is called by stopForError with the value that caused the
// connection to be stopped; it runs at most once per connection.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:
	func (m MConnection) Send(chID byte, msg interface{}) bool {}
	func (m MConnection) TrySend(chID byte, msg interface{}) bool {}

`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chID`, or until the request times out.
The message `msg` is serialized using the `tendermint/wire` submodule's
`WriteBinary()` reflection routine.

`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService

	conn        net.Conn
	bufReader   *bufio.Reader // buffered reader over conn, used by recvRoutine
	bufWriter   *bufio.Writer // buffered writer over conn, used by sendRoutine
	sendMonitor *flow.Monitor // throttles and accounts for written bytes
	recvMonitor *flow.Monitor // throttles and accounts for read bytes
	send        chan struct{} // wakes sendRoutine when a channel has data queued
	pong        chan struct{} // recvRoutine signals that a ping arrived and a pong must be sent
	channels    []*Channel
	channelsIdx map[byte]*Channel // channels keyed by ChannelDescriptor.ID
	onReceive   receiveCbFunc
	onError     errorCbFunc
	errored     uint32 // set to 1 (via CAS) once onError has been triggered
	config      *MConnConfig

	quit         chan struct{}      // closed in OnStop to terminate sendRoutine
	flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer    *cmn.RepeatTimer   // send pings periodically
	// close conn if pong is not received in pongTimeout
	pongTimer     *time.Timer
	pongTimeoutCh chan bool // true - timeout, false - peer sent pong
	chStatsTimer  *cmn.RepeatTimer // update channel stats periodically

	created time.Time // time of creation
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate and RecvRate are passed to the send/recv flow monitors as the
	// rate limit; they are read with atomic.LoadInt64.
	// NOTE(review): units are presumably bytes per second (the defaults are
	// annotated "500KB/s") — confirm against the flowrate package.
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// Maximum payload size
	MaxMsgPacketPayloadSize int `mapstructure:"max_msg_packet_payload_size"`

	// Interval to flush writes (throttled)
	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

	// Interval to send pings
	PingInterval time.Duration `mapstructure:"ping_interval"`

	// Maximum wait time for pongs
	PongTimeout time.Duration `mapstructure:"pong_timeout"`
}
  93. func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
  94. return cfg.MaxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  95. }
  96. // DefaultMConnConfig returns the default config.
  97. func DefaultMConnConfig() *MConnConfig {
  98. return &MConnConfig{
  99. SendRate: defaultSendRate,
  100. RecvRate: defaultRecvRate,
  101. MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
  102. FlushThrottle: defaultFlushThrottle,
  103. PingInterval: defaultPingInterval,
  104. PongTimeout: defaultPongTimeout,
  105. }
  106. }
  107. // NewMConnection wraps net.Conn and creates multiplex connection
  108. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  109. return NewMConnectionWithConfig(
  110. conn,
  111. chDescs,
  112. onReceive,
  113. onError,
  114. DefaultMConnConfig())
  115. }
  116. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  117. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  118. if config.PongTimeout >= config.PingInterval {
  119. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  120. }
  121. mconn := &MConnection{
  122. conn: conn,
  123. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  124. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  125. sendMonitor: flow.New(0, 0),
  126. recvMonitor: flow.New(0, 0),
  127. send: make(chan struct{}, 1),
  128. pong: make(chan struct{}, 1),
  129. onReceive: onReceive,
  130. onError: onError,
  131. config: config,
  132. }
  133. // Create channels
  134. var channelsIdx = map[byte]*Channel{}
  135. var channels = []*Channel{}
  136. for _, desc := range chDescs {
  137. channel := newChannel(mconn, *desc)
  138. channelsIdx[channel.desc.ID] = channel
  139. channels = append(channels, channel)
  140. }
  141. mconn.channels = channels
  142. mconn.channelsIdx = channelsIdx
  143. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  144. return mconn
  145. }
  146. func (c *MConnection) SetLogger(l log.Logger) {
  147. c.BaseService.SetLogger(l)
  148. for _, ch := range c.channels {
  149. ch.SetLogger(l)
  150. }
  151. }
  152. // OnStart implements BaseService
  153. func (c *MConnection) OnStart() error {
  154. if err := c.BaseService.OnStart(); err != nil {
  155. return err
  156. }
  157. c.quit = make(chan struct{})
  158. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  159. c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
  160. c.pongTimeoutCh = make(chan bool, 1)
  161. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  162. go c.sendRoutine()
  163. go c.recvRoutine()
  164. return nil
  165. }
// OnStop implements BaseService. It stops the flush, ping and channel-stats
// timers, closes the quit channel (which terminates sendRoutine) and closes
// the underlying connection.
func (c *MConnection) OnStop() {
	c.BaseService.OnStop()
	c.flushTimer.Stop()
	c.pingTimer.Stop()
	c.chStatsTimer.Stop()
	// quit is only created in OnStart, so guard against a nil channel.
	if c.quit != nil {
		close(c.quit)
	}
	c.conn.Close() // nolint: errcheck

	// We can't close pong safely here because
	// recvRoutine may write to it after we've stopped.
	// Though it doesn't need to get closed at all,
	// we close it @ recvRoutine.
}
  181. func (c *MConnection) String() string {
  182. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  183. }
  184. func (c *MConnection) flush() {
  185. c.Logger.Debug("Flush", "conn", c)
  186. err := c.bufWriter.Flush()
  187. if err != nil {
  188. c.Logger.Error("MConnection flush failed", "err", err)
  189. }
  190. }
  191. // Catch panics, usually caused by remote disconnects.
  192. func (c *MConnection) _recover() {
  193. if r := recover(); r != nil {
  194. stack := debug.Stack()
  195. err := cmn.StackError{r, stack}
  196. c.stopForError(err)
  197. }
  198. }
  199. func (c *MConnection) stopForError(r interface{}) {
  200. c.Stop()
  201. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  202. if c.onError != nil {
  203. c.onError(r)
  204. }
  205. }
  206. }
  207. // Queues a message to be sent to channel.
  208. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  209. if !c.IsRunning() {
  210. return false
  211. }
  212. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  213. // Send message to channel.
  214. channel, ok := c.channelsIdx[chID]
  215. if !ok {
  216. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  217. return false
  218. }
  219. success := channel.sendBytes(wire.BinaryBytes(msg))
  220. if success {
  221. // Wake up sendRoutine if necessary
  222. select {
  223. case c.send <- struct{}{}:
  224. default:
  225. }
  226. } else {
  227. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  228. }
  229. return success
  230. }
  231. // Queues a message to be sent to channel.
  232. // Nonblocking, returns true if successful.
  233. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  234. if !c.IsRunning() {
  235. return false
  236. }
  237. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  238. // Send message to channel.
  239. channel, ok := c.channelsIdx[chID]
  240. if !ok {
  241. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  242. return false
  243. }
  244. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  245. if ok {
  246. // Wake up sendRoutine if necessary
  247. select {
  248. case c.send <- struct{}{}:
  249. default:
  250. }
  251. }
  252. return ok
  253. }
  254. // CanSend returns true if you can send more data onto the chID, false
  255. // otherwise. Use only as a heuristic.
  256. func (c *MConnection) CanSend(chID byte) bool {
  257. if !c.IsRunning() {
  258. return false
  259. }
  260. channel, ok := c.channelsIdx[chID]
  261. if !ok {
  262. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  263. return false
  264. }
  265. return channel.canSend()
  266. }
// sendRoutine polls for packets to send from channels.
//
// It is the only goroutine that writes to bufWriter. Each loop iteration
// handles exactly one event: a throttled flush, a channel-stats tick, a ping
// tick, a pong-timeout notification, a request to answer a ping, a quit
// signal, or a wake-up saying some channel has data queued. The loop exits
// when the service is no longer running or when an iteration produced an
// error, in which case the connection is stopped via stopForError.
func (c *MConnection) sendRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// n and err are filled in by the wire.WriteByte calls below.
		var n int
		var err error
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufWriter.
			c.flush()
		case <-c.chStatsTimer.Chan():
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.Chan():
			c.Logger.Debug("Send Ping")
			wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
			// When the timer fires, report a timeout on pongTimeoutCh without
			// blocking if a notification is already pending.
			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
				select {
				case c.pongTimeoutCh <- true:
				default:
				}
			})
			c.flush()
		case timeout := <-c.pongTimeoutCh:
			// true: the pong timer expired; false: recvRoutine saw a pong.
			if timeout {
				c.Logger.Debug("Pong timeout")
				err = errors.New("pong timeout")
			} else {
				c.stopPongTimer()
			}
		case <-c.pong:
			// recvRoutine received a ping; answer it.
			c.Logger.Debug("Send Pong")
			wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			c.flush()
		case <-c.quit:
			break FOR_LOOP
		case <-c.send:
			// Send some msgPackets
			eof := c.sendSomeMsgPackets()
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	c.stopPongTimer()
}
  332. // Returns true if messages from channels were exhausted.
  333. // Blocks in accordance to .sendMonitor throttling.
  334. func (c *MConnection) sendSomeMsgPackets() bool {
  335. // Block until .sendMonitor says we can write.
  336. // Once we're ready we send more than we asked for,
  337. // but amortized it should even out.
  338. c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  339. // Now send some msgPackets.
  340. for i := 0; i < numBatchMsgPackets; i++ {
  341. if c.sendMsgPacket() {
  342. return true
  343. }
  344. }
  345. return false
  346. }
  347. // Returns true if messages from channels were exhausted.
  348. func (c *MConnection) sendMsgPacket() bool {
  349. // Choose a channel to create a msgPacket from.
  350. // The chosen channel will be the one whose recentlySent/priority is the least.
  351. var leastRatio float32 = math.MaxFloat32
  352. var leastChannel *Channel
  353. for _, channel := range c.channels {
  354. // If nothing to send, skip this channel
  355. if !channel.isSendPending() {
  356. continue
  357. }
  358. // Get ratio, and keep track of lowest ratio.
  359. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  360. if ratio < leastRatio {
  361. leastRatio = ratio
  362. leastChannel = channel
  363. }
  364. }
  365. // Nothing to send?
  366. if leastChannel == nil {
  367. return true
  368. } else {
  369. // c.Logger.Info("Found a msgPacket to send")
  370. }
  371. // Make & send a msgPacket from this channel
  372. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  373. if err != nil {
  374. c.Logger.Error("Failed to write msgPacket", "err", err)
  375. c.stopForError(err)
  376. return true
  377. }
  378. c.sendMonitor.Update(int(n))
  379. c.flushTimer.Set()
  380. return false
  381. }
// recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
//
// The loop reads one packet-type byte per iteration and dispatches on it.
// Any read error, unknown channel id, oversized message or unknown packet type
// ends the loop; the connection is stopped via stopForError unless it was
// already stopped. On exit the pong channel is closed and drained.
func (c *MConnection) recvRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)

		/*
			// Peek into bufReader for debugging
			if numBytes := c.bufReader.Buffered(); numBytes > 0 {
				log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
					bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
					if err == nil {
						return bytes
					} else {
						log.Warn("Error peeking connection buffer", "err", err)
						return nil
					}
				}})
			}
		*/

		// Read packet type
		var n int
		var err error
		pktType := wire.ReadByte(c.bufReader, &n, &err)
		c.recvMonitor.Update(int(n))
		if err != nil {
			// Only report the error if we were not stopped deliberately.
			if c.IsRunning() {
				c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		// Read more depending on packet type.
		switch pktType {
		case packetTypePing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			c.Logger.Debug("Receive Ping")
			// Ask sendRoutine to reply with a pong.
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}
		case packetTypePong:
			c.Logger.Debug("Receive Pong")
			// Tell sendRoutine the peer answered (false means "no timeout").
			select {
			case c.pongTimeoutCh <- false:
			default:
				// never block
			}
		case packetTypeMsg:
			// n and err are deliberately shadowed for the packet body read.
			pkt, n, err := msgPacket{}, int(0), error(nil)
			wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
			c.recvMonitor.Update(int(n))
			if err != nil {
				if c.IsRunning() {
					c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			channel, ok := c.channelsIdx[pkt.ChannelID]
			if !ok || channel == nil {
				err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
				c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			// msgBytes is non-nil only once the packet with EOF set has arrived.
			msgBytes, err := channel.recvMsgPacket(pkt)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			if msgBytes != nil {
				c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
				// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
				c.onReceive(pkt.ChannelID, msgBytes)
			}
		default:
			err := fmt.Errorf("Unknown message type %X", pktType)
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.pong)
	for range c.pong {
		// Drain
	}
}
  480. // not goroutine-safe
  481. func (c *MConnection) stopPongTimer() {
  482. if c.pongTimer != nil {
  483. if !c.pongTimer.Stop() {
  484. <-c.pongTimer.C
  485. }
  486. c.pongTimer = nil
  487. }
  488. }
// ConnectionStatus is the snapshot returned by MConnection.Status.
type ConnectionStatus struct {
	Duration    time.Duration   // time elapsed since the connection's creation timestamp
	SendMonitor flow.Status     // status of the send flow monitor
	RecvMonitor flow.Status     // status of the receive flow monitor
	Channels    []ChannelStatus // one entry per channel, in channel order
}
// ChannelStatus is the per-channel part of a ConnectionStatus snapshot.
type ChannelStatus struct {
	ID                byte  // channel id from the descriptor
	SendQueueCapacity int   // capacity of the channel's send queue
	SendQueueSize     int   // value of the channel's send-queue counter at snapshot time
	Priority          int   // priority from the descriptor
	RecentlySent      int64 // decayed count of bytes recently written for this channel
}
  502. func (c *MConnection) Status() ConnectionStatus {
  503. var status ConnectionStatus
  504. status.Duration = time.Since(c.created)
  505. status.SendMonitor = c.sendMonitor.Status()
  506. status.RecvMonitor = c.recvMonitor.Status()
  507. status.Channels = make([]ChannelStatus, len(c.channels))
  508. for i, channel := range c.channels {
  509. status.Channels[i] = ChannelStatus{
  510. ID: channel.desc.ID,
  511. SendQueueCapacity: cap(channel.sendQueue),
  512. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  513. Priority: channel.desc.Priority,
  514. RecentlySent: channel.recentlySent,
  515. }
  516. }
  517. return status
  518. }
  519. //-----------------------------------------------------------------------------
// ChannelDescriptor describes one channel of an MConnection. Zero values for
// the three capacity fields are replaced with package defaults by FillDefaults.
type ChannelDescriptor struct {
	ID                  byte // channel id; key of the connection's channel index
	Priority            int  // relative send priority; must be positive
	SendQueueCapacity   int  // buffer size of the channel's send queue
	RecvBufferCapacity  int  // initial capacity of the receive reassembly buffer
	RecvMessageCapacity int  // maximum size in bytes of a reassembled inbound message
}
  527. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  528. if chDesc.SendQueueCapacity == 0 {
  529. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  530. }
  531. if chDesc.RecvBufferCapacity == 0 {
  532. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  533. }
  534. if chDesc.RecvMessageCapacity == 0 {
  535. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  536. }
  537. filled = chDesc
  538. return
  539. }
// Channel holds the send queue and the send/receive reassembly state for one
// logical channel of an MConnection.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte // whole messages waiting to be chopped into msgPackets
	sendQueueSize int32       // atomic.
	recving       []byte      // bytes of the inbound message assembled so far
	sending       []byte      // remainder of the outbound message currently being sent
	recentlySent  int64       // exponential moving average

	maxMsgPacketPayloadSize int // copied from the connection config at creation

	Logger log.Logger
}
  553. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  554. desc = desc.FillDefaults()
  555. if desc.Priority <= 0 {
  556. cmn.PanicSanity("Channel default priority must be a positive integer")
  557. }
  558. return &Channel{
  559. conn: conn,
  560. desc: desc,
  561. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  562. recving: make([]byte, 0, desc.RecvBufferCapacity),
  563. maxMsgPacketPayloadSize: conn.config.MaxMsgPacketPayloadSize,
  564. }
  565. }
// SetLogger sets the logger used by the channel's packet read/write debug output.
func (ch *Channel) SetLogger(l log.Logger) {
	ch.Logger = l
}
  569. // Queues message to send to this channel.
  570. // Goroutine-safe
  571. // Times out (and returns false) after defaultSendTimeout
  572. func (ch *Channel) sendBytes(bytes []byte) bool {
  573. select {
  574. case ch.sendQueue <- bytes:
  575. atomic.AddInt32(&ch.sendQueueSize, 1)
  576. return true
  577. case <-time.After(defaultSendTimeout):
  578. return false
  579. }
  580. }
  581. // Queues message to send to this channel.
  582. // Nonblocking, returns true if successful.
  583. // Goroutine-safe
  584. func (ch *Channel) trySendBytes(bytes []byte) bool {
  585. select {
  586. case ch.sendQueue <- bytes:
  587. atomic.AddInt32(&ch.sendQueueSize, 1)
  588. return true
  589. default:
  590. return false
  591. }
  592. }
  593. // Goroutine-safe
  594. func (ch *Channel) loadSendQueueSize() (size int) {
  595. return int(atomic.LoadInt32(&ch.sendQueueSize))
  596. }
  597. // Goroutine-safe
  598. // Use only as a heuristic.
  599. func (ch *Channel) canSend() bool {
  600. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  601. }
  602. // Returns true if any msgPackets are pending to be sent.
  603. // Call before calling nextMsgPacket()
  604. // Goroutine-safe
  605. func (ch *Channel) isSendPending() bool {
  606. if len(ch.sending) == 0 {
  607. if len(ch.sendQueue) == 0 {
  608. return false
  609. }
  610. ch.sending = <-ch.sendQueue
  611. }
  612. return true
  613. }
  614. // Creates a new msgPacket to send.
  615. // Not goroutine-safe
  616. func (ch *Channel) nextMsgPacket() msgPacket {
  617. packet := msgPacket{}
  618. packet.ChannelID = byte(ch.desc.ID)
  619. maxSize := ch.maxMsgPacketPayloadSize
  620. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  621. if len(ch.sending) <= maxSize {
  622. packet.EOF = byte(0x01)
  623. ch.sending = nil
  624. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  625. } else {
  626. packet.EOF = byte(0x00)
  627. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  628. }
  629. return packet
  630. }
  631. // Writes next msgPacket to w.
  632. // Not goroutine-safe
  633. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  634. packet := ch.nextMsgPacket()
  635. ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  636. writeMsgPacketTo(packet, w, &n, &err)
  637. if err == nil {
  638. ch.recentlySent += int64(n)
  639. }
  640. return
  641. }
// writeMsgPacketTo writes the packetTypeMsg type byte followed by the encoded
// packet to w. The byte count and any error are reported through n and err.
func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
	wire.WriteByte(packetTypeMsg, w, n, err)
	wire.WriteBinary(packet, w, n, err)
}
  646. // Handles incoming msgPackets. It returns a message bytes if message is
  647. // complete. NOTE message bytes may change on next call to recvMsgPacket.
  648. // Not goroutine-safe
  649. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  650. ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  651. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  652. return nil, wire.ErrBinaryReadOverflow
  653. }
  654. ch.recving = append(ch.recving, packet.Bytes...)
  655. if packet.EOF == byte(0x01) {
  656. msgBytes := ch.recving
  657. // clear the slice without re-allocating.
  658. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  659. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  660. // at which point the recving slice stops being used and should be garbage collected
  661. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  662. return msgBytes, nil
  663. }
  664. return nil, nil
  665. }
  666. // Call this periodically to update stats for throttling purposes.
  667. // Not goroutine-safe
  668. func (ch *Channel) updateStats() {
  669. // Exponential decay of stats.
  670. // TODO: optimize.
  671. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  672. }
  673. //-----------------------------------------------------------------------------
// Packet framing constants.
const (
	// defaultMaxMsgPacketPayloadSize is the default for MConnConfig.MaxMsgPacketPayloadSize.
	defaultMaxMsgPacketPayloadSize = 1024

	maxMsgPacketOverheadSize = 10 // It's actually lower but good enough

	// Packet type bytes; each packet on the wire starts with one of these.
	packetTypePing = byte(0x01)
	packetTypePong = byte(0x02)
	packetTypeMsg  = byte(0x03)
)
// Messages in channels are chopped into smaller msgPackets for multiplexing.
type msgPacket struct {
	ChannelID byte   // id of the channel the payload belongs to
	EOF       byte   // 1 means message ends here.
	Bytes     []byte // payload chunk of the message
}
  687. func (p msgPacket) String() string {
  688. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  689. }