You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

889 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. amino "github.com/tendermint/go-amino"
  14. cmn "github.com/tendermint/tendermint/libs/common"
  15. flow "github.com/tendermint/tendermint/libs/flowrate"
  16. "github.com/tendermint/tendermint/libs/log"
  17. )
// Package-level defaults and tuning knobs for MConnection and Channel.
const (
	// defaultMaxPacketMsgPayloadSize is the default for
	// MConnConfig.MaxPacketMsgPayloadSize.
	defaultMaxPacketMsgPayloadSize = 1024

	// numBatchPacketMsgs is the maximum number of PacketMsgs written by a
	// single call to sendSomePacketMsgs.
	numBatchPacketMsgs = 10
	// Sizes handed to bufio when wrapping the connection for reads/writes.
	minReadBufferSize  = 1024
	minWriteBufferSize = 65536
	// updateStats is the period of the ticker that triggers Channel.updateStats.
	updateStats = 2 * time.Second

	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond

	// Defaults applied by ChannelDescriptor.FillDefaults to zero-valued fields.
	defaultSendQueueCapacity   = 1
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096 // 21MB

	defaultSendRate = int64(512000) // 500KB/s
	defaultRecvRate = int64(512000) // 500KB/s
	// defaultSendTimeout bounds how long Channel.sendBytes waits for queue space.
	defaultSendTimeout  = 10 * time.Second
	defaultPingInterval = 60 * time.Second
	defaultPongTimeout  = 45 * time.Second
)
// receiveCbFunc is the callback recvRoutine invokes with the channel ID and
// the bytes of each fully reassembled inbound message.
type receiveCbFunc func(chID byte, msgBytes []byte)

// errorCbFunc is the callback stopForError invokes (at most once per
// connection) with the error or recovered panic value that stopped it.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:
	func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
	func (m MConnection) TrySend(chID byte, msgBytes []byte) bool {}

`Send(chID, msgBytes)` is a blocking call that waits until `msgBytes` is
successfully queued for the channel with the given id byte `chID`, or until the
request times out. The queued bytes are later split into PacketMsgs, which are
serialized using Go-Amino.

`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
channel's queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService

	conn          net.Conn      // underlying connection
	bufConnReader *bufio.Reader // buffered reader over conn, used by recvRoutine
	bufConnWriter *bufio.Writer // buffered writer over conn, flushed by flush()
	sendMonitor   *flow.Monitor // tracks and throttles outbound bytes
	recvMonitor   *flow.Monitor // tracks and throttles inbound bytes
	send          chan struct{} // capacity 1; wakes sendRoutine when data is queued
	pong          chan struct{} // capacity 1; recvRoutine asks sendRoutine to write a pong
	channels      []*Channel    // channels in the order given at construction
	channelsIdx   map[byte]*Channel // channels indexed by their byte ID
	onReceive     receiveCbFunc
	onError       errorCbFunc
	errored       uint32 // set to 1 (via CAS) once onError has been triggered
	config        MConnConfig

	// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
	// doneSendRoutine is closed when the sendRoutine actually quits.
	quitSendRoutine chan struct{}
	doneSendRoutine chan struct{}

	// Closing quitRecvRoutine will cause the recvRoutine to eventually quit.
	quitRecvRoutine chan struct{}

	// used to ensure FlushStop and OnStop
	// are safe to call concurrently.
	stopMtx sync.Mutex

	flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer  *time.Ticker       // send pings periodically

	// close conn if pong is not received in pongTimeout
	pongTimer     *time.Timer
	pongTimeoutCh chan bool // true - timeout, false - peer sent pong

	chStatsTimer *time.Ticker // update channel stats periodically

	created time.Time // time of creation

	// _maxPacketMsgSize caches maxPacketMsgSize(), computed once at construction.
	_maxPacketMsgSize int
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate and RecvRate are the rate limits passed to the send and receive
	// flow monitors. Presumably bytes per second (the defaults are annotated
	// "500KB/s") — confirm against the flowrate package.
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// Maximum payload size
	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

	// Interval to flush writes (throttled)
	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

	// Interval to send pings
	PingInterval time.Duration `mapstructure:"ping_interval"`

	// Maximum wait time for pongs. NewMConnectionWithConfig panics unless this
	// is strictly less than PingInterval.
	PongTimeout time.Duration `mapstructure:"pong_timeout"`
}
  103. // DefaultMConnConfig returns the default config.
  104. func DefaultMConnConfig() MConnConfig {
  105. return MConnConfig{
  106. SendRate: defaultSendRate,
  107. RecvRate: defaultRecvRate,
  108. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  109. FlushThrottle: defaultFlushThrottle,
  110. PingInterval: defaultPingInterval,
  111. PongTimeout: defaultPongTimeout,
  112. }
  113. }
  114. // NewMConnection wraps net.Conn and creates multiplex connection
  115. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  116. return NewMConnectionWithConfig(
  117. conn,
  118. chDescs,
  119. onReceive,
  120. onError,
  121. DefaultMConnConfig())
  122. }
  123. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  124. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
  125. if config.PongTimeout >= config.PingInterval {
  126. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  127. }
  128. mconn := &MConnection{
  129. conn: conn,
  130. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  131. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  132. sendMonitor: flow.New(0, 0),
  133. recvMonitor: flow.New(0, 0),
  134. send: make(chan struct{}, 1),
  135. pong: make(chan struct{}, 1),
  136. onReceive: onReceive,
  137. onError: onError,
  138. config: config,
  139. created: time.Now(),
  140. }
  141. // Create channels
  142. var channelsIdx = map[byte]*Channel{}
  143. var channels = []*Channel{}
  144. for _, desc := range chDescs {
  145. channel := newChannel(mconn, *desc)
  146. channelsIdx[channel.desc.ID] = channel
  147. channels = append(channels, channel)
  148. }
  149. mconn.channels = channels
  150. mconn.channelsIdx = channelsIdx
  151. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  152. // maxPacketMsgSize() is a bit heavy, so call just once
  153. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  154. return mconn
  155. }
  156. func (c *MConnection) SetLogger(l log.Logger) {
  157. c.BaseService.SetLogger(l)
  158. for _, ch := range c.channels {
  159. ch.SetLogger(l)
  160. }
  161. }
  162. // OnStart implements BaseService
  163. func (c *MConnection) OnStart() error {
  164. if err := c.BaseService.OnStart(); err != nil {
  165. return err
  166. }
  167. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  168. c.pingTimer = time.NewTicker(c.config.PingInterval)
  169. c.pongTimeoutCh = make(chan bool, 1)
  170. c.chStatsTimer = time.NewTicker(updateStats)
  171. c.quitSendRoutine = make(chan struct{})
  172. c.doneSendRoutine = make(chan struct{})
  173. c.quitRecvRoutine = make(chan struct{})
  174. go c.sendRoutine()
  175. go c.recvRoutine()
  176. return nil
  177. }
  178. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  179. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  180. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  181. func (c *MConnection) stopServices() (alreadyStopped bool) {
  182. c.stopMtx.Lock()
  183. defer c.stopMtx.Unlock()
  184. select {
  185. case <-c.quitSendRoutine:
  186. // already quit
  187. return true
  188. default:
  189. }
  190. select {
  191. case <-c.quitRecvRoutine:
  192. // already quit
  193. return true
  194. default:
  195. }
  196. c.BaseService.OnStop()
  197. c.flushTimer.Stop()
  198. c.pingTimer.Stop()
  199. c.chStatsTimer.Stop()
  200. // inform the recvRouting that we are shutting down
  201. close(c.quitRecvRoutine)
  202. close(c.quitSendRoutine)
  203. return false
  204. }
  205. // FlushStop replicates the logic of OnStop.
  206. // It additionally ensures that all successful
  207. // .Send() calls will get flushed before closing
  208. // the connection.
  209. func (c *MConnection) FlushStop() {
  210. if c.stopServices() {
  211. return
  212. }
  213. // this block is unique to FlushStop
  214. {
  215. // wait until the sendRoutine exits
  216. // so we dont race on calling sendSomePacketMsgs
  217. <-c.doneSendRoutine
  218. // Send and flush all pending msgs.
  219. // Since sendRoutine has exited, we can call this
  220. // safely
  221. eof := c.sendSomePacketMsgs()
  222. for !eof {
  223. eof = c.sendSomePacketMsgs()
  224. }
  225. c.flush()
  226. // Now we can close the connection
  227. }
  228. c.conn.Close() // nolint: errcheck
  229. // We can't close pong safely here because
  230. // recvRoutine may write to it after we've stopped.
  231. // Though it doesn't need to get closed at all,
  232. // we close it @ recvRoutine.
  233. // c.Stop()
  234. }
  235. // OnStop implements BaseService
  236. func (c *MConnection) OnStop() {
  237. if c.stopServices() {
  238. return
  239. }
  240. c.conn.Close() // nolint: errcheck
  241. // We can't close pong safely here because
  242. // recvRoutine may write to it after we've stopped.
  243. // Though it doesn't need to get closed at all,
  244. // we close it @ recvRoutine.
  245. }
  246. func (c *MConnection) String() string {
  247. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  248. }
  249. func (c *MConnection) flush() {
  250. c.Logger.Debug("Flush", "conn", c)
  251. err := c.bufConnWriter.Flush()
  252. if err != nil {
  253. c.Logger.Error("MConnection flush failed", "err", err)
  254. }
  255. }
  256. // Catch panics, usually caused by remote disconnects.
  257. func (c *MConnection) _recover() {
  258. if r := recover(); r != nil {
  259. err := cmn.ErrorWrap(r, "recovered panic in MConnection")
  260. c.stopForError(err)
  261. }
  262. }
  263. func (c *MConnection) stopForError(r interface{}) {
  264. c.Stop()
  265. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  266. if c.onError != nil {
  267. c.onError(r)
  268. }
  269. }
  270. }
  271. // Queues a message to be sent to channel.
  272. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  273. if !c.IsRunning() {
  274. return false
  275. }
  276. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  277. // Send message to channel.
  278. channel, ok := c.channelsIdx[chID]
  279. if !ok {
  280. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  281. return false
  282. }
  283. success := channel.sendBytes(msgBytes)
  284. if success {
  285. // Wake up sendRoutine if necessary
  286. select {
  287. case c.send <- struct{}{}:
  288. default:
  289. }
  290. } else {
  291. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  292. }
  293. return success
  294. }
  295. // Queues a message to be sent to channel.
  296. // Nonblocking, returns true if successful.
  297. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  298. if !c.IsRunning() {
  299. return false
  300. }
  301. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  302. // Send message to channel.
  303. channel, ok := c.channelsIdx[chID]
  304. if !ok {
  305. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  306. return false
  307. }
  308. ok = channel.trySendBytes(msgBytes)
  309. if ok {
  310. // Wake up sendRoutine if necessary
  311. select {
  312. case c.send <- struct{}{}:
  313. default:
  314. }
  315. }
  316. return ok
  317. }
  318. // CanSend returns true if you can send more data onto the chID, false
  319. // otherwise. Use only as a heuristic.
  320. func (c *MConnection) CanSend(chID byte) bool {
  321. if !c.IsRunning() {
  322. return false
  323. }
  324. channel, ok := c.channelsIdx[chID]
  325. if !ok {
  326. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  327. return false
  328. }
  329. return channel.canSend()
  330. }
// sendRoutine polls for packets to send from channels.
//
// It is the single goroutine that writes to bufConnWriter while the
// connection is running. Each loop iteration handles exactly one event:
// a throttled flush, a channel-stats tick, a ping tick, a pong timeout/receipt,
// a request to send a pong, a quit signal, or a wake-up to send PacketMsgs.
// On exit it stops any pending pong timer and closes doneSendRoutine.
func (c *MConnection) sendRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		var _n int64
		var err error
	SELECTION:
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufConnWriter.
			c.flush()
		case <-c.chStatsTimer.C:
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.C:
			c.Logger.Debug("Send Ping")
			_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
			if err != nil {
				// Leave the select; the error is handled after it.
				break SELECTION
			}
			c.sendMonitor.Update(int(_n))
			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
			// When the timer fires, report a timeout unless a value is
			// already pending on pongTimeoutCh (never block the timer goroutine).
			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
				select {
				case c.pongTimeoutCh <- true:
				default:
				}
			})
			c.flush()
		case timeout := <-c.pongTimeoutCh:
			if timeout {
				c.Logger.Debug("Pong timeout")
				err = errors.New("pong timeout")
			} else {
				// Peer answered in time (recvRoutine sent false).
				c.stopPongTimer()
			}
		case <-c.pong:
			c.Logger.Debug("Send Pong")
			_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
			if err != nil {
				// Leave the select; the error is handled after it.
				break SELECTION
			}
			c.sendMonitor.Update(int(_n))
			c.flush()
		case <-c.quitSendRoutine:
			break FOR_LOOP
		case <-c.send:
			// Send some PacketMsgs
			eof := c.sendSomePacketMsgs()
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	c.stopPongTimer()
	// Signals FlushStop that this routine no longer touches the writer.
	close(c.doneSendRoutine)
}
  404. // Returns true if messages from channels were exhausted.
  405. // Blocks in accordance to .sendMonitor throttling.
  406. func (c *MConnection) sendSomePacketMsgs() bool {
  407. // Block until .sendMonitor says we can write.
  408. // Once we're ready we send more than we asked for,
  409. // but amortized it should even out.
  410. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  411. // Now send some PacketMsgs.
  412. for i := 0; i < numBatchPacketMsgs; i++ {
  413. if c.sendPacketMsg() {
  414. return true
  415. }
  416. }
  417. return false
  418. }
  419. // Returns true if messages from channels were exhausted.
  420. func (c *MConnection) sendPacketMsg() bool {
  421. // Choose a channel to create a PacketMsg from.
  422. // The chosen channel will be the one whose recentlySent/priority is the least.
  423. var leastRatio float32 = math.MaxFloat32
  424. var leastChannel *Channel
  425. for _, channel := range c.channels {
  426. // If nothing to send, skip this channel
  427. if !channel.isSendPending() {
  428. continue
  429. }
  430. // Get ratio, and keep track of lowest ratio.
  431. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  432. if ratio < leastRatio {
  433. leastRatio = ratio
  434. leastChannel = channel
  435. }
  436. }
  437. // Nothing to send?
  438. if leastChannel == nil {
  439. return true
  440. }
  441. // c.Logger.Info("Found a msgPacket to send")
  442. // Make & send a PacketMsg from this channel
  443. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  444. if err != nil {
  445. c.Logger.Error("Failed to write PacketMsg", "err", err)
  446. c.stopForError(err)
  447. return true
  448. }
  449. c.sendMonitor.Update(int(_n))
  450. c.flushTimer.Set()
  451. return false
  452. }
// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
//
// Pings are answered by signalling sendRoutine through c.pong; pongs are
// reported to sendRoutine through c.pongTimeoutCh. Any read error, unknown
// channel, oversized message or unknown packet type ends the loop. On exit
// the routine closes and drains c.pong.
func (c *MConnection) recvRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)

		// Peek into bufConnReader for debugging
		/*
			if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
				bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
				if err == nil {
					// return
				} else {
					c.Logger.Debug("Error peeking connection buffer", "err", err)
					// return nil
				}
				c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
			}
		*/

		// Read packet type
		var packet Packet
		var _n int64
		var err error
		_n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
		c.recvMonitor.Update(int(_n))

		if err != nil {
			// stopServices was invoked and we are shutting down
			// receiving is expected to fail since we will close the connection
			select {
			case <-c.quitRecvRoutine:
				break FOR_LOOP
			default:
			}

			if c.IsRunning() {
				if err == io.EOF {
					c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
				} else {
					c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				}
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		// Read more depending on packet type.
		switch pkt := packet.(type) {
		case PacketPing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			c.Logger.Debug("Receive Ping")
			// Ask sendRoutine to reply; a pong request that is already
			// pending is sufficient.
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}
		case PacketPong:
			c.Logger.Debug("Receive Pong")
			// false on pongTimeoutCh means "peer sent pong" (not a timeout).
			select {
			case c.pongTimeoutCh <- false:
			default:
				// never block
			}
		case PacketMsg:
			channel, ok := c.channelsIdx[pkt.ChannelID]
			if !ok || channel == nil {
				err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
				c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			// msgBytes is non-nil only once the final fragment has arrived.
			msgBytes, err := channel.recvPacketMsg(pkt)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			if msgBytes != nil {
				c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
				// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
				c.onReceive(pkt.ChannelID, msgBytes)
			}
		default:
			err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	// recvRoutine is the only sender on c.pong, so it is the one to close it.
	close(c.pong)
	for range c.pong {
		// Drain
	}
}
  552. // not goroutine-safe
  553. func (c *MConnection) stopPongTimer() {
  554. if c.pongTimer != nil {
  555. _ = c.pongTimer.Stop()
  556. c.pongTimer = nil
  557. }
  558. }
  559. // maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
  560. // of amino encoding.
  561. func (c *MConnection) maxPacketMsgSize() int {
  562. return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
  563. ChannelID: 0x01,
  564. EOF: 1,
  565. Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
  566. })) + 10 // leave room for changes in amino
  567. }
// ConnectionStatus is a snapshot of an MConnection, as returned by
// MConnection.Status.
type ConnectionStatus struct {
	Duration    time.Duration // time elapsed since the connection was created
	SendMonitor flow.Status   // status of the outbound flow monitor
	RecvMonitor flow.Status   // status of the inbound flow monitor
	Channels    []ChannelStatus // one entry per channel, in connection order
}

// ChannelStatus is a snapshot of a single Channel's send-side state.
type ChannelStatus struct {
	ID                byte
	SendQueueCapacity int   // capacity of the channel's send queue
	SendQueueSize     int   // messages queued or in flight at snapshot time
	Priority          int   // priority from the channel descriptor
	RecentlySent      int64 // decaying count of bytes recently written
}
  581. func (c *MConnection) Status() ConnectionStatus {
  582. var status ConnectionStatus
  583. status.Duration = time.Since(c.created)
  584. status.SendMonitor = c.sendMonitor.Status()
  585. status.RecvMonitor = c.recvMonitor.Status()
  586. status.Channels = make([]ChannelStatus, len(c.channels))
  587. for i, channel := range c.channels {
  588. status.Channels[i] = ChannelStatus{
  589. ID: channel.desc.ID,
  590. SendQueueCapacity: cap(channel.sendQueue),
  591. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  592. Priority: channel.desc.Priority,
  593. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  594. }
  595. }
  596. return status
  597. }
  598. //-----------------------------------------------------------------------------
// ChannelDescriptor describes one channel of an MConnection.
// Zero-valued capacity fields are replaced with package defaults by
// FillDefaults.
type ChannelDescriptor struct {
	ID                  byte // channel identifier carried in every PacketMsg
	Priority            int  // relative send priority; must be positive
	SendQueueCapacity   int  // capacity of the outbound message queue
	RecvBufferCapacity  int  // initial capacity of the reassembly buffer
	RecvMessageCapacity int  // maximum size in bytes of a reassembled inbound message
}
  606. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  607. if chDesc.SendQueueCapacity == 0 {
  608. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  609. }
  610. if chDesc.RecvBufferCapacity == 0 {
  611. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  612. }
  613. if chDesc.RecvMessageCapacity == 0 {
  614. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  615. }
  616. filled = chDesc
  617. return
  618. }
// Channel holds the send queue and the reassembly buffer for one logical
// channel of an MConnection.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection      // owning connection
	desc          ChannelDescriptor // descriptor with defaults filled in
	sendQueue     chan []byte       // whole messages waiting to be sent
	sendQueueSize int32             // atomic.
	recving       []byte            // bytes of the inbound message being reassembled
	sending       []byte            // unsent remainder of the outbound message in progress
	recentlySent  int64             // exponential moving average

	// maxPacketMsgPayloadSize is the largest payload placed in one PacketMsg.
	maxPacketMsgPayloadSize int

	Logger log.Logger
}
  632. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  633. desc = desc.FillDefaults()
  634. if desc.Priority <= 0 {
  635. panic("Channel default priority must be a positive integer")
  636. }
  637. return &Channel{
  638. conn: conn,
  639. desc: desc,
  640. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  641. recving: make([]byte, 0, desc.RecvBufferCapacity),
  642. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  643. }
  644. }
// SetLogger sets the logger used by this channel.
func (ch *Channel) SetLogger(l log.Logger) {
	ch.Logger = l
}
  648. // Queues message to send to this channel.
  649. // Goroutine-safe
  650. // Times out (and returns false) after defaultSendTimeout
  651. func (ch *Channel) sendBytes(bytes []byte) bool {
  652. select {
  653. case ch.sendQueue <- bytes:
  654. atomic.AddInt32(&ch.sendQueueSize, 1)
  655. return true
  656. case <-time.After(defaultSendTimeout):
  657. return false
  658. }
  659. }
  660. // Queues message to send to this channel.
  661. // Nonblocking, returns true if successful.
  662. // Goroutine-safe
  663. func (ch *Channel) trySendBytes(bytes []byte) bool {
  664. select {
  665. case ch.sendQueue <- bytes:
  666. atomic.AddInt32(&ch.sendQueueSize, 1)
  667. return true
  668. default:
  669. return false
  670. }
  671. }
  672. // Goroutine-safe
  673. func (ch *Channel) loadSendQueueSize() (size int) {
  674. return int(atomic.LoadInt32(&ch.sendQueueSize))
  675. }
  676. // Goroutine-safe
  677. // Use only as a heuristic.
  678. func (ch *Channel) canSend() bool {
  679. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  680. }
  681. // Returns true if any PacketMsgs are pending to be sent.
  682. // Call before calling nextPacketMsg()
  683. // Goroutine-safe
  684. func (ch *Channel) isSendPending() bool {
  685. if len(ch.sending) == 0 {
  686. if len(ch.sendQueue) == 0 {
  687. return false
  688. }
  689. ch.sending = <-ch.sendQueue
  690. }
  691. return true
  692. }
  693. // Creates a new PacketMsg to send.
  694. // Not goroutine-safe
  695. func (ch *Channel) nextPacketMsg() PacketMsg {
  696. packet := PacketMsg{}
  697. packet.ChannelID = byte(ch.desc.ID)
  698. maxSize := ch.maxPacketMsgPayloadSize
  699. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  700. if len(ch.sending) <= maxSize {
  701. packet.EOF = byte(0x01)
  702. ch.sending = nil
  703. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  704. } else {
  705. packet.EOF = byte(0x00)
  706. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  707. }
  708. return packet
  709. }
  710. // Writes next PacketMsg to w and updates c.recentlySent.
  711. // Not goroutine-safe
  712. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  713. var packet = ch.nextPacketMsg()
  714. n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
  715. atomic.AddInt64(&ch.recentlySent, n)
  716. return
  717. }
  718. // Handles incoming PacketMsgs. It returns a message bytes if message is
  719. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  720. // Not goroutine-safe
  721. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  722. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  723. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  724. if recvCap < recvReceived {
  725. return nil, fmt.Errorf("Received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  726. }
  727. ch.recving = append(ch.recving, packet.Bytes...)
  728. if packet.EOF == byte(0x01) {
  729. msgBytes := ch.recving
  730. // clear the slice without re-allocating.
  731. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  732. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  733. // at which point the recving slice stops being used and should be garbage collected
  734. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  735. return msgBytes, nil
  736. }
  737. return nil, nil
  738. }
  739. // Call this periodically to update stats for throttling purposes.
  740. // Not goroutine-safe
  741. func (ch *Channel) updateStats() {
  742. // Exponential decay of stats.
  743. // TODO: optimize.
  744. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  745. }
  746. //----------------------------------------
  747. // Packet
// Packet is the interface implemented by every packet type exchanged over an
// MConnection. AssertIsPacket is a marker method with no behavior.
type Packet interface {
	AssertIsPacket()
}
  751. func RegisterPacket(cdc *amino.Codec) {
  752. cdc.RegisterInterface((*Packet)(nil), nil)
  753. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  754. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  755. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  756. }
  757. func (_ PacketPing) AssertIsPacket() {}
  758. func (_ PacketPong) AssertIsPacket() {}
  759. func (_ PacketMsg) AssertIsPacket() {}
  760. type PacketPing struct {
  761. }
  762. type PacketPong struct {
  763. }
  764. type PacketMsg struct {
  765. ChannelID byte
  766. EOF byte // 1 means message ends here.
  767. Bytes []byte
  768. }
  769. func (mp PacketMsg) String() string {
  770. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  771. }