You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

916 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "runtime/debug"
  11. "sync/atomic"
  12. "time"
  13. "github.com/gogo/protobuf/proto"
  14. flow "github.com/tendermint/tendermint/libs/flowrate"
  15. "github.com/tendermint/tendermint/libs/log"
  16. tmmath "github.com/tendermint/tendermint/libs/math"
  17. "github.com/tendermint/tendermint/libs/protoio"
  18. "github.com/tendermint/tendermint/libs/service"
  19. tmsync "github.com/tendermint/tendermint/libs/sync"
  20. "github.com/tendermint/tendermint/libs/timer"
  21. tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  22. )
const (
	// defaultMaxPacketMsgPayloadSize is the default cap on the payload bytes
	// carried by a single PacketMsg (see DefaultMConnConfig).
	defaultMaxPacketMsgPayloadSize = 1024

	// numBatchPacketMsgs bounds how many PacketMsgs one call to
	// sendSomePacketMsgs will write.
	numBatchPacketMsgs = 10
	// minReadBufferSize is the size of the bufio.Reader wrapping the conn.
	minReadBufferSize = 1024
	// minWriteBufferSize is the size of the bufio.Writer wrapping the conn.
	minWriteBufferSize = 65536
	// updateStats is the period of the ticker that triggers Channel.updateStats.
	updateStats = 2 * time.Second

	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond

	// Defaults applied by ChannelDescriptor.FillDefaults to zero-valued fields.
	defaultSendQueueCapacity   = 1
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096 // 21MB

	// Default rates handed to the flow monitors' Limit calls.
	defaultSendRate = int64(512000) // 500KB/s
	defaultRecvRate = int64(512000) // 500KB/s

	// defaultSendTimeout is how long Channel.sendBytes waits for queue space.
	defaultSendTimeout = 10 * time.Second

	// Keep-alive defaults; NewMConnectionWithConfig requires
	// PongTimeout < PingInterval.
	defaultPingInterval = 60 * time.Second
	defaultPongTimeout  = 45 * time.Second
)
// receiveCbFunc is the callback recvRoutine invokes with the channel ID and the
// bytes of each completely reassembled inbound message.
type receiveCbFunc func(chID byte, msgBytes []byte)

// errorCbFunc is the callback stopForError invokes (at most once per
// connection) with the error value that caused the connection to stop.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
	func (m MConnection) TrySend(chID byte, msgBytes []byte) bool {}

`Send(chID, msgBytes)` is a blocking call that waits until `msgBytes` is
successfully queued for the channel with the given id byte `chID`, or until the
request times out. Queued bytes are later split into PacketMsgs and written to
the connection as length-delimited Protobuf messages.

`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
channel's queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	service.BaseService

	// conn is the underlying connection; bufConnReader and bufConnWriter are
	// buffered wrappers around it created by the constructor.
	conn          net.Conn
	bufConnReader *bufio.Reader
	bufConnWriter *bufio.Writer
	// sendMonitor / recvMonitor throttle (Limit) and account (Update) the
	// bytes written and read by the send and recv routines.
	sendMonitor *flow.Monitor
	recvMonitor *flow.Monitor
	// send is a capacity-1 wake-up signal telling sendRoutine that some channel
	// has queued data.
	send chan struct{}
	// pong is a capacity-1 signal from recvRoutine asking sendRoutine to answer
	// a received ping.
	pong chan struct{}
	// channels holds the channels in descriptor order; channelsIdx indexes the
	// same channels by ID.
	channels    []*Channel
	channelsIdx map[byte]*Channel
	onReceive   receiveCbFunc
	onError     errorCbFunc
	// errored is flipped 0 -> 1 with a compare-and-swap in stopForError so that
	// onError is invoked only once.
	errored uint32
	config  MConnConfig

	// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
	// doneSendRoutine is closed when the sendRoutine actually quits.
	quitSendRoutine chan struct{}
	doneSendRoutine chan struct{}

	// Closing quitRecvRoutine will cause the recvRoutine to eventually quit.
	quitRecvRoutine chan struct{}

	// used to ensure FlushStop and OnStop
	// are safe to call concurrently.
	stopMtx tmsync.Mutex

	flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer  *time.Ticker         // send pings periodically

	// close conn if pong is not received in pongTimeout
	pongTimer     *time.Timer
	pongTimeoutCh chan bool // true - timeout, false - peer sent pong

	chStatsTimer *time.Ticker // update channel stats periodically

	created time.Time // time of creation

	// _maxPacketMsgSize caches the result of maxPacketMsgSize(), computed once
	// in the constructor.
	_maxPacketMsgSize int
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate and RecvRate are the rates passed to the send and receive flow
	// monitors' Limit calls; they are read atomically by the routines.
	// NOTE(review): presumably bytes per second (the defaults are annotated
	// 500KB/s) - confirm against the flowrate package.
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// Maximum payload size
	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

	// Interval to flush writes (throttled)
	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

	// Interval to send pings
	PingInterval time.Duration `mapstructure:"ping_interval"`

	// Maximum wait time for pongs. Must be strictly less than PingInterval,
	// otherwise NewMConnectionWithConfig panics.
	PongTimeout time.Duration `mapstructure:"pong_timeout"`
}
  108. // DefaultMConnConfig returns the default config.
  109. func DefaultMConnConfig() MConnConfig {
  110. return MConnConfig{
  111. SendRate: defaultSendRate,
  112. RecvRate: defaultRecvRate,
  113. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  114. FlushThrottle: defaultFlushThrottle,
  115. PingInterval: defaultPingInterval,
  116. PongTimeout: defaultPongTimeout,
  117. }
  118. }
  119. // NewMConnection wraps net.Conn and creates multiplex connection
  120. func NewMConnection(
  121. conn net.Conn,
  122. chDescs []*ChannelDescriptor,
  123. onReceive receiveCbFunc,
  124. onError errorCbFunc,
  125. ) *MConnection {
  126. return NewMConnectionWithConfig(
  127. conn,
  128. chDescs,
  129. onReceive,
  130. onError,
  131. DefaultMConnConfig())
  132. }
  133. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  134. func NewMConnectionWithConfig(
  135. conn net.Conn,
  136. chDescs []*ChannelDescriptor,
  137. onReceive receiveCbFunc,
  138. onError errorCbFunc,
  139. config MConnConfig,
  140. ) *MConnection {
  141. if config.PongTimeout >= config.PingInterval {
  142. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  143. }
  144. mconn := &MConnection{
  145. conn: conn,
  146. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  147. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  148. sendMonitor: flow.New(0, 0),
  149. recvMonitor: flow.New(0, 0),
  150. send: make(chan struct{}, 1),
  151. pong: make(chan struct{}, 1),
  152. onReceive: onReceive,
  153. onError: onError,
  154. config: config,
  155. created: time.Now(),
  156. }
  157. // Create channels
  158. var channelsIdx = map[byte]*Channel{}
  159. var channels = []*Channel{}
  160. for _, desc := range chDescs {
  161. channel := newChannel(mconn, *desc)
  162. channelsIdx[channel.desc.ID] = channel
  163. channels = append(channels, channel)
  164. }
  165. mconn.channels = channels
  166. mconn.channelsIdx = channelsIdx
  167. mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)
  168. // maxPacketMsgSize() is a bit heavy, so call just once
  169. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  170. return mconn
  171. }
  172. func (c *MConnection) SetLogger(l log.Logger) {
  173. c.BaseService.SetLogger(l)
  174. for _, ch := range c.channels {
  175. ch.SetLogger(l)
  176. }
  177. }
  178. // OnStart implements BaseService
  179. func (c *MConnection) OnStart() error {
  180. if err := c.BaseService.OnStart(); err != nil {
  181. return err
  182. }
  183. c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
  184. c.pingTimer = time.NewTicker(c.config.PingInterval)
  185. c.pongTimeoutCh = make(chan bool, 1)
  186. c.chStatsTimer = time.NewTicker(updateStats)
  187. c.quitSendRoutine = make(chan struct{})
  188. c.doneSendRoutine = make(chan struct{})
  189. c.quitRecvRoutine = make(chan struct{})
  190. go c.sendRoutine()
  191. go c.recvRoutine()
  192. return nil
  193. }
  194. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  195. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  196. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  197. func (c *MConnection) stopServices() (alreadyStopped bool) {
  198. c.stopMtx.Lock()
  199. defer c.stopMtx.Unlock()
  200. select {
  201. case <-c.quitSendRoutine:
  202. // already quit
  203. return true
  204. default:
  205. }
  206. select {
  207. case <-c.quitRecvRoutine:
  208. // already quit
  209. return true
  210. default:
  211. }
  212. c.BaseService.OnStop()
  213. c.flushTimer.Stop()
  214. c.pingTimer.Stop()
  215. c.chStatsTimer.Stop()
  216. // inform the recvRouting that we are shutting down
  217. close(c.quitRecvRoutine)
  218. close(c.quitSendRoutine)
  219. return false
  220. }
  221. // FlushStop replicates the logic of OnStop.
  222. // It additionally ensures that all successful
  223. // .Send() calls will get flushed before closing
  224. // the connection.
  225. func (c *MConnection) FlushStop() {
  226. if c.stopServices() {
  227. return
  228. }
  229. // this block is unique to FlushStop
  230. {
  231. // wait until the sendRoutine exits
  232. // so we dont race on calling sendSomePacketMsgs
  233. <-c.doneSendRoutine
  234. // Send and flush all pending msgs.
  235. // Since sendRoutine has exited, we can call this
  236. // safely
  237. eof := c.sendSomePacketMsgs()
  238. for !eof {
  239. eof = c.sendSomePacketMsgs()
  240. }
  241. c.flush()
  242. // Now we can close the connection
  243. }
  244. c.conn.Close()
  245. // We can't close pong safely here because
  246. // recvRoutine may write to it after we've stopped.
  247. // Though it doesn't need to get closed at all,
  248. // we close it @ recvRoutine.
  249. // c.Stop()
  250. }
  251. // OnStop implements BaseService
  252. func (c *MConnection) OnStop() {
  253. if c.stopServices() {
  254. return
  255. }
  256. c.conn.Close()
  257. // We can't close pong safely here because
  258. // recvRoutine may write to it after we've stopped.
  259. // Though it doesn't need to get closed at all,
  260. // we close it @ recvRoutine.
  261. }
  262. func (c *MConnection) String() string {
  263. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  264. }
  265. func (c *MConnection) flush() {
  266. c.Logger.Debug("Flush", "conn", c)
  267. err := c.bufConnWriter.Flush()
  268. if err != nil {
  269. c.Logger.Debug("MConnection flush failed", "err", err)
  270. }
  271. }
  272. // Catch panics, usually caused by remote disconnects.
  273. func (c *MConnection) _recover() {
  274. if r := recover(); r != nil {
  275. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  276. c.stopForError(fmt.Errorf("recovered from panic: %v", r))
  277. }
  278. }
  279. func (c *MConnection) stopForError(r interface{}) {
  280. if err := c.Stop(); err != nil {
  281. c.Logger.Error("Error stopping connection", "err", err)
  282. }
  283. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  284. if c.onError != nil {
  285. c.onError(r)
  286. }
  287. }
  288. }
  289. // Queues a message to be sent to channel.
  290. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  291. if !c.IsRunning() {
  292. return false
  293. }
  294. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  295. // Send message to channel.
  296. channel, ok := c.channelsIdx[chID]
  297. if !ok {
  298. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  299. return false
  300. }
  301. success := channel.sendBytes(msgBytes)
  302. if success {
  303. // Wake up sendRoutine if necessary
  304. select {
  305. case c.send <- struct{}{}:
  306. default:
  307. }
  308. } else {
  309. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  310. }
  311. return success
  312. }
  313. // Queues a message to be sent to channel.
  314. // Nonblocking, returns true if successful.
  315. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  316. if !c.IsRunning() {
  317. return false
  318. }
  319. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  320. // Send message to channel.
  321. channel, ok := c.channelsIdx[chID]
  322. if !ok {
  323. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  324. return false
  325. }
  326. ok = channel.trySendBytes(msgBytes)
  327. if ok {
  328. // Wake up sendRoutine if necessary
  329. select {
  330. case c.send <- struct{}{}:
  331. default:
  332. }
  333. }
  334. return ok
  335. }
  336. // CanSend returns true if you can send more data onto the chID, false
  337. // otherwise. Use only as a heuristic.
  338. func (c *MConnection) CanSend(chID byte) bool {
  339. if !c.IsRunning() {
  340. return false
  341. }
  342. channel, ok := c.channelsIdx[chID]
  343. if !ok {
  344. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  345. return false
  346. }
  347. return channel.canSend()
  348. }
// sendRoutine is the connection's single writer goroutine. It multiplexes, in
// one select loop: throttled flushes, periodic channel-stat decay, periodic
// pings (arming a pong timeout), pong-timeout/pong-received notifications,
// replies to received pings, the quit signal, and wake-ups to send queued
// PacketMsgs. The loop ends when quitSendRoutine is closed, the service is no
// longer running, or a write/pong-timeout error occurs (which also stops the
// connection via stopForError). On a normal exit it stops the pong timer and
// closes doneSendRoutine.
func (c *MConnection) sendRoutine() {
	defer c._recover()

	// All control packets (ping/pong) are written length-delimited to the
	// buffered writer.
	protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)

FOR_LOOP:
	for {
		var _n int
		var err error
	SELECTION:
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufConnWriter.
			c.flush()
		case <-c.chStatsTimer.C:
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.C:
			c.Logger.Debug("Send Ping")
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
			if err != nil {
				c.Logger.Error("Failed to send PacketPing", "err", err)
				// Leave the select; err is handled after it.
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
			// When the timer fires, report a timeout unless a notification is
			// already pending on the capacity-1 channel.
			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
				select {
				case c.pongTimeoutCh <- true:
				default:
				}
			})
			c.flush()
		case timeout := <-c.pongTimeoutCh:
			if timeout {
				c.Logger.Debug("Pong timeout")
				err = errors.New("pong timeout")
			} else {
				// recvRoutine saw the pong in time; disarm the timer.
				c.stopPongTimer()
			}
		case <-c.pong:
			c.Logger.Debug("Send Pong")
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
			if err != nil {
				c.Logger.Error("Failed to send PacketPong", "err", err)
				// Leave the select; err is handled after it.
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.flush()
		case <-c.quitSendRoutine:
			break FOR_LOOP
		case <-c.send:
			// Send some PacketMsgs
			eof := c.sendSomePacketMsgs()
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	c.stopPongTimer()
	// Signals FlushStop that it may now drain the channels itself.
	close(c.doneSendRoutine)
}
  425. // Returns true if messages from channels were exhausted.
  426. // Blocks in accordance to .sendMonitor throttling.
  427. func (c *MConnection) sendSomePacketMsgs() bool {
  428. // Block until .sendMonitor says we can write.
  429. // Once we're ready we send more than we asked for,
  430. // but amortized it should even out.
  431. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  432. // Now send some PacketMsgs.
  433. for i := 0; i < numBatchPacketMsgs; i++ {
  434. if c.sendPacketMsg() {
  435. return true
  436. }
  437. }
  438. return false
  439. }
  440. // Returns true if messages from channels were exhausted.
  441. func (c *MConnection) sendPacketMsg() bool {
  442. // Choose a channel to create a PacketMsg from.
  443. // The chosen channel will be the one whose recentlySent/priority is the least.
  444. var leastRatio float32 = math.MaxFloat32
  445. var leastChannel *Channel
  446. for _, channel := range c.channels {
  447. // If nothing to send, skip this channel
  448. if !channel.isSendPending() {
  449. continue
  450. }
  451. // Get ratio, and keep track of lowest ratio.
  452. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  453. if ratio < leastRatio {
  454. leastRatio = ratio
  455. leastChannel = channel
  456. }
  457. }
  458. // Nothing to send?
  459. if leastChannel == nil {
  460. return true
  461. }
  462. // c.Logger.Info("Found a msgPacket to send")
  463. // Make & send a PacketMsg from this channel
  464. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  465. if err != nil {
  466. c.Logger.Error("Failed to write PacketMsg", "err", err)
  467. c.stopForError(err)
  468. return true
  469. }
  470. c.sendMonitor.Update(_n)
  471. c.flushTimer.Set()
  472. return false
  473. }
  474. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  475. // After a whole message has been assembled, it's pushed to onReceive().
  476. // Blocks depending on how the connection is throttled.
  477. // Otherwise, it never blocks.
  478. func (c *MConnection) recvRoutine() {
  479. defer c._recover()
  480. protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)
  481. FOR_LOOP:
  482. for {
  483. // Block until .recvMonitor says we can read.
  484. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  485. // Peek into bufConnReader for debugging
  486. /*
  487. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  488. bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100))
  489. if err == nil {
  490. // return
  491. } else {
  492. c.Logger.Debug("Error peeking connection buffer", "err", err)
  493. // return nil
  494. }
  495. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  496. }
  497. */
  498. // Read packet type
  499. var packet tmp2p.Packet
  500. _n, err := protoReader.ReadMsg(&packet)
  501. c.recvMonitor.Update(_n)
  502. if err != nil {
  503. // stopServices was invoked and we are shutting down
  504. // receiving is excpected to fail since we will close the connection
  505. select {
  506. case <-c.quitRecvRoutine:
  507. break FOR_LOOP
  508. default:
  509. }
  510. if c.IsRunning() {
  511. if err == io.EOF {
  512. c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
  513. } else {
  514. c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  515. }
  516. c.stopForError(err)
  517. }
  518. break FOR_LOOP
  519. }
  520. // Read more depending on packet type.
  521. switch pkt := packet.Sum.(type) {
  522. case *tmp2p.Packet_PacketPing:
  523. // TODO: prevent abuse, as they cause flush()'s.
  524. // https://github.com/tendermint/tendermint/issues/1190
  525. c.Logger.Debug("Receive Ping")
  526. select {
  527. case c.pong <- struct{}{}:
  528. default:
  529. // never block
  530. }
  531. case *tmp2p.Packet_PacketPong:
  532. c.Logger.Debug("Receive Pong")
  533. select {
  534. case c.pongTimeoutCh <- false:
  535. default:
  536. // never block
  537. }
  538. case *tmp2p.Packet_PacketMsg:
  539. channelID := byte(pkt.PacketMsg.ChannelID)
  540. channel, ok := c.channelsIdx[channelID]
  541. if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
  542. err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
  543. c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
  544. c.stopForError(err)
  545. break FOR_LOOP
  546. }
  547. msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
  548. if err != nil {
  549. if c.IsRunning() {
  550. c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
  551. c.stopForError(err)
  552. }
  553. break FOR_LOOP
  554. }
  555. if msgBytes != nil {
  556. c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
  557. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  558. c.onReceive(channelID, msgBytes)
  559. }
  560. default:
  561. err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
  562. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  563. c.stopForError(err)
  564. break FOR_LOOP
  565. }
  566. }
  567. // Cleanup
  568. close(c.pong)
  569. for range c.pong {
  570. // Drain
  571. }
  572. }
  573. // not goroutine-safe
  574. func (c *MConnection) stopPongTimer() {
  575. if c.pongTimer != nil {
  576. _ = c.pongTimer.Stop()
  577. c.pongTimer = nil
  578. }
  579. }
  580. // maxPacketMsgSize returns a maximum size of PacketMsg
  581. func (c *MConnection) maxPacketMsgSize() int {
  582. bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
  583. ChannelID: 0x01,
  584. EOF: true,
  585. Data: make([]byte, c.config.MaxPacketMsgPayloadSize),
  586. }))
  587. if err != nil {
  588. panic(err)
  589. }
  590. return len(bz)
  591. }
// ConnectionStatus is the snapshot returned by MConnection.Status.
type ConnectionStatus struct {
	// Duration is the time elapsed since the connection was created.
	Duration time.Duration
	// SendMonitor and RecvMonitor are the status of the send and receive flow
	// monitors at the time of the call.
	SendMonitor flow.Status
	RecvMonitor flow.Status
	// Channels has one entry per channel, in the connection's channel order.
	Channels []ChannelStatus
}
// ChannelStatus is the per-channel part of a ConnectionStatus.
type ChannelStatus struct {
	// ID is the channel's byte identifier.
	ID byte
	// SendQueueCapacity is the capacity of the channel's send queue.
	SendQueueCapacity int
	// SendQueueSize is the channel's atomically tracked count of messages
	// queued but not yet completely sent.
	SendQueueSize int
	// Priority is the channel's configured priority.
	Priority int
	// RecentlySent is the channel's decaying counter of recently written bytes.
	RecentlySent int64
}
  605. func (c *MConnection) Status() ConnectionStatus {
  606. var status ConnectionStatus
  607. status.Duration = time.Since(c.created)
  608. status.SendMonitor = c.sendMonitor.Status()
  609. status.RecvMonitor = c.recvMonitor.Status()
  610. status.Channels = make([]ChannelStatus, len(c.channels))
  611. for i, channel := range c.channels {
  612. status.Channels[i] = ChannelStatus{
  613. ID: channel.desc.ID,
  614. SendQueueCapacity: cap(channel.sendQueue),
  615. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  616. Priority: channel.desc.Priority,
  617. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  618. }
  619. }
  620. return status
  621. }
  622. //-----------------------------------------------------------------------------
// ChannelDescriptor describes one channel of an MConnection. Zero-valued
// capacity fields are replaced with package defaults by FillDefaults.
type ChannelDescriptor struct {
	// ID is the channel's byte identifier; it keys MConnection.channelsIdx.
	ID byte
	// Priority weights the channel when choosing what to send next; it must be
	// positive (newChannel panics otherwise).
	Priority int
	// SendQueueCapacity is the buffer size of the channel's send queue.
	SendQueueCapacity int
	// RecvBufferCapacity is the initial capacity of the receive buffer.
	RecvBufferCapacity int
	// RecvMessageCapacity is the largest reassembled message, in bytes, that
	// recvPacketMsg accepts.
	RecvMessageCapacity int
}
  630. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  631. if chDesc.SendQueueCapacity == 0 {
  632. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  633. }
  634. if chDesc.RecvBufferCapacity == 0 {
  635. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  636. }
  637. if chDesc.RecvMessageCapacity == 0 {
  638. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  639. }
  640. filled = chDesc
  641. return
  642. }
// Channel is one logical stream multiplexed over an MConnection. It owns a
// queue of outbound messages and a buffer in which inbound PacketMsgs are
// reassembled.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	// conn is the owning connection; desc is the descriptor after FillDefaults.
	conn *MConnection
	desc ChannelDescriptor
	// sendQueue holds whole outbound messages waiting to be sent.
	sendQueue     chan []byte
	sendQueueSize int32 // atomic.
	// recving accumulates the data of inbound PacketMsgs until EOF.
	recving []byte
	// sending is the not-yet-written remainder of the message currently being
	// split into PacketMsgs.
	sending      []byte
	recentlySent int64 // exponential moving average

	// maxPacketMsgPayloadSize is copied from the connection's config.
	maxPacketMsgPayloadSize int

	Logger log.Logger
}
  656. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  657. desc = desc.FillDefaults()
  658. if desc.Priority <= 0 {
  659. panic("Channel default priority must be a positive integer")
  660. }
  661. return &Channel{
  662. conn: conn,
  663. desc: desc,
  664. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  665. recving: make([]byte, 0, desc.RecvBufferCapacity),
  666. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  667. }
  668. }
  669. func (ch *Channel) SetLogger(l log.Logger) {
  670. ch.Logger = l
  671. }
  672. // Queues message to send to this channel.
  673. // Goroutine-safe
  674. // Times out (and returns false) after defaultSendTimeout
  675. func (ch *Channel) sendBytes(bytes []byte) bool {
  676. select {
  677. case ch.sendQueue <- bytes:
  678. atomic.AddInt32(&ch.sendQueueSize, 1)
  679. return true
  680. case <-time.After(defaultSendTimeout):
  681. return false
  682. }
  683. }
  684. // Queues message to send to this channel.
  685. // Nonblocking, returns true if successful.
  686. // Goroutine-safe
  687. func (ch *Channel) trySendBytes(bytes []byte) bool {
  688. select {
  689. case ch.sendQueue <- bytes:
  690. atomic.AddInt32(&ch.sendQueueSize, 1)
  691. return true
  692. default:
  693. return false
  694. }
  695. }
  696. // Goroutine-safe
  697. func (ch *Channel) loadSendQueueSize() (size int) {
  698. return int(atomic.LoadInt32(&ch.sendQueueSize))
  699. }
  700. // Goroutine-safe
  701. // Use only as a heuristic.
  702. func (ch *Channel) canSend() bool {
  703. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  704. }
  705. // Returns true if any PacketMsgs are pending to be sent.
  706. // Call before calling nextPacketMsg()
  707. // Goroutine-safe
  708. func (ch *Channel) isSendPending() bool {
  709. if len(ch.sending) == 0 {
  710. if len(ch.sendQueue) == 0 {
  711. return false
  712. }
  713. ch.sending = <-ch.sendQueue
  714. }
  715. return true
  716. }
  717. // Creates a new PacketMsg to send.
  718. // Not goroutine-safe
  719. func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
  720. packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
  721. maxSize := ch.maxPacketMsgPayloadSize
  722. packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
  723. if len(ch.sending) <= maxSize {
  724. packet.EOF = true
  725. ch.sending = nil
  726. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  727. } else {
  728. packet.EOF = false
  729. ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
  730. }
  731. return packet
  732. }
  733. // Writes next PacketMsg to w and updates c.recentlySent.
  734. // Not goroutine-safe
  735. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
  736. packet := ch.nextPacketMsg()
  737. n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
  738. atomic.AddInt64(&ch.recentlySent, int64(n))
  739. return
  740. }
  741. // Handles incoming PacketMsgs. It returns a message bytes if message is
  742. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  743. // Not goroutine-safe
  744. func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
  745. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  746. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
  747. if recvCap < recvReceived {
  748. return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  749. }
  750. ch.recving = append(ch.recving, packet.Data...)
  751. if packet.EOF {
  752. msgBytes := ch.recving
  753. // clear the slice without re-allocating.
  754. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  755. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  756. // at which point the recving slice stops being used and should be garbage collected
  757. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  758. return msgBytes, nil
  759. }
  760. return nil, nil
  761. }
  762. // Call this periodically to update stats for throttling purposes.
  763. // Not goroutine-safe
  764. func (ch *Channel) updateStats() {
  765. // Exponential decay of stats.
  766. // TODO: optimize.
  767. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  768. }
  769. //----------------------------------------
  770. // Packet
  771. // mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message.
  772. func mustWrapPacket(pb proto.Message) *tmp2p.Packet {
  773. var msg tmp2p.Packet
  774. switch pb := pb.(type) {
  775. case *tmp2p.Packet: // already a packet
  776. msg = *pb
  777. case *tmp2p.PacketPing:
  778. msg = tmp2p.Packet{
  779. Sum: &tmp2p.Packet_PacketPing{
  780. PacketPing: pb,
  781. },
  782. }
  783. case *tmp2p.PacketPong:
  784. msg = tmp2p.Packet{
  785. Sum: &tmp2p.Packet_PacketPong{
  786. PacketPong: pb,
  787. },
  788. }
  789. case *tmp2p.PacketMsg:
  790. msg = tmp2p.Packet{
  791. Sum: &tmp2p.Packet_PacketMsg{
  792. PacketMsg: pb,
  793. },
  794. }
  795. default:
  796. panic(fmt.Errorf("unknown packet type %T", pb))
  797. }
  798. return &msg
  799. }