You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

756 lines
21 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. wire "github.com/tendermint/go-wire"
  12. tmlegacy "github.com/tendermint/go-wire/nowriter/tmlegacy"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. flow "github.com/tendermint/tmlibs/flowrate"
  15. "github.com/tendermint/tmlibs/log"
  16. )
  17. var legacy = tmlegacy.TMEncoderLegacy{}
  18. const (
  19. numBatchMsgPackets = 10
  20. minReadBufferSize = 1024
  21. minWriteBufferSize = 65536
  22. updateStats = 2 * time.Second
  23. pingTimeout = 40 * time.Second
  24. // some of these defaults are written in the user config
  25. // flushThrottle, sendRate, recvRate
  26. // TODO: remove values present in config
  27. defaultFlushThrottle = 100 * time.Millisecond
  28. defaultSendQueueCapacity = 1
  29. defaultRecvBufferCapacity = 4096
  30. defaultRecvMessageCapacity = 22020096 // 21MB
  31. defaultSendRate = int64(512000) // 500KB/s
  32. defaultRecvRate = int64(512000) // 500KB/s
  33. defaultSendTimeout = 10 * time.Second
  34. )
  35. type receiveCbFunc func(chID byte, msgBytes []byte)
  36. type errorCbFunc func(interface{})
  37. /*
  38. Each peer has one `MConnection` (multiplex connection) instance.
  39. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  40. several messages along a single channel of communication.
  41. Each `MConnection` handles message transmission on multiple abstract communication
  42. `Channel`s. Each channel has a globally unique byte id.
  43. The byte id and the relative priorities of each `Channel` are configured upon
  44. initialization of the connection.
  45. There are two methods for sending messages:
  46. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  47. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  48. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  49. for the channel with the given id byte `chID`, or until the request times out.
  50. The message `msg` is serialized using the `tendermint/wire` submodule's
  51. `WriteBinary()` reflection routine.
  52. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  53. queue is full.
  54. Inbound message bytes are handled with an onReceive callback function.
  55. */
  56. type MConnection struct {
  57. cmn.BaseService
  58. conn net.Conn
  59. bufReader *bufio.Reader
  60. bufWriter *bufio.Writer
  61. sendMonitor *flow.Monitor
  62. recvMonitor *flow.Monitor
  63. send chan struct{}
  64. pong chan struct{}
  65. channels []*Channel
  66. channelsIdx map[byte]*Channel
  67. onReceive receiveCbFunc
  68. onError errorCbFunc
  69. errored uint32
  70. config *MConnConfig
  71. quit chan struct{}
  72. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  73. pingTimer *cmn.RepeatTimer // send pings periodically
  74. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  75. LocalAddress *NetAddress
  76. RemoteAddress *NetAddress
  77. }
  78. // MConnConfig is a MConnection configuration.
  79. type MConnConfig struct {
  80. SendRate int64 `mapstructure:"send_rate"`
  81. RecvRate int64 `mapstructure:"recv_rate"`
  82. maxMsgPacketPayloadSize int
  83. flushThrottle time.Duration
  84. }
  85. func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
  86. return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  87. }
  88. // DefaultMConnConfig returns the default config.
  89. func DefaultMConnConfig() *MConnConfig {
  90. return &MConnConfig{
  91. SendRate: defaultSendRate,
  92. RecvRate: defaultRecvRate,
  93. maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
  94. flushThrottle: defaultFlushThrottle,
  95. }
  96. }
  97. // NewMConnection wraps net.Conn and creates multiplex connection
  98. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  99. return NewMConnectionWithConfig(
  100. conn,
  101. chDescs,
  102. onReceive,
  103. onError,
  104. DefaultMConnConfig())
  105. }
  106. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  107. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  108. mconn := &MConnection{
  109. conn: conn,
  110. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  111. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  112. sendMonitor: flow.New(0, 0),
  113. recvMonitor: flow.New(0, 0),
  114. send: make(chan struct{}, 1),
  115. pong: make(chan struct{}),
  116. onReceive: onReceive,
  117. onError: onError,
  118. config: config,
  119. LocalAddress: NewNetAddress(conn.LocalAddr()),
  120. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  121. }
  122. // Create channels
  123. var channelsIdx = map[byte]*Channel{}
  124. var channels = []*Channel{}
  125. for _, desc := range chDescs {
  126. channel := newChannel(mconn, *desc)
  127. channelsIdx[channel.desc.ID] = channel
  128. channels = append(channels, channel)
  129. }
  130. mconn.channels = channels
  131. mconn.channelsIdx = channelsIdx
  132. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  133. return mconn
  134. }
  135. func (c *MConnection) SetLogger(l log.Logger) {
  136. c.BaseService.SetLogger(l)
  137. for _, ch := range c.channels {
  138. ch.SetLogger(l)
  139. }
  140. }
  141. // OnStart implements BaseService
  142. func (c *MConnection) OnStart() error {
  143. if err := c.BaseService.OnStart(); err != nil {
  144. return err
  145. }
  146. c.quit = make(chan struct{})
  147. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
  148. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  149. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  150. go c.sendRoutine()
  151. go c.recvRoutine()
  152. return nil
  153. }
  154. // OnStop implements BaseService
  155. func (c *MConnection) OnStop() {
  156. c.Logger.Debug("MConn.OnStop")
  157. c.BaseService.OnStop()
  158. c.Logger.Debug("MConn.flushTimer.Stop")
  159. c.flushTimer.Stop()
  160. c.Logger.Debug("MConn.pingTimer.Stop")
  161. c.pingTimer.Stop()
  162. c.Logger.Debug("MConn.chStatsTimer.Stop")
  163. c.chStatsTimer.Stop()
  164. if c.quit != nil {
  165. c.Logger.Debug("MConn: Close Quit")
  166. close(c.quit)
  167. }
  168. c.Logger.Debug("MConn.conn.Close()")
  169. c.conn.Close() // nolint: errcheck
  170. // We can't close pong safely here because
  171. // recvRoutine may write to it after we've stopped.
  172. // Though it doesn't need to get closed at all,
  173. // we close it @ recvRoutine.
  174. // close(c.pong)
  175. }
  176. func (c *MConnection) String() string {
  177. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  178. }
  179. func (c *MConnection) flush() {
  180. c.Logger.Debug("Flush", "conn", c)
  181. err := c.bufWriter.Flush()
  182. if err != nil {
  183. c.Logger.Error("MConnection flush failed", "err", err)
  184. }
  185. }
  186. // Catch panics, usually caused by remote disconnects.
  187. func (c *MConnection) _recover() {
  188. if r := recover(); r != nil {
  189. stack := debug.Stack()
  190. err := cmn.StackError{r, stack}
  191. c.stopForError(err)
  192. }
  193. }
  194. func (c *MConnection) stopForError(r interface{}) {
  195. c.Stop()
  196. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  197. if c.onError != nil {
  198. c.onError(r)
  199. }
  200. }
  201. }
  202. // Queues a message to be sent to channel.
  203. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  204. if !c.IsRunning() {
  205. return false
  206. }
  207. c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  208. // Send message to channel.
  209. channel, ok := c.channelsIdx[chID]
  210. if !ok {
  211. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  212. return false
  213. }
  214. success := channel.sendBytes(wire.BinaryBytes(msg))
  215. if success {
  216. // Wake up sendRoutine if necessary
  217. select {
  218. case c.send <- struct{}{}:
  219. default:
  220. }
  221. } else {
  222. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
  223. }
  224. return success
  225. }
  226. // Queues a message to be sent to channel.
  227. // Nonblocking, returns true if successful.
  228. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  229. if !c.IsRunning() {
  230. return false
  231. }
  232. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  233. // Send message to channel.
  234. channel, ok := c.channelsIdx[chID]
  235. if !ok {
  236. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  237. return false
  238. }
  239. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  240. if ok {
  241. // Wake up sendRoutine if necessary
  242. select {
  243. case c.send <- struct{}{}:
  244. default:
  245. }
  246. }
  247. return ok
  248. }
  249. // CanSend returns true if you can send more data onto the chID, false
  250. // otherwise. Use only as a heuristic.
  251. func (c *MConnection) CanSend(chID byte) bool {
  252. if !c.IsRunning() {
  253. return false
  254. }
  255. channel, ok := c.channelsIdx[chID]
  256. if !ok {
  257. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  258. return false
  259. }
  260. return channel.canSend()
  261. }
  262. // sendRoutine polls for packets to send from channels.
  263. func (c *MConnection) sendRoutine() {
  264. defer c._recover()
  265. FOR_LOOP:
  266. for {
  267. var n int
  268. var err error
  269. select {
  270. case <-c.flushTimer.Ch:
  271. // NOTE: flushTimer.Set() must be called every time
  272. // something is written to .bufWriter.
  273. c.flush()
  274. case <-c.chStatsTimer.Ch:
  275. for _, channel := range c.channels {
  276. channel.updateStats()
  277. }
  278. case <-c.pingTimer.Ch:
  279. c.Logger.Debug("Send Ping")
  280. legacy.WriteOctet(packetTypePing, c.bufWriter, &n, &err)
  281. c.sendMonitor.Update(int(n))
  282. c.flush()
  283. case <-c.pong:
  284. c.Logger.Debug("Send Pong")
  285. legacy.WriteOctet(packetTypePong, c.bufWriter, &n, &err)
  286. c.sendMonitor.Update(int(n))
  287. c.flush()
  288. case <-c.quit:
  289. break FOR_LOOP
  290. case <-c.send:
  291. // Send some msgPackets
  292. eof := c.sendSomeMsgPackets()
  293. c.Logger.Debug("finished sendSomeMsgPackets", "eof", eof)
  294. if !eof {
  295. // Keep sendRoutine awake.
  296. select {
  297. case c.send <- struct{}{}:
  298. default:
  299. }
  300. }
  301. }
  302. if !c.IsRunning() {
  303. break FOR_LOOP
  304. }
  305. if err != nil {
  306. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  307. c.stopForError(err)
  308. break FOR_LOOP
  309. }
  310. }
  311. c.Logger.Debug("sendRoutine: End")
  312. // Cleanup
  313. }
  314. // Returns true if messages from channels were exhausted.
  315. // Blocks in accordance to .sendMonitor throttling.
  316. func (c *MConnection) sendSomeMsgPackets() bool {
  317. // Block until .sendMonitor says we can write.
  318. // Once we're ready we send more than we asked for,
  319. // but amortized it should even out.
  320. c.Logger.Debug("sendMonitor.Limit")
  321. c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  322. // Now send some msgPackets.
  323. for i := 0; i < numBatchMsgPackets; i++ {
  324. c.Logger.Debug("sendMsgPacket", "i", i)
  325. if c.sendMsgPacket() {
  326. return true
  327. }
  328. }
  329. return false
  330. }
  331. // Returns true if messages from channels were exhausted.
  332. func (c *MConnection) sendMsgPacket() bool {
  333. // Choose a channel to create a msgPacket from.
  334. // The chosen channel will be the one whose recentlySent/priority is the least.
  335. var leastRatio float32 = math.MaxFloat32
  336. var leastChannel *Channel
  337. for _, channel := range c.channels {
  338. // If nothing to send, skip this channel
  339. if !channel.isSendPending() {
  340. continue
  341. }
  342. // Get ratio, and keep track of lowest ratio.
  343. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  344. if ratio < leastRatio {
  345. leastRatio = ratio
  346. leastChannel = channel
  347. }
  348. }
  349. // Nothing to send?
  350. if leastChannel == nil {
  351. c.Logger.Debug("Least channel == nil")
  352. return true
  353. } else {
  354. // c.Logger.Info("Found a msgPacket to send")
  355. }
  356. // Make & send a msgPacket from this channel
  357. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  358. if err != nil {
  359. c.Logger.Error("Failed to write msgPacket", "err", err)
  360. c.stopForError(err)
  361. return true
  362. }
  363. c.Logger.Debug("sendMonitor.Update")
  364. c.sendMonitor.Update(int(n))
  365. c.flushTimer.Set()
  366. return false
  367. }
  368. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  369. // After a whole message has been assembled, it's pushed to onReceive().
  370. // Blocks depending on how the connection is throttled.
  371. func (c *MConnection) recvRoutine() {
  372. defer c._recover()
  373. FOR_LOOP:
  374. for {
  375. c.Logger.Debug("recvRoutine: recvMonitor.Limit")
  376. // Block until .recvMonitor says we can read.
  377. c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
  378. /*
  379. // Peek into bufReader for debugging
  380. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  381. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  382. bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
  383. if err == nil {
  384. return bytes
  385. } else {
  386. log.Warn("Error peeking connection buffer", "err", err)
  387. return nil
  388. }
  389. }})
  390. }
  391. */
  392. // Read packet type
  393. var n int
  394. var err error
  395. c.Logger.Debug("recvRoutine: ReadByte")
  396. pktType := wire.ReadByte(c.bufReader, &n, &err)
  397. c.Logger.Debug("recvRoutine: recvMonitor.Update")
  398. c.recvMonitor.Update(int(n))
  399. if err != nil {
  400. if c.IsRunning() {
  401. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  402. c.stopForError(err)
  403. }
  404. break FOR_LOOP
  405. }
  406. // Read more depending on packet type.
  407. switch pktType {
  408. case packetTypePing:
  409. // TODO: prevent abuse, as they cause flush()'s.
  410. c.Logger.Debug("Receive Ping")
  411. c.pong <- struct{}{}
  412. c.Logger.Debug("recvRoutine: trigger pong")
  413. case packetTypePong:
  414. // do nothing
  415. c.Logger.Debug("Receive Pong")
  416. case packetTypeMsg:
  417. pkt, n, err := msgPacket{}, int(0), error(nil)
  418. c.Logger.Debug("recvRoutine: ReadBinaryPtr")
  419. wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
  420. c.Logger.Debug("recvRoutine: recvMonitor.Update")
  421. c.recvMonitor.Update(int(n))
  422. if err != nil {
  423. if c.IsRunning() {
  424. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  425. c.stopForError(err)
  426. }
  427. break FOR_LOOP
  428. }
  429. channel, ok := c.channelsIdx[pkt.ChannelID]
  430. if !ok || channel == nil {
  431. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  432. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  433. c.stopForError(err)
  434. }
  435. c.Logger.Debug("recvRoutine: recvMsgPacket")
  436. msgBytes, err := channel.recvMsgPacket(pkt)
  437. c.Logger.Debug("recvRoutine: msgBytes", "msgBytes", msgBytes, "err", err)
  438. if err != nil {
  439. if c.IsRunning() {
  440. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  441. c.stopForError(err)
  442. }
  443. break FOR_LOOP
  444. }
  445. if msgBytes != nil {
  446. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  447. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  448. c.onReceive(pkt.ChannelID, msgBytes)
  449. c.Logger.Debug("done onReceive")
  450. }
  451. default:
  452. err := fmt.Errorf("Unknown message type %X", pktType)
  453. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  454. c.stopForError(err)
  455. }
  456. // TODO: shouldn't this go in the sendRoutine?
  457. // Better to send a ping packet when *we* haven't sent anything for a while.
  458. c.Logger.Debug("pingTimer.Reset()")
  459. c.pingTimer.Reset()
  460. c.Logger.Debug("done pingTimer.Reset()")
  461. }
  462. c.Logger.Debug("recvRoutine: End")
  463. // Cleanup
  464. close(c.pong)
  465. for range c.pong {
  466. // Drain
  467. }
  468. }
  469. type ConnectionStatus struct {
  470. SendMonitor flow.Status
  471. RecvMonitor flow.Status
  472. Channels []ChannelStatus
  473. }
  474. type ChannelStatus struct {
  475. ID byte
  476. SendQueueCapacity int
  477. SendQueueSize int
  478. Priority int
  479. RecentlySent int64
  480. }
  481. func (c *MConnection) Status() ConnectionStatus {
  482. var status ConnectionStatus
  483. status.SendMonitor = c.sendMonitor.Status()
  484. status.RecvMonitor = c.recvMonitor.Status()
  485. status.Channels = make([]ChannelStatus, len(c.channels))
  486. for i, channel := range c.channels {
  487. status.Channels[i] = ChannelStatus{
  488. ID: channel.desc.ID,
  489. SendQueueCapacity: cap(channel.sendQueue),
  490. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  491. Priority: channel.desc.Priority,
  492. RecentlySent: channel.recentlySent,
  493. }
  494. }
  495. return status
  496. }
  497. //-----------------------------------------------------------------------------
  498. type ChannelDescriptor struct {
  499. ID byte
  500. Priority int
  501. SendQueueCapacity int
  502. RecvBufferCapacity int
  503. RecvMessageCapacity int
  504. }
  505. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  506. if chDesc.SendQueueCapacity == 0 {
  507. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  508. }
  509. if chDesc.RecvBufferCapacity == 0 {
  510. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  511. }
  512. if chDesc.RecvMessageCapacity == 0 {
  513. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  514. }
  515. filled = chDesc
  516. return
  517. }
  518. // TODO: lowercase.
  519. // NOTE: not goroutine-safe.
  520. type Channel struct {
  521. conn *MConnection
  522. desc ChannelDescriptor
  523. sendQueue chan []byte
  524. sendQueueSize int32 // atomic.
  525. recving []byte
  526. sending []byte
  527. recentlySent int64 // exponential moving average
  528. maxMsgPacketPayloadSize int
  529. Logger log.Logger
  530. }
  531. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  532. desc = desc.FillDefaults()
  533. if desc.Priority <= 0 {
  534. cmn.PanicSanity("Channel default priority must be a positive integer")
  535. }
  536. return &Channel{
  537. conn: conn,
  538. desc: desc,
  539. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  540. recving: make([]byte, 0, desc.RecvBufferCapacity),
  541. maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize,
  542. }
  543. }
  544. func (ch *Channel) SetLogger(l log.Logger) {
  545. ch.Logger = l
  546. }
  547. // Queues message to send to this channel.
  548. // Goroutine-safe
  549. // Times out (and returns false) after defaultSendTimeout
  550. func (ch *Channel) sendBytes(bytes []byte) bool {
  551. select {
  552. case ch.sendQueue <- bytes:
  553. atomic.AddInt32(&ch.sendQueueSize, 1)
  554. return true
  555. case <-time.After(defaultSendTimeout):
  556. return false
  557. }
  558. }
  559. // Queues message to send to this channel.
  560. // Nonblocking, returns true if successful.
  561. // Goroutine-safe
  562. func (ch *Channel) trySendBytes(bytes []byte) bool {
  563. select {
  564. case ch.sendQueue <- bytes:
  565. atomic.AddInt32(&ch.sendQueueSize, 1)
  566. return true
  567. default:
  568. return false
  569. }
  570. }
  571. // Goroutine-safe
  572. func (ch *Channel) loadSendQueueSize() (size int) {
  573. return int(atomic.LoadInt32(&ch.sendQueueSize))
  574. }
  575. // Goroutine-safe
  576. // Use only as a heuristic.
  577. func (ch *Channel) canSend() bool {
  578. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  579. }
  580. // Returns true if any msgPackets are pending to be sent.
  581. // Call before calling nextMsgPacket()
  582. // Goroutine-safe
  583. func (ch *Channel) isSendPending() bool {
  584. if len(ch.sending) == 0 {
  585. if len(ch.sendQueue) == 0 {
  586. return false
  587. }
  588. ch.sending = <-ch.sendQueue
  589. }
  590. return true
  591. }
  592. // Creates a new msgPacket to send.
  593. // Not goroutine-safe
  594. func (ch *Channel) nextMsgPacket() msgPacket {
  595. packet := msgPacket{}
  596. packet.ChannelID = byte(ch.desc.ID)
  597. maxSize := ch.maxMsgPacketPayloadSize
  598. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  599. if len(ch.sending) <= maxSize {
  600. packet.EOF = byte(0x01)
  601. ch.sending = nil
  602. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  603. } else {
  604. packet.EOF = byte(0x00)
  605. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  606. }
  607. return packet
  608. }
  609. // Writes next msgPacket to w.
  610. // Not goroutine-safe
  611. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  612. packet := ch.nextMsgPacket()
  613. ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  614. writeMsgPacketTo(packet, w, &n, &err)
  615. if err == nil {
  616. ch.recentlySent += int64(n)
  617. }
  618. return
  619. }
  620. func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
  621. legacy.WriteOctet(packetTypeMsg, w, n, err)
  622. wire.WriteBinary(packet, w, n, err)
  623. }
  624. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  625. // Not goroutine-safe
  626. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  627. ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  628. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  629. return nil, wire.ErrBinaryReadOverflow
  630. }
  631. ch.recving = append(ch.recving, packet.Bytes...)
  632. if packet.EOF == byte(0x01) {
  633. msgBytes := ch.recving
  634. // clear the slice without re-allocating.
  635. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  636. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  637. // at which point the recving slice stops being used and should be garbage collected
  638. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  639. return msgBytes, nil
  640. }
  641. return nil, nil
  642. }
  643. // Call this periodically to update stats for throttling purposes.
  644. // Not goroutine-safe
  645. func (ch *Channel) updateStats() {
  646. // Exponential decay of stats.
  647. // TODO: optimize.
  648. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  649. }
  650. //-----------------------------------------------------------------------------
  651. const (
  652. defaultMaxMsgPacketPayloadSize = 1024
  653. maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
  654. packetTypePing = byte(0x01)
  655. packetTypePong = byte(0x02)
  656. packetTypeMsg = byte(0x03)
  657. )
  658. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  659. type msgPacket struct {
  660. ChannelID byte
  661. EOF byte // 1 means message ends here.
  662. Bytes []byte
  663. }
  664. func (p msgPacket) String() string {
  665. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  666. }