You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

912 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "runtime/debug"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/gogo/protobuf/proto"
  15. flow "github.com/tendermint/tendermint/libs/flowrate"
  16. "github.com/tendermint/tendermint/libs/log"
  17. tmmath "github.com/tendermint/tendermint/libs/math"
  18. "github.com/tendermint/tendermint/libs/protoio"
  19. "github.com/tendermint/tendermint/libs/service"
  20. "github.com/tendermint/tendermint/libs/timer"
  21. tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
  22. )
  // Wire-level sizing, batching and bookkeeping cadence shared by every
  // MConnection.
  const (
  	defaultMaxPacketMsgPayloadSize = 1024

  	numBatchPacketMsgs = 10
  	minReadBufferSize  = 1024
  	minWriteBufferSize = 64 * 1024
  	updateStats        = 2 * time.Second
  )

  // Fallback values used when the caller leaves a setting unspecified.
  //
  // Some of these defaults (flushThrottle, sendRate, recvRate) are also
  // written in the user config.
  // TODO: remove values present in config
  const (
  	defaultFlushThrottle       = 100 * time.Millisecond
  	defaultSendQueueCapacity   = 1
  	defaultRecvBufferCapacity  = 4 * 1024
  	defaultRecvMessageCapacity = 21 * 1024 * 1024 // 21MB
  	defaultSendRate            = int64(500 * 1024) // 500KB/s
  	defaultRecvRate            = int64(500 * 1024) // 500KB/s
  	defaultSendTimeout         = 10 * time.Second
  	defaultPingInterval        = 60 * time.Second
  	defaultPongTimeout         = 45 * time.Second
  )
  // Callback signatures supplied by the owner of an MConnection.
  type (
  	// receiveCbFunc is invoked by the receive routine with the channel ID
  	// and the bytes of each fully reassembled inbound message.
  	receiveCbFunc func(chID byte, msgBytes []byte)

  	// errorCbFunc is invoked with the failure reason when the connection
  	// is stopped because of an error.
  	errorCbFunc func(interface{})
  )
  44. /*
  45. Each peer has one `MConnection` (multiplex connection) instance.
  46. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  47. several messages along a single channel of communication.
  48. Each `MConnection` handles message transmission on multiple abstract communication
  49. `Channel`s. Each channel has a globally unique byte id.
  50. The byte id and the relative priorities of each `Channel` are configured upon
  51. initialization of the connection.
  52. There are two methods for sending messages:
  53. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  54. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  55. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  56. successfully queued for the channel with the given id byte `chID`, or until the
  57. request times out. The message `msg` is serialized using Protobuf.
  58. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  59. channel's queue is full.
  60. Inbound message bytes are handled with an onReceive callback function.
  61. */
  62. type MConnection struct {
  63. service.BaseService
  64. conn net.Conn
  65. bufConnReader *bufio.Reader
  66. bufConnWriter *bufio.Writer
  67. sendMonitor *flow.Monitor
  68. recvMonitor *flow.Monitor
  69. send chan struct{}
  70. pong chan struct{}
  71. channels []*Channel
  72. channelsIdx map[byte]*Channel
  73. onReceive receiveCbFunc
  74. onError errorCbFunc
  75. errored uint32
  76. config MConnConfig
  77. // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
  78. // doneSendRoutine is closed when the sendRoutine actually quits.
  79. quitSendRoutine chan struct{}
  80. doneSendRoutine chan struct{}
  81. // Closing quitRecvRouting will cause the recvRouting to eventually quit.
  82. quitRecvRoutine chan struct{}
  83. // used to ensure FlushStop and OnStop
  84. // are safe to call concurrently.
  85. stopMtx sync.Mutex
  86. flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
  87. pingTimer *time.Ticker // send pings periodically
  88. // close conn if pong is not received in pongTimeout
  89. pongTimer *time.Timer
  90. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  91. chStatsTimer *time.Ticker // update channel stats periodically
  92. created time.Time // time of creation
  93. _maxPacketMsgSize int
  94. }
  // MConnConfig holds the tunable parameters of an MConnection.
  type MConnConfig struct {
  	// SendRate is the rate passed to the send flow monitor when
  	// throttling writes.
  	SendRate int64 `mapstructure:"send_rate"`

  	// RecvRate is the rate passed to the receive flow monitor when
  	// throttling reads.
  	RecvRate int64 `mapstructure:"recv_rate"`

  	// MaxPacketMsgPayloadSize is the maximum payload size of one packet.
  	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

  	// FlushThrottle is the (throttled) interval at which buffered writes
  	// are flushed.
  	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

  	// PingInterval is the interval between pings.
  	PingInterval time.Duration `mapstructure:"ping_interval"`

  	// PongTimeout is the maximum time to wait for a pong.
  	PongTimeout time.Duration `mapstructure:"pong_timeout"`
  }
  108. // DefaultMConnConfig returns the default config.
  109. func DefaultMConnConfig() MConnConfig {
  110. return MConnConfig{
  111. SendRate: defaultSendRate,
  112. RecvRate: defaultRecvRate,
  113. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  114. FlushThrottle: defaultFlushThrottle,
  115. PingInterval: defaultPingInterval,
  116. PongTimeout: defaultPongTimeout,
  117. }
  118. }
  119. // NewMConnection wraps net.Conn and creates multiplex connection
  120. func NewMConnection(
  121. conn net.Conn,
  122. chDescs []*ChannelDescriptor,
  123. onReceive receiveCbFunc,
  124. onError errorCbFunc,
  125. ) *MConnection {
  126. return NewMConnectionWithConfig(
  127. conn,
  128. chDescs,
  129. onReceive,
  130. onError,
  131. DefaultMConnConfig())
  132. }
  133. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  134. func NewMConnectionWithConfig(
  135. conn net.Conn,
  136. chDescs []*ChannelDescriptor,
  137. onReceive receiveCbFunc,
  138. onError errorCbFunc,
  139. config MConnConfig,
  140. ) *MConnection {
  141. if config.PongTimeout >= config.PingInterval {
  142. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  143. }
  144. mconn := &MConnection{
  145. conn: conn,
  146. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  147. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  148. sendMonitor: flow.New(0, 0),
  149. recvMonitor: flow.New(0, 0),
  150. send: make(chan struct{}, 1),
  151. pong: make(chan struct{}, 1),
  152. onReceive: onReceive,
  153. onError: onError,
  154. config: config,
  155. created: time.Now(),
  156. }
  157. // Create channels
  158. var channelsIdx = map[byte]*Channel{}
  159. var channels = []*Channel{}
  160. for _, desc := range chDescs {
  161. channel := newChannel(mconn, *desc)
  162. channelsIdx[channel.desc.ID] = channel
  163. channels = append(channels, channel)
  164. }
  165. mconn.channels = channels
  166. mconn.channelsIdx = channelsIdx
  167. mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)
  168. // maxPacketMsgSize() is a bit heavy, so call just once
  169. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  170. return mconn
  171. }
  172. func (c *MConnection) SetLogger(l log.Logger) {
  173. c.BaseService.SetLogger(l)
  174. for _, ch := range c.channels {
  175. ch.SetLogger(l)
  176. }
  177. }
  178. // OnStart implements BaseService
  179. func (c *MConnection) OnStart() error {
  180. if err := c.BaseService.OnStart(); err != nil {
  181. return err
  182. }
  183. c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
  184. c.pingTimer = time.NewTicker(c.config.PingInterval)
  185. c.pongTimeoutCh = make(chan bool, 1)
  186. c.chStatsTimer = time.NewTicker(updateStats)
  187. c.quitSendRoutine = make(chan struct{})
  188. c.doneSendRoutine = make(chan struct{})
  189. c.quitRecvRoutine = make(chan struct{})
  190. go c.sendRoutine()
  191. go c.recvRoutine()
  192. return nil
  193. }
  194. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  195. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  196. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  197. func (c *MConnection) stopServices() (alreadyStopped bool) {
  198. c.stopMtx.Lock()
  199. defer c.stopMtx.Unlock()
  200. select {
  201. case <-c.quitSendRoutine:
  202. // already quit
  203. return true
  204. default:
  205. }
  206. select {
  207. case <-c.quitRecvRoutine:
  208. // already quit
  209. return true
  210. default:
  211. }
  212. c.BaseService.OnStop()
  213. c.flushTimer.Stop()
  214. c.pingTimer.Stop()
  215. c.chStatsTimer.Stop()
  216. // inform the recvRouting that we are shutting down
  217. close(c.quitRecvRoutine)
  218. close(c.quitSendRoutine)
  219. return false
  220. }
  221. // FlushStop replicates the logic of OnStop.
  222. // It additionally ensures that all successful
  223. // .Send() calls will get flushed before closing
  224. // the connection.
  225. func (c *MConnection) FlushStop() {
  226. if c.stopServices() {
  227. return
  228. }
  229. // this block is unique to FlushStop
  230. {
  231. // wait until the sendRoutine exits
  232. // so we dont race on calling sendSomePacketMsgs
  233. <-c.doneSendRoutine
  234. // Send and flush all pending msgs.
  235. // Since sendRoutine has exited, we can call this
  236. // safely
  237. eof := c.sendSomePacketMsgs()
  238. for !eof {
  239. eof = c.sendSomePacketMsgs()
  240. }
  241. c.flush()
  242. // Now we can close the connection
  243. }
  244. c.conn.Close()
  245. // We can't close pong safely here because
  246. // recvRoutine may write to it after we've stopped.
  247. // Though it doesn't need to get closed at all,
  248. // we close it @ recvRoutine.
  249. // c.Stop()
  250. }
  251. // OnStop implements BaseService
  252. func (c *MConnection) OnStop() {
  253. if c.stopServices() {
  254. return
  255. }
  256. c.conn.Close()
  257. // We can't close pong safely here because
  258. // recvRoutine may write to it after we've stopped.
  259. // Though it doesn't need to get closed at all,
  260. // we close it @ recvRoutine.
  261. }
  262. func (c *MConnection) String() string {
  263. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  264. }
  265. func (c *MConnection) flush() {
  266. c.Logger.Debug("Flush", "conn", c)
  267. err := c.bufConnWriter.Flush()
  268. if err != nil {
  269. c.Logger.Error("MConnection flush failed", "err", err)
  270. }
  271. }
  272. // Catch panics, usually caused by remote disconnects.
  273. func (c *MConnection) _recover() {
  274. if r := recover(); r != nil {
  275. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  276. c.stopForError(fmt.Errorf("recovered from panic: %v", r))
  277. }
  278. }
  279. func (c *MConnection) stopForError(r interface{}) {
  280. c.Stop()
  281. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  282. if c.onError != nil {
  283. c.onError(r)
  284. }
  285. }
  286. }
  287. // Queues a message to be sent to channel.
  288. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  289. if !c.IsRunning() {
  290. return false
  291. }
  292. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  293. // Send message to channel.
  294. channel, ok := c.channelsIdx[chID]
  295. if !ok {
  296. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  297. return false
  298. }
  299. success := channel.sendBytes(msgBytes)
  300. if success {
  301. // Wake up sendRoutine if necessary
  302. select {
  303. case c.send <- struct{}{}:
  304. default:
  305. }
  306. } else {
  307. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  308. }
  309. return success
  310. }
  311. // Queues a message to be sent to channel.
  312. // Nonblocking, returns true if successful.
  313. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  314. if !c.IsRunning() {
  315. return false
  316. }
  317. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  318. // Send message to channel.
  319. channel, ok := c.channelsIdx[chID]
  320. if !ok {
  321. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  322. return false
  323. }
  324. ok = channel.trySendBytes(msgBytes)
  325. if ok {
  326. // Wake up sendRoutine if necessary
  327. select {
  328. case c.send <- struct{}{}:
  329. default:
  330. }
  331. }
  332. return ok
  333. }
  334. // CanSend returns true if you can send more data onto the chID, false
  335. // otherwise. Use only as a heuristic.
  336. func (c *MConnection) CanSend(chID byte) bool {
  337. if !c.IsRunning() {
  338. return false
  339. }
  340. channel, ok := c.channelsIdx[chID]
  341. if !ok {
  342. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  343. return false
  344. }
  345. return channel.canSend()
  346. }
  // sendRoutine polls for packets to send from channels.
  //
  // It is the only goroutine that writes to bufConnWriter while the connection
  // is running. Each loop iteration services exactly one event (flush, stats
  // update, ping, pong timeout, pong reply, quit or data wake-up) and then
  // checks whether the connection should be torn down. On exit it stops the
  // pong timer and closes doneSendRoutine so that FlushStop can take over.
  func (c *MConnection) sendRoutine() {
  	defer c._recover()
  	protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)
  FOR_LOOP:
  	for {
  		// _n and err are reset every iteration; a non-nil err after the
  		// select stops the connection.
  		var _n int
  		var err error
  	SELECTION:
  		select {
  		case <-c.flushTimer.Ch:
  			// NOTE: flushTimer.Set() must be called every time
  			// something is written to .bufConnWriter.
  			c.flush()
  		case <-c.chStatsTimer.C:
  			// Periodically decay each channel's recentlySent figure.
  			for _, channel := range c.channels {
  				channel.updateStats()
  			}
  		case <-c.pingTimer.C:
  			c.Logger.Debug("Send Ping")
  			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
  			if err != nil {
  				c.Logger.Error("Failed to send PacketPing", "err", err)
  				// Leave the select; err is handled below.
  				break SELECTION
  			}
  			c.sendMonitor.Update(_n)
  			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  			// When the timer fires it reports a timeout without blocking;
  			// if a value is already pending the notification is dropped.
  			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  				select {
  				case c.pongTimeoutCh <- true:
  				default:
  				}
  			})
  			c.flush()
  		case timeout := <-c.pongTimeoutCh:
  			// true comes from the pong timer, false from recvRoutine when
  			// the peer's pong arrived.
  			if timeout {
  				c.Logger.Debug("Pong timeout")
  				err = errors.New("pong timeout")
  			} else {
  				c.stopPongTimer()
  			}
  		case <-c.pong:
  			// recvRoutine saw a ping; answer it.
  			c.Logger.Debug("Send Pong")
  			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
  			if err != nil {
  				c.Logger.Error("Failed to send PacketPong", "err", err)
  				// Leave the select; err is handled below.
  				break SELECTION
  			}
  			c.sendMonitor.Update(_n)
  			c.flush()
  		case <-c.quitSendRoutine:
  			break FOR_LOOP
  		case <-c.send:
  			// Send some PacketMsgs
  			eof := c.sendSomePacketMsgs()
  			if !eof {
  				// Keep sendRoutine awake.
  				select {
  				case c.send <- struct{}{}:
  				default:
  				}
  			}
  		}
  		if !c.IsRunning() {
  			break FOR_LOOP
  		}
  		if err != nil {
  			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  			c.stopForError(err)
  			break FOR_LOOP
  		}
  	}
  	// Cleanup
  	c.stopPongTimer()
  	close(c.doneSendRoutine)
  }
  423. // Returns true if messages from channels were exhausted.
  424. // Blocks in accordance to .sendMonitor throttling.
  425. func (c *MConnection) sendSomePacketMsgs() bool {
  426. // Block until .sendMonitor says we can write.
  427. // Once we're ready we send more than we asked for,
  428. // but amortized it should even out.
  429. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  430. // Now send some PacketMsgs.
  431. for i := 0; i < numBatchPacketMsgs; i++ {
  432. if c.sendPacketMsg() {
  433. return true
  434. }
  435. }
  436. return false
  437. }
  438. // Returns true if messages from channels were exhausted.
  439. func (c *MConnection) sendPacketMsg() bool {
  440. // Choose a channel to create a PacketMsg from.
  441. // The chosen channel will be the one whose recentlySent/priority is the least.
  442. var leastRatio float32 = math.MaxFloat32
  443. var leastChannel *Channel
  444. for _, channel := range c.channels {
  445. // If nothing to send, skip this channel
  446. if !channel.isSendPending() {
  447. continue
  448. }
  449. // Get ratio, and keep track of lowest ratio.
  450. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  451. if ratio < leastRatio {
  452. leastRatio = ratio
  453. leastChannel = channel
  454. }
  455. }
  456. // Nothing to send?
  457. if leastChannel == nil {
  458. return true
  459. }
  460. // c.Logger.Info("Found a msgPacket to send")
  461. // Make & send a PacketMsg from this channel
  462. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  463. if err != nil {
  464. c.Logger.Error("Failed to write PacketMsg", "err", err)
  465. c.stopForError(err)
  466. return true
  467. }
  468. c.sendMonitor.Update(_n)
  469. c.flushTimer.Set()
  470. return false
  471. }
  472. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  473. // After a whole message has been assembled, it's pushed to onReceive().
  474. // Blocks depending on how the connection is throttled.
  475. // Otherwise, it never blocks.
  476. func (c *MConnection) recvRoutine() {
  477. defer c._recover()
  478. protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)
  479. FOR_LOOP:
  480. for {
  481. // Block until .recvMonitor says we can read.
  482. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  483. // Peek into bufConnReader for debugging
  484. /*
  485. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  486. bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100))
  487. if err == nil {
  488. // return
  489. } else {
  490. c.Logger.Debug("Error peeking connection buffer", "err", err)
  491. // return nil
  492. }
  493. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  494. }
  495. */
  496. // Read packet type
  497. var packet tmp2p.Packet
  498. err := protoReader.ReadMsg(&packet)
  499. if err != nil {
  500. // stopServices was invoked and we are shutting down
  501. // receiving is excpected to fail since we will close the connection
  502. select {
  503. case <-c.quitRecvRoutine:
  504. break FOR_LOOP
  505. default:
  506. }
  507. if c.IsRunning() {
  508. if err == io.EOF {
  509. c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
  510. } else {
  511. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  512. }
  513. c.stopForError(err)
  514. }
  515. break FOR_LOOP
  516. }
  517. // Read more depending on packet type.
  518. switch pkt := packet.Sum.(type) {
  519. case *tmp2p.Packet_PacketPing:
  520. // TODO: prevent abuse, as they cause flush()'s.
  521. // https://github.com/tendermint/tendermint/issues/1190
  522. c.Logger.Debug("Receive Ping")
  523. select {
  524. case c.pong <- struct{}{}:
  525. default:
  526. // never block
  527. }
  528. case *tmp2p.Packet_PacketPong:
  529. c.Logger.Debug("Receive Pong")
  530. select {
  531. case c.pongTimeoutCh <- false:
  532. default:
  533. // never block
  534. }
  535. case *tmp2p.Packet_PacketMsg:
  536. channel, ok := c.channelsIdx[byte(pkt.PacketMsg.ChannelID)]
  537. if !ok || channel == nil {
  538. err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
  539. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  540. c.stopForError(err)
  541. break FOR_LOOP
  542. }
  543. msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
  544. if err != nil {
  545. if c.IsRunning() {
  546. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  547. c.stopForError(err)
  548. }
  549. break FOR_LOOP
  550. }
  551. if msgBytes != nil {
  552. c.Logger.Debug("Received bytes", "chID", pkt.PacketMsg.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  553. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  554. c.onReceive(byte(pkt.PacketMsg.ChannelID), msgBytes)
  555. }
  556. default:
  557. err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
  558. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  559. c.stopForError(err)
  560. break FOR_LOOP
  561. }
  562. }
  563. // Cleanup
  564. close(c.pong)
  565. for range c.pong {
  566. // Drain
  567. }
  568. }
  569. // not goroutine-safe
  570. func (c *MConnection) stopPongTimer() {
  571. if c.pongTimer != nil {
  572. _ = c.pongTimer.Stop()
  573. c.pongTimer = nil
  574. }
  575. }
  576. // maxPacketMsgSize returns a maximum size of PacketMsg
  577. func (c *MConnection) maxPacketMsgSize() int {
  578. bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
  579. ChannelID: 0x01,
  580. EOF: true,
  581. Data: make([]byte, c.config.MaxPacketMsgPayloadSize),
  582. }))
  583. if err != nil {
  584. panic(err)
  585. }
  586. return len(bz)
  587. }
  588. type ConnectionStatus struct {
  589. Duration time.Duration
  590. SendMonitor flow.Status
  591. RecvMonitor flow.Status
  592. Channels []ChannelStatus
  593. }
  // ChannelStatus is the per-channel part of a ConnectionStatus.
  type ChannelStatus struct {
  	// ID is the channel's byte identifier.
  	ID byte
  	// SendQueueCapacity is the capacity of the send queue, SendQueueSize
  	// its current size counter, and Priority the configured priority.
  	SendQueueCapacity, SendQueueSize, Priority int
  	// RecentlySent is the channel's recently-sent byte figure.
  	RecentlySent int64
  }
  601. func (c *MConnection) Status() ConnectionStatus {
  602. var status ConnectionStatus
  603. status.Duration = time.Since(c.created)
  604. status.SendMonitor = c.sendMonitor.Status()
  605. status.RecvMonitor = c.recvMonitor.Status()
  606. status.Channels = make([]ChannelStatus, len(c.channels))
  607. for i, channel := range c.channels {
  608. status.Channels[i] = ChannelStatus{
  609. ID: channel.desc.ID,
  610. SendQueueCapacity: cap(channel.sendQueue),
  611. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  612. Priority: channel.desc.Priority,
  613. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  614. }
  615. }
  616. return status
  617. }
  //-----------------------------------------------------------------------------

  // ChannelDescriptor describes one channel of an MConnection. Capacities left
  // at zero are replaced with package defaults by FillDefaults.
  type ChannelDescriptor struct {
  	// ID is the channel's byte identifier.
  	ID byte
  	// Priority weighs the channel against the others when choosing what
  	// to send next; it must be positive.
  	Priority int
  	// SendQueueCapacity is the capacity of the outbound message queue.
  	SendQueueCapacity int
  	// RecvBufferCapacity is the initial capacity of the receive buffer and
  	// RecvMessageCapacity the largest inbound message accepted.
  	RecvBufferCapacity, RecvMessageCapacity int
  }
  626. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  627. if chDesc.SendQueueCapacity == 0 {
  628. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  629. }
  630. if chDesc.RecvBufferCapacity == 0 {
  631. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  632. }
  633. if chDesc.RecvMessageCapacity == 0 {
  634. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  635. }
  636. filled = chDesc
  637. return
  638. }
  639. // TODO: lowercase.
  640. // NOTE: not goroutine-safe.
  641. type Channel struct {
  642. conn *MConnection
  643. desc ChannelDescriptor
  644. sendQueue chan []byte
  645. sendQueueSize int32 // atomic.
  646. recving []byte
  647. sending []byte
  648. recentlySent int64 // exponential moving average
  649. maxPacketMsgPayloadSize int
  650. Logger log.Logger
  651. }
  652. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  653. desc = desc.FillDefaults()
  654. if desc.Priority <= 0 {
  655. panic("Channel default priority must be a positive integer")
  656. }
  657. return &Channel{
  658. conn: conn,
  659. desc: desc,
  660. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  661. recving: make([]byte, 0, desc.RecvBufferCapacity),
  662. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  663. }
  664. }
  665. func (ch *Channel) SetLogger(l log.Logger) {
  666. ch.Logger = l
  667. }
  668. // Queues message to send to this channel.
  669. // Goroutine-safe
  670. // Times out (and returns false) after defaultSendTimeout
  671. func (ch *Channel) sendBytes(bytes []byte) bool {
  672. select {
  673. case ch.sendQueue <- bytes:
  674. atomic.AddInt32(&ch.sendQueueSize, 1)
  675. return true
  676. case <-time.After(defaultSendTimeout):
  677. return false
  678. }
  679. }
  680. // Queues message to send to this channel.
  681. // Nonblocking, returns true if successful.
  682. // Goroutine-safe
  683. func (ch *Channel) trySendBytes(bytes []byte) bool {
  684. select {
  685. case ch.sendQueue <- bytes:
  686. atomic.AddInt32(&ch.sendQueueSize, 1)
  687. return true
  688. default:
  689. return false
  690. }
  691. }
  692. // Goroutine-safe
  693. func (ch *Channel) loadSendQueueSize() (size int) {
  694. return int(atomic.LoadInt32(&ch.sendQueueSize))
  695. }
  696. // Goroutine-safe
  697. // Use only as a heuristic.
  698. func (ch *Channel) canSend() bool {
  699. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  700. }
  701. // Returns true if any PacketMsgs are pending to be sent.
  702. // Call before calling nextPacketMsg()
  703. // Goroutine-safe
  704. func (ch *Channel) isSendPending() bool {
  705. if len(ch.sending) == 0 {
  706. if len(ch.sendQueue) == 0 {
  707. return false
  708. }
  709. ch.sending = <-ch.sendQueue
  710. }
  711. return true
  712. }
  713. // Creates a new PacketMsg to send.
  714. // Not goroutine-safe
  715. func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
  716. packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
  717. maxSize := ch.maxPacketMsgPayloadSize
  718. packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
  719. if len(ch.sending) <= maxSize {
  720. packet.EOF = true
  721. ch.sending = nil
  722. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  723. } else {
  724. packet.EOF = false
  725. ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
  726. }
  727. return packet
  728. }
  729. // Writes next PacketMsg to w and updates c.recentlySent.
  730. // Not goroutine-safe
  731. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
  732. packet := ch.nextPacketMsg()
  733. n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
  734. atomic.AddInt64(&ch.recentlySent, int64(n))
  735. return
  736. }
  737. // Handles incoming PacketMsgs. It returns a message bytes if message is
  738. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  739. // Not goroutine-safe
  740. func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
  741. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  742. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
  743. if recvCap < recvReceived {
  744. return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  745. }
  746. ch.recving = append(ch.recving, packet.Data...)
  747. if packet.EOF {
  748. msgBytes := ch.recving
  749. // clear the slice without re-allocating.
  750. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  751. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  752. // at which point the recving slice stops being used and should be garbage collected
  753. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  754. return msgBytes, nil
  755. }
  756. return nil, nil
  757. }
  758. // Call this periodically to update stats for throttling purposes.
  759. // Not goroutine-safe
  760. func (ch *Channel) updateStats() {
  761. // Exponential decay of stats.
  762. // TODO: optimize.
  763. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  764. }
  765. //----------------------------------------
  766. // Packet
  767. // mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message.
  768. func mustWrapPacket(pb proto.Message) *tmp2p.Packet {
  769. var msg tmp2p.Packet
  770. switch pb := pb.(type) {
  771. case *tmp2p.Packet: // already a packet
  772. msg = *pb
  773. case *tmp2p.PacketPing:
  774. msg = tmp2p.Packet{
  775. Sum: &tmp2p.Packet_PacketPing{
  776. PacketPing: pb,
  777. },
  778. }
  779. case *tmp2p.PacketPong:
  780. msg = tmp2p.Packet{
  781. Sum: &tmp2p.Packet_PacketPong{
  782. PacketPong: pb,
  783. },
  784. }
  785. case *tmp2p.PacketMsg:
  786. msg = tmp2p.Packet{
  787. Sum: &tmp2p.Packet_PacketMsg{
  788. PacketMsg: pb,
  789. },
  790. }
  791. default:
  792. panic(fmt.Errorf("unknown packet type %T", pb))
  793. }
  794. return &msg
  795. }