You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

914 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "runtime/debug"
  11. "sync/atomic"
  12. "time"
  13. "github.com/gogo/protobuf/proto"
  14. flow "github.com/tendermint/tendermint/libs/flowrate"
  15. "github.com/tendermint/tendermint/libs/log"
  16. tmmath "github.com/tendermint/tendermint/libs/math"
  17. "github.com/tendermint/tendermint/libs/protoio"
  18. "github.com/tendermint/tendermint/libs/service"
  19. tmsync "github.com/tendermint/tendermint/libs/sync"
  20. "github.com/tendermint/tendermint/libs/timer"
  21. tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  22. )
  23. const (
  24. defaultMaxPacketMsgPayloadSize = 1024
  25. numBatchPacketMsgs = 10
  26. minReadBufferSize = 1024
  27. minWriteBufferSize = 65536
  28. updateStats = 2 * time.Second
  29. // some of these defaults are written in the user config
  30. // flushThrottle, sendRate, recvRate
  31. // TODO: remove values present in config
  32. defaultFlushThrottle = 100 * time.Millisecond
  33. defaultSendQueueCapacity = 1
  34. defaultRecvBufferCapacity = 4096
  35. defaultRecvMessageCapacity = 22020096 // 21MB
  36. defaultSendRate = int64(512000) // 500KB/s
  37. defaultRecvRate = int64(512000) // 500KB/s
  38. defaultSendTimeout = 10 * time.Second
  39. defaultPingInterval = 60 * time.Second
  40. defaultPongTimeout = 45 * time.Second
  41. )
  42. type receiveCbFunc func(chID byte, msgBytes []byte)
  43. type errorCbFunc func(interface{})
  44. /*
  45. Each peer has one `MConnection` (multiplex connection) instance.
  46. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  47. several messages along a single channel of communication.
  48. Each `MConnection` handles message transmission on multiple abstract communication
  49. `Channel`s. Each channel has a globally unique byte id.
  50. The byte id and the relative priorities of each `Channel` are configured upon
  51. initialization of the connection.
  52. There are two methods for sending messages:
  53. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  54. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  55. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  56. successfully queued for the channel with the given id byte `chID`, or until the
  57. request times out. The message `msg` is serialized using Protobuf.
  58. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  59. channel's queue is full.
  60. Inbound message bytes are handled with an onReceive callback function.
  61. */
  62. type MConnection struct {
  63. service.BaseService
  64. conn net.Conn
  65. bufConnReader *bufio.Reader
  66. bufConnWriter *bufio.Writer
  67. sendMonitor *flow.Monitor
  68. recvMonitor *flow.Monitor
  69. send chan struct{}
  70. pong chan struct{}
  71. channels []*Channel
  72. channelsIdx map[byte]*Channel
  73. onReceive receiveCbFunc
  74. onError errorCbFunc
  75. errored uint32
  76. config MConnConfig
  77. // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
  78. // doneSendRoutine is closed when the sendRoutine actually quits.
  79. quitSendRoutine chan struct{}
  80. doneSendRoutine chan struct{}
  81. // Closing quitRecvRouting will cause the recvRouting to eventually quit.
  82. quitRecvRoutine chan struct{}
  83. // used to ensure FlushStop and OnStop
  84. // are safe to call concurrently.
  85. stopMtx tmsync.Mutex
  86. flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
  87. pingTimer *time.Ticker // send pings periodically
  88. // close conn if pong is not received in pongTimeout
  89. pongTimer *time.Timer
  90. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  91. chStatsTimer *time.Ticker // update channel stats periodically
  92. created time.Time // time of creation
  93. _maxPacketMsgSize int
  94. }
  95. // MConnConfig is a MConnection configuration.
  96. type MConnConfig struct {
  97. SendRate int64 `mapstructure:"send_rate"`
  98. RecvRate int64 `mapstructure:"recv_rate"`
  99. // Maximum payload size
  100. MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
  101. // Interval to flush writes (throttled)
  102. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  103. // Interval to send pings
  104. PingInterval time.Duration `mapstructure:"ping_interval"`
  105. // Maximum wait time for pongs
  106. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  107. }
  108. // DefaultMConnConfig returns the default config.
  109. func DefaultMConnConfig() MConnConfig {
  110. return MConnConfig{
  111. SendRate: defaultSendRate,
  112. RecvRate: defaultRecvRate,
  113. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  114. FlushThrottle: defaultFlushThrottle,
  115. PingInterval: defaultPingInterval,
  116. PongTimeout: defaultPongTimeout,
  117. }
  118. }
  119. // NewMConnection wraps net.Conn and creates multiplex connection
  120. func NewMConnection(
  121. conn net.Conn,
  122. chDescs []*ChannelDescriptor,
  123. onReceive receiveCbFunc,
  124. onError errorCbFunc,
  125. ) *MConnection {
  126. return NewMConnectionWithConfig(
  127. conn,
  128. chDescs,
  129. onReceive,
  130. onError,
  131. DefaultMConnConfig())
  132. }
  133. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  134. func NewMConnectionWithConfig(
  135. conn net.Conn,
  136. chDescs []*ChannelDescriptor,
  137. onReceive receiveCbFunc,
  138. onError errorCbFunc,
  139. config MConnConfig,
  140. ) *MConnection {
  141. if config.PongTimeout >= config.PingInterval {
  142. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  143. }
  144. mconn := &MConnection{
  145. conn: conn,
  146. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  147. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  148. sendMonitor: flow.New(0, 0),
  149. recvMonitor: flow.New(0, 0),
  150. send: make(chan struct{}, 1),
  151. pong: make(chan struct{}, 1),
  152. onReceive: onReceive,
  153. onError: onError,
  154. config: config,
  155. created: time.Now(),
  156. }
  157. // Create channels
  158. var channelsIdx = map[byte]*Channel{}
  159. var channels = []*Channel{}
  160. for _, desc := range chDescs {
  161. channel := newChannel(mconn, *desc)
  162. channelsIdx[channel.desc.ID] = channel
  163. channels = append(channels, channel)
  164. }
  165. mconn.channels = channels
  166. mconn.channelsIdx = channelsIdx
  167. mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)
  168. // maxPacketMsgSize() is a bit heavy, so call just once
  169. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  170. return mconn
  171. }
  172. func (c *MConnection) SetLogger(l log.Logger) {
  173. c.BaseService.SetLogger(l)
  174. for _, ch := range c.channels {
  175. ch.SetLogger(l)
  176. }
  177. }
  178. // OnStart implements BaseService
  179. func (c *MConnection) OnStart() error {
  180. if err := c.BaseService.OnStart(); err != nil {
  181. return err
  182. }
  183. c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
  184. c.pingTimer = time.NewTicker(c.config.PingInterval)
  185. c.pongTimeoutCh = make(chan bool, 1)
  186. c.chStatsTimer = time.NewTicker(updateStats)
  187. c.quitSendRoutine = make(chan struct{})
  188. c.doneSendRoutine = make(chan struct{})
  189. c.quitRecvRoutine = make(chan struct{})
  190. go c.sendRoutine()
  191. go c.recvRoutine()
  192. return nil
  193. }
  194. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  195. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  196. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  197. func (c *MConnection) stopServices() (alreadyStopped bool) {
  198. c.stopMtx.Lock()
  199. defer c.stopMtx.Unlock()
  200. select {
  201. case <-c.quitSendRoutine:
  202. // already quit
  203. return true
  204. default:
  205. }
  206. select {
  207. case <-c.quitRecvRoutine:
  208. // already quit
  209. return true
  210. default:
  211. }
  212. c.BaseService.OnStop()
  213. c.flushTimer.Stop()
  214. c.pingTimer.Stop()
  215. c.chStatsTimer.Stop()
  216. // inform the recvRouting that we are shutting down
  217. close(c.quitRecvRoutine)
  218. close(c.quitSendRoutine)
  219. return false
  220. }
  221. // FlushStop replicates the logic of OnStop.
  222. // It additionally ensures that all successful
  223. // .Send() calls will get flushed before closing
  224. // the connection.
  225. func (c *MConnection) FlushStop() {
  226. if c.stopServices() {
  227. return
  228. }
  229. // this block is unique to FlushStop
  230. {
  231. // wait until the sendRoutine exits
  232. // so we dont race on calling sendSomePacketMsgs
  233. <-c.doneSendRoutine
  234. // Send and flush all pending msgs.
  235. // Since sendRoutine has exited, we can call this
  236. // safely
  237. eof := c.sendSomePacketMsgs()
  238. for !eof {
  239. eof = c.sendSomePacketMsgs()
  240. }
  241. c.flush()
  242. // Now we can close the connection
  243. }
  244. c.conn.Close()
  245. // We can't close pong safely here because
  246. // recvRoutine may write to it after we've stopped.
  247. // Though it doesn't need to get closed at all,
  248. // we close it @ recvRoutine.
  249. // c.Stop()
  250. }
  251. // OnStop implements BaseService
  252. func (c *MConnection) OnStop() {
  253. if c.stopServices() {
  254. return
  255. }
  256. c.conn.Close()
  257. // We can't close pong safely here because
  258. // recvRoutine may write to it after we've stopped.
  259. // Though it doesn't need to get closed at all,
  260. // we close it @ recvRoutine.
  261. }
  262. func (c *MConnection) String() string {
  263. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  264. }
  265. func (c *MConnection) flush() {
  266. c.Logger.Debug("Flush", "conn", c)
  267. err := c.bufConnWriter.Flush()
  268. if err != nil {
  269. c.Logger.Debug("MConnection flush failed", "err", err)
  270. }
  271. }
  272. // Catch panics, usually caused by remote disconnects.
  273. func (c *MConnection) _recover() {
  274. if r := recover(); r != nil {
  275. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  276. c.stopForError(fmt.Errorf("recovered from panic: %v", r))
  277. }
  278. }
  279. func (c *MConnection) stopForError(r interface{}) {
  280. if err := c.Stop(); err != nil {
  281. c.Logger.Error("Error stopping connection", "err", err)
  282. }
  283. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  284. if c.onError != nil {
  285. c.onError(r)
  286. }
  287. }
  288. }
  289. // Queues a message to be sent to channel.
  290. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  291. if !c.IsRunning() {
  292. return false
  293. }
  294. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  295. // Send message to channel.
  296. channel, ok := c.channelsIdx[chID]
  297. if !ok {
  298. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  299. return false
  300. }
  301. success := channel.sendBytes(msgBytes)
  302. if success {
  303. // Wake up sendRoutine if necessary
  304. select {
  305. case c.send <- struct{}{}:
  306. default:
  307. }
  308. } else {
  309. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  310. }
  311. return success
  312. }
  313. // Queues a message to be sent to channel.
  314. // Nonblocking, returns true if successful.
  315. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  316. if !c.IsRunning() {
  317. return false
  318. }
  319. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  320. // Send message to channel.
  321. channel, ok := c.channelsIdx[chID]
  322. if !ok {
  323. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  324. return false
  325. }
  326. ok = channel.trySendBytes(msgBytes)
  327. if ok {
  328. // Wake up sendRoutine if necessary
  329. select {
  330. case c.send <- struct{}{}:
  331. default:
  332. }
  333. }
  334. return ok
  335. }
  336. // CanSend returns true if you can send more data onto the chID, false
  337. // otherwise. Use only as a heuristic.
  338. func (c *MConnection) CanSend(chID byte) bool {
  339. if !c.IsRunning() {
  340. return false
  341. }
  342. channel, ok := c.channelsIdx[chID]
  343. if !ok {
  344. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  345. return false
  346. }
  347. return channel.canSend()
  348. }
  349. // sendRoutine polls for packets to send from channels.
  350. func (c *MConnection) sendRoutine() {
  351. defer c._recover()
  352. protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)
  353. FOR_LOOP:
  354. for {
  355. var _n int
  356. var err error
  357. SELECTION:
  358. select {
  359. case <-c.flushTimer.Ch:
  360. // NOTE: flushTimer.Set() must be called every time
  361. // something is written to .bufConnWriter.
  362. c.flush()
  363. case <-c.chStatsTimer.C:
  364. for _, channel := range c.channels {
  365. channel.updateStats()
  366. }
  367. case <-c.pingTimer.C:
  368. c.Logger.Debug("Send Ping")
  369. _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  370. if err != nil {
  371. c.Logger.Error("Failed to send PacketPing", "err", err)
  372. break SELECTION
  373. }
  374. c.sendMonitor.Update(_n)
  375. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  376. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  377. select {
  378. case c.pongTimeoutCh <- true:
  379. default:
  380. }
  381. })
  382. c.flush()
  383. case timeout := <-c.pongTimeoutCh:
  384. if timeout {
  385. c.Logger.Debug("Pong timeout")
  386. err = errors.New("pong timeout")
  387. } else {
  388. c.stopPongTimer()
  389. }
  390. case <-c.pong:
  391. c.Logger.Debug("Send Pong")
  392. _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  393. if err != nil {
  394. c.Logger.Error("Failed to send PacketPong", "err", err)
  395. break SELECTION
  396. }
  397. c.sendMonitor.Update(_n)
  398. c.flush()
  399. case <-c.quitSendRoutine:
  400. break FOR_LOOP
  401. case <-c.send:
  402. // Send some PacketMsgs
  403. eof := c.sendSomePacketMsgs()
  404. if !eof {
  405. // Keep sendRoutine awake.
  406. select {
  407. case c.send <- struct{}{}:
  408. default:
  409. }
  410. }
  411. }
  412. if !c.IsRunning() {
  413. break FOR_LOOP
  414. }
  415. if err != nil {
  416. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  417. c.stopForError(err)
  418. break FOR_LOOP
  419. }
  420. }
  421. // Cleanup
  422. c.stopPongTimer()
  423. close(c.doneSendRoutine)
  424. }
  425. // Returns true if messages from channels were exhausted.
  426. // Blocks in accordance to .sendMonitor throttling.
  427. func (c *MConnection) sendSomePacketMsgs() bool {
  428. // Block until .sendMonitor says we can write.
  429. // Once we're ready we send more than we asked for,
  430. // but amortized it should even out.
  431. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  432. // Now send some PacketMsgs.
  433. for i := 0; i < numBatchPacketMsgs; i++ {
  434. if c.sendPacketMsg() {
  435. return true
  436. }
  437. }
  438. return false
  439. }
  440. // Returns true if messages from channels were exhausted.
  441. func (c *MConnection) sendPacketMsg() bool {
  442. // Choose a channel to create a PacketMsg from.
  443. // The chosen channel will be the one whose recentlySent/priority is the least.
  444. var leastRatio float32 = math.MaxFloat32
  445. var leastChannel *Channel
  446. for _, channel := range c.channels {
  447. // If nothing to send, skip this channel
  448. if !channel.isSendPending() {
  449. continue
  450. }
  451. // Get ratio, and keep track of lowest ratio.
  452. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  453. if ratio < leastRatio {
  454. leastRatio = ratio
  455. leastChannel = channel
  456. }
  457. }
  458. // Nothing to send?
  459. if leastChannel == nil {
  460. return true
  461. }
  462. // c.Logger.Info("Found a msgPacket to send")
  463. // Make & send a PacketMsg from this channel
  464. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  465. if err != nil {
  466. c.Logger.Error("Failed to write PacketMsg", "err", err)
  467. c.stopForError(err)
  468. return true
  469. }
  470. c.sendMonitor.Update(_n)
  471. c.flushTimer.Set()
  472. return false
  473. }
  474. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  475. // After a whole message has been assembled, it's pushed to onReceive().
  476. // Blocks depending on how the connection is throttled.
  477. // Otherwise, it never blocks.
  478. func (c *MConnection) recvRoutine() {
  479. defer c._recover()
  480. protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)
  481. FOR_LOOP:
  482. for {
  483. // Block until .recvMonitor says we can read.
  484. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  485. // Peek into bufConnReader for debugging
  486. /*
  487. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  488. bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100))
  489. if err == nil {
  490. // return
  491. } else {
  492. c.Logger.Debug("Error peeking connection buffer", "err", err)
  493. // return nil
  494. }
  495. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  496. }
  497. */
  498. // Read packet type
  499. var packet tmp2p.Packet
  500. err := protoReader.ReadMsg(&packet)
  501. if err != nil {
  502. // stopServices was invoked and we are shutting down
  503. // receiving is excpected to fail since we will close the connection
  504. select {
  505. case <-c.quitRecvRoutine:
  506. break FOR_LOOP
  507. default:
  508. }
  509. if c.IsRunning() {
  510. if err == io.EOF {
  511. c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
  512. } else {
  513. c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  514. }
  515. c.stopForError(err)
  516. }
  517. break FOR_LOOP
  518. }
  519. // Read more depending on packet type.
  520. switch pkt := packet.Sum.(type) {
  521. case *tmp2p.Packet_PacketPing:
  522. // TODO: prevent abuse, as they cause flush()'s.
  523. // https://github.com/tendermint/tendermint/issues/1190
  524. c.Logger.Debug("Receive Ping")
  525. select {
  526. case c.pong <- struct{}{}:
  527. default:
  528. // never block
  529. }
  530. case *tmp2p.Packet_PacketPong:
  531. c.Logger.Debug("Receive Pong")
  532. select {
  533. case c.pongTimeoutCh <- false:
  534. default:
  535. // never block
  536. }
  537. case *tmp2p.Packet_PacketMsg:
  538. channel, ok := c.channelsIdx[byte(pkt.PacketMsg.ChannelID)]
  539. if !ok || channel == nil {
  540. err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
  541. c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
  542. c.stopForError(err)
  543. break FOR_LOOP
  544. }
  545. msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
  546. if err != nil {
  547. if c.IsRunning() {
  548. c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
  549. c.stopForError(err)
  550. }
  551. break FOR_LOOP
  552. }
  553. if msgBytes != nil {
  554. c.Logger.Debug("Received bytes", "chID", pkt.PacketMsg.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  555. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  556. c.onReceive(byte(pkt.PacketMsg.ChannelID), msgBytes)
  557. }
  558. default:
  559. err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
  560. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  561. c.stopForError(err)
  562. break FOR_LOOP
  563. }
  564. }
  565. // Cleanup
  566. close(c.pong)
  567. for range c.pong {
  568. // Drain
  569. }
  570. }
  571. // not goroutine-safe
  572. func (c *MConnection) stopPongTimer() {
  573. if c.pongTimer != nil {
  574. _ = c.pongTimer.Stop()
  575. c.pongTimer = nil
  576. }
  577. }
  578. // maxPacketMsgSize returns a maximum size of PacketMsg
  579. func (c *MConnection) maxPacketMsgSize() int {
  580. bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
  581. ChannelID: 0x01,
  582. EOF: true,
  583. Data: make([]byte, c.config.MaxPacketMsgPayloadSize),
  584. }))
  585. if err != nil {
  586. panic(err)
  587. }
  588. return len(bz)
  589. }
  590. type ConnectionStatus struct {
  591. Duration time.Duration
  592. SendMonitor flow.Status
  593. RecvMonitor flow.Status
  594. Channels []ChannelStatus
  595. }
  596. type ChannelStatus struct {
  597. ID byte
  598. SendQueueCapacity int
  599. SendQueueSize int
  600. Priority int
  601. RecentlySent int64
  602. }
  603. func (c *MConnection) Status() ConnectionStatus {
  604. var status ConnectionStatus
  605. status.Duration = time.Since(c.created)
  606. status.SendMonitor = c.sendMonitor.Status()
  607. status.RecvMonitor = c.recvMonitor.Status()
  608. status.Channels = make([]ChannelStatus, len(c.channels))
  609. for i, channel := range c.channels {
  610. status.Channels[i] = ChannelStatus{
  611. ID: channel.desc.ID,
  612. SendQueueCapacity: cap(channel.sendQueue),
  613. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  614. Priority: channel.desc.Priority,
  615. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  616. }
  617. }
  618. return status
  619. }
  620. //-----------------------------------------------------------------------------
  621. type ChannelDescriptor struct {
  622. ID byte
  623. Priority int
  624. SendQueueCapacity int
  625. RecvBufferCapacity int
  626. RecvMessageCapacity int
  627. }
  628. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  629. if chDesc.SendQueueCapacity == 0 {
  630. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  631. }
  632. if chDesc.RecvBufferCapacity == 0 {
  633. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  634. }
  635. if chDesc.RecvMessageCapacity == 0 {
  636. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  637. }
  638. filled = chDesc
  639. return
  640. }
  641. // TODO: lowercase.
  642. // NOTE: not goroutine-safe.
  643. type Channel struct {
  644. conn *MConnection
  645. desc ChannelDescriptor
  646. sendQueue chan []byte
  647. sendQueueSize int32 // atomic.
  648. recving []byte
  649. sending []byte
  650. recentlySent int64 // exponential moving average
  651. maxPacketMsgPayloadSize int
  652. Logger log.Logger
  653. }
  654. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  655. desc = desc.FillDefaults()
  656. if desc.Priority <= 0 {
  657. panic("Channel default priority must be a positive integer")
  658. }
  659. return &Channel{
  660. conn: conn,
  661. desc: desc,
  662. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  663. recving: make([]byte, 0, desc.RecvBufferCapacity),
  664. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  665. }
  666. }
  667. func (ch *Channel) SetLogger(l log.Logger) {
  668. ch.Logger = l
  669. }
  670. // Queues message to send to this channel.
  671. // Goroutine-safe
  672. // Times out (and returns false) after defaultSendTimeout
  673. func (ch *Channel) sendBytes(bytes []byte) bool {
  674. select {
  675. case ch.sendQueue <- bytes:
  676. atomic.AddInt32(&ch.sendQueueSize, 1)
  677. return true
  678. case <-time.After(defaultSendTimeout):
  679. return false
  680. }
  681. }
  682. // Queues message to send to this channel.
  683. // Nonblocking, returns true if successful.
  684. // Goroutine-safe
  685. func (ch *Channel) trySendBytes(bytes []byte) bool {
  686. select {
  687. case ch.sendQueue <- bytes:
  688. atomic.AddInt32(&ch.sendQueueSize, 1)
  689. return true
  690. default:
  691. return false
  692. }
  693. }
  694. // Goroutine-safe
  695. func (ch *Channel) loadSendQueueSize() (size int) {
  696. return int(atomic.LoadInt32(&ch.sendQueueSize))
  697. }
  698. // Goroutine-safe
  699. // Use only as a heuristic.
  700. func (ch *Channel) canSend() bool {
  701. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  702. }
  703. // Returns true if any PacketMsgs are pending to be sent.
  704. // Call before calling nextPacketMsg()
  705. // Goroutine-safe
  706. func (ch *Channel) isSendPending() bool {
  707. if len(ch.sending) == 0 {
  708. if len(ch.sendQueue) == 0 {
  709. return false
  710. }
  711. ch.sending = <-ch.sendQueue
  712. }
  713. return true
  714. }
  715. // Creates a new PacketMsg to send.
  716. // Not goroutine-safe
  717. func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
  718. packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
  719. maxSize := ch.maxPacketMsgPayloadSize
  720. packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
  721. if len(ch.sending) <= maxSize {
  722. packet.EOF = true
  723. ch.sending = nil
  724. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  725. } else {
  726. packet.EOF = false
  727. ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
  728. }
  729. return packet
  730. }
  731. // Writes next PacketMsg to w and updates c.recentlySent.
  732. // Not goroutine-safe
  733. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
  734. packet := ch.nextPacketMsg()
  735. n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
  736. atomic.AddInt64(&ch.recentlySent, int64(n))
  737. return
  738. }
  739. // Handles incoming PacketMsgs. It returns a message bytes if message is
  740. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  741. // Not goroutine-safe
  742. func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
  743. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  744. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
  745. if recvCap < recvReceived {
  746. return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  747. }
  748. ch.recving = append(ch.recving, packet.Data...)
  749. if packet.EOF {
  750. msgBytes := ch.recving
  751. // clear the slice without re-allocating.
  752. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  753. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  754. // at which point the recving slice stops being used and should be garbage collected
  755. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  756. return msgBytes, nil
  757. }
  758. return nil, nil
  759. }
  760. // Call this periodically to update stats for throttling purposes.
  761. // Not goroutine-safe
  762. func (ch *Channel) updateStats() {
  763. // Exponential decay of stats.
  764. // TODO: optimize.
  765. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  766. }
  767. //----------------------------------------
  768. // Packet
  769. // mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message.
  770. func mustWrapPacket(pb proto.Message) *tmp2p.Packet {
  771. var msg tmp2p.Packet
  772. switch pb := pb.(type) {
  773. case *tmp2p.Packet: // already a packet
  774. msg = *pb
  775. case *tmp2p.PacketPing:
  776. msg = tmp2p.Packet{
  777. Sum: &tmp2p.Packet_PacketPing{
  778. PacketPing: pb,
  779. },
  780. }
  781. case *tmp2p.PacketPong:
  782. msg = tmp2p.Packet{
  783. Sum: &tmp2p.Packet_PacketPong{
  784. PacketPong: pb,
  785. },
  786. }
  787. case *tmp2p.PacketMsg:
  788. msg = tmp2p.Packet{
  789. Sum: &tmp2p.Packet_PacketMsg{
  790. PacketMsg: pb,
  791. },
  792. }
  793. default:
  794. panic(fmt.Errorf("unknown packet type %T", pb))
  795. }
  796. return &msg
  797. }