You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

804 lines
22 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync/atomic"
  11. "time"
  12. amino "github.com/tendermint/go-amino"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. flow "github.com/tendermint/tmlibs/flowrate"
  15. "github.com/tendermint/tmlibs/log"
  16. )
  // Package-level tunables for MConnection and its channels.
  const (
  	// Default cap on a single PacketMsg and the number of PacketMsgs
  	// written per wake-up of the send routine.
  	defaultMaxPacketMsgSize = 1 << 10
  	numBatchPacketMsgs      = 10

  	// Sizes handed to the buffered reader/writer wrapping the net.Conn.
  	minReadBufferSize  = 1 << 10
  	minWriteBufferSize = 1 << 16

  	// Interval of the channel statistics timer.
  	updateStats = 2 * time.Second

  	// some of these defaults are written in the user config
  	// flushThrottle, sendRate, recvRate
  	// TODO: remove values present in config
  	defaultFlushThrottle = 100 * time.Millisecond

  	// Per-channel defaults applied by ChannelDescriptor.FillDefaults.
  	defaultSendQueueCapacity   = 1
  	defaultRecvBufferCapacity  = 4 << 10
  	defaultRecvMessageCapacity = 21 * 1024 * 1024 // 21MB

  	// Flow-control rates, in bytes per second.
  	defaultSendRate = int64(500 * 1024) // 500KB/s
  	defaultRecvRate = int64(500 * 1024) // 500KB/s

  	// Timeouts and keep-alive intervals.
  	defaultSendTimeout  = 10 * time.Second
  	defaultPingInterval = time.Minute
  	defaultPongTimeout  = 45 * time.Second
  )
  // Callback signatures supplied by the owner of an MConnection.
  type (
  	// receiveCbFunc is invoked with the channel id and the bytes of each
  	// fully reassembled inbound message.
  	receiveCbFunc func(chID byte, msgBytes []byte)

  	// errorCbFunc is invoked with the value that caused the connection
  	// to stop.
  	errorCbFunc func(interface{})
  )
  38. /*
  39. Each peer has one `MConnection` (multiplex connection) instance.
  40. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  41. several messages along a single channel of communication.
  42. Each `MConnection` handles message transmission on multiple abstract communication
  43. `Channel`s. Each channel has a globally unique byte id.
  44. The byte id and the relative priorities of each `Channel` are configured upon
  45. initialization of the connection.
  46. There are two methods for sending messages:
  47. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  48. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  49. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  50. successfully queued for the channel with the given id byte `chID`, or until the
  51. request times out. The message `msg` is serialized using Go-Amino.
  52. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  53. channel's queue is full.
  54. Inbound message bytes are handled with an onReceive callback function.
  55. */
  56. type MConnection struct {
  57. cmn.BaseService
  58. conn net.Conn
  59. bufConnReader *bufio.Reader
  60. bufConnWriter *bufio.Writer
  61. sendMonitor *flow.Monitor
  62. recvMonitor *flow.Monitor
  63. send chan struct{}
  64. pong chan struct{}
  65. channels []*Channel
  66. channelsIdx map[byte]*Channel
  67. onReceive receiveCbFunc
  68. onError errorCbFunc
  69. errored uint32
  70. config MConnConfig
  71. quit chan struct{}
  72. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  73. pingTimer *cmn.RepeatTimer // send pings periodically
  74. // close conn if pong is not received in pongTimeout
  75. pongTimer *time.Timer
  76. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  77. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  78. created time.Time // time of creation
  79. emptyPacketMsgSize int
  80. }
  // MConnConfig holds the tunable parameters of an MConnection.
  type MConnConfig struct {
  	// Send and receive rate limits passed to the flow monitors.
  	SendRate int64 `mapstructure:"send_rate"`
  	RecvRate int64 `mapstructure:"recv_rate"`

  	// Maximum payload size
  	MaxPacketMsgSize int `mapstructure:"max_packet_msg_size"`

  	// Interval to flush writes (throttled)
  	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

  	// Interval to send pings
  	PingInterval time.Duration `mapstructure:"ping_interval"`

  	// Maximum wait time for pongs
  	PongTimeout time.Duration `mapstructure:"pong_timeout"`
  }
  94. // DefaultMConnConfig returns the default config.
  95. func DefaultMConnConfig() MConnConfig {
  96. return MConnConfig{
  97. SendRate: defaultSendRate,
  98. RecvRate: defaultRecvRate,
  99. MaxPacketMsgSize: defaultMaxPacketMsgSize,
  100. FlushThrottle: defaultFlushThrottle,
  101. PingInterval: defaultPingInterval,
  102. PongTimeout: defaultPongTimeout,
  103. }
  104. }
  105. // NewMConnection wraps net.Conn and creates multiplex connection
  106. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  107. return NewMConnectionWithConfig(
  108. conn,
  109. chDescs,
  110. onReceive,
  111. onError,
  112. DefaultMConnConfig())
  113. }
  114. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  115. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
  116. if config.PongTimeout >= config.PingInterval {
  117. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  118. }
  119. mconn := &MConnection{
  120. conn: conn,
  121. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  122. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  123. sendMonitor: flow.New(0, 0),
  124. recvMonitor: flow.New(0, 0),
  125. send: make(chan struct{}, 1),
  126. pong: make(chan struct{}, 1),
  127. onReceive: onReceive,
  128. onError: onError,
  129. config: config,
  130. emptyPacketMsgSize: emptyPacketMsgSize(),
  131. }
  132. // Create channels
  133. var channelsIdx = map[byte]*Channel{}
  134. var channels = []*Channel{}
  135. for _, desc := range chDescs {
  136. channel := newChannel(mconn, *desc)
  137. channelsIdx[channel.desc.ID] = channel
  138. channels = append(channels, channel)
  139. }
  140. mconn.channels = channels
  141. mconn.channelsIdx = channelsIdx
  142. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  143. return mconn
  144. }
  145. func (c *MConnection) SetLogger(l log.Logger) {
  146. c.BaseService.SetLogger(l)
  147. for _, ch := range c.channels {
  148. ch.SetLogger(l)
  149. }
  150. }
  151. // OnStart implements BaseService
  152. func (c *MConnection) OnStart() error {
  153. if err := c.BaseService.OnStart(); err != nil {
  154. return err
  155. }
  156. c.quit = make(chan struct{})
  157. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  158. c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
  159. c.pongTimeoutCh = make(chan bool, 1)
  160. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  161. go c.sendRoutine()
  162. go c.recvRoutine()
  163. return nil
  164. }
  165. // OnStop implements BaseService
  166. func (c *MConnection) OnStop() {
  167. c.BaseService.OnStop()
  168. c.flushTimer.Stop()
  169. c.pingTimer.Stop()
  170. c.chStatsTimer.Stop()
  171. if c.quit != nil {
  172. close(c.quit)
  173. }
  174. c.conn.Close() // nolint: errcheck
  175. // We can't close pong safely here because
  176. // recvRoutine may write to it after we've stopped.
  177. // Though it doesn't need to get closed at all,
  178. // we close it @ recvRoutine.
  179. }
  180. func (c *MConnection) String() string {
  181. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  182. }
  183. func (c *MConnection) flush() {
  184. c.Logger.Debug("Flush", "conn", c)
  185. err := c.bufConnWriter.Flush()
  186. if err != nil {
  187. c.Logger.Error("MConnection flush failed", "err", err)
  188. }
  189. }
  190. // Catch panics, usually caused by remote disconnects.
  191. func (c *MConnection) _recover() {
  192. if r := recover(); r != nil {
  193. err := cmn.ErrorWrap(r, "recovered panic in MConnection")
  194. c.stopForError(err)
  195. }
  196. }
  197. func (c *MConnection) stopForError(r interface{}) {
  198. c.Stop()
  199. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  200. if c.onError != nil {
  201. c.onError(r)
  202. }
  203. }
  204. }
  205. // Queues a message to be sent to channel.
  206. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  207. if !c.IsRunning() {
  208. return false
  209. }
  210. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  211. // Send message to channel.
  212. channel, ok := c.channelsIdx[chID]
  213. if !ok {
  214. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  215. return false
  216. }
  217. success := channel.sendBytes(msgBytes)
  218. if success {
  219. // Wake up sendRoutine if necessary
  220. select {
  221. case c.send <- struct{}{}:
  222. default:
  223. }
  224. } else {
  225. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  226. }
  227. return success
  228. }
  229. // Queues a message to be sent to channel.
  230. // Nonblocking, returns true if successful.
  231. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  232. if !c.IsRunning() {
  233. return false
  234. }
  235. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  236. // Send message to channel.
  237. channel, ok := c.channelsIdx[chID]
  238. if !ok {
  239. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  240. return false
  241. }
  242. ok = channel.trySendBytes(msgBytes)
  243. if ok {
  244. // Wake up sendRoutine if necessary
  245. select {
  246. case c.send <- struct{}{}:
  247. default:
  248. }
  249. }
  250. return ok
  251. }
  252. // CanSend returns true if you can send more data onto the chID, false
  253. // otherwise. Use only as a heuristic.
  254. func (c *MConnection) CanSend(chID byte) bool {
  255. if !c.IsRunning() {
  256. return false
  257. }
  258. channel, ok := c.channelsIdx[chID]
  259. if !ok {
  260. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  261. return false
  262. }
  263. return channel.canSend()
  264. }
  265. // sendRoutine polls for packets to send from channels.
  266. func (c *MConnection) sendRoutine() {
  267. defer c._recover()
  268. FOR_LOOP:
  269. for {
  270. var _n int64
  271. var err error
  272. SELECTION:
  273. select {
  274. case <-c.flushTimer.Ch:
  275. // NOTE: flushTimer.Set() must be called every time
  276. // something is written to .bufConnWriter.
  277. c.flush()
  278. case <-c.chStatsTimer.Chan():
  279. for _, channel := range c.channels {
  280. channel.updateStats()
  281. }
  282. case <-c.pingTimer.Chan():
  283. c.Logger.Debug("Send Ping")
  284. _n, err = cdc.MarshalBinaryWriter(c.bufConnWriter, PacketPing{})
  285. if err != nil {
  286. break SELECTION
  287. }
  288. c.sendMonitor.Update(int(_n))
  289. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  290. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  291. select {
  292. case c.pongTimeoutCh <- true:
  293. default:
  294. }
  295. })
  296. c.flush()
  297. case timeout := <-c.pongTimeoutCh:
  298. if timeout {
  299. c.Logger.Debug("Pong timeout")
  300. err = errors.New("pong timeout")
  301. } else {
  302. c.stopPongTimer()
  303. }
  304. case <-c.pong:
  305. c.Logger.Debug("Send Pong")
  306. _n, err = cdc.MarshalBinaryWriter(c.bufConnWriter, PacketPong{})
  307. if err != nil {
  308. break SELECTION
  309. }
  310. c.sendMonitor.Update(int(_n))
  311. c.flush()
  312. case <-c.quit:
  313. break FOR_LOOP
  314. case <-c.send:
  315. // Send some PacketMsgs
  316. eof := c.sendSomePacketMsgs()
  317. if !eof {
  318. // Keep sendRoutine awake.
  319. select {
  320. case c.send <- struct{}{}:
  321. default:
  322. }
  323. }
  324. }
  325. if !c.IsRunning() {
  326. break FOR_LOOP
  327. }
  328. if err != nil {
  329. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  330. c.stopForError(err)
  331. break FOR_LOOP
  332. }
  333. }
  334. // Cleanup
  335. c.stopPongTimer()
  336. }
  337. // Returns true if messages from channels were exhausted.
  338. // Blocks in accordance to .sendMonitor throttling.
  339. func (c *MConnection) sendSomePacketMsgs() bool {
  340. // Block until .sendMonitor says we can write.
  341. // Once we're ready we send more than we asked for,
  342. // but amortized it should even out.
  343. c.sendMonitor.Limit(c.config.MaxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  344. // Now send some PacketMsgs.
  345. for i := 0; i < numBatchPacketMsgs; i++ {
  346. if c.sendPacketMsg() {
  347. return true
  348. }
  349. }
  350. return false
  351. }
  352. // Returns true if messages from channels were exhausted.
  353. func (c *MConnection) sendPacketMsg() bool {
  354. // Choose a channel to create a PacketMsg from.
  355. // The chosen channel will be the one whose recentlySent/priority is the least.
  356. var leastRatio float32 = math.MaxFloat32
  357. var leastChannel *Channel
  358. for _, channel := range c.channels {
  359. // If nothing to send, skip this channel
  360. if !channel.isSendPending() {
  361. continue
  362. }
  363. // Get ratio, and keep track of lowest ratio.
  364. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  365. if ratio < leastRatio {
  366. leastRatio = ratio
  367. leastChannel = channel
  368. }
  369. }
  370. // Nothing to send?
  371. if leastChannel == nil {
  372. return true
  373. }
  374. // c.Logger.Info("Found a msgPacket to send")
  375. // Make & send a PacketMsg from this channel
  376. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  377. if err != nil {
  378. c.Logger.Error("Failed to write PacketMsg", "err", err)
  379. c.stopForError(err)
  380. return true
  381. }
  382. c.sendMonitor.Update(int(_n))
  383. c.flushTimer.Set()
  384. return false
  385. }
  386. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  387. // After a whole message has been assembled, it's pushed to onReceive().
  388. // Blocks depending on how the connection is throttled.
  389. // Otherwise, it never blocks.
  390. func (c *MConnection) recvRoutine() {
  391. defer c._recover()
  392. FOR_LOOP:
  393. for {
  394. // Block until .recvMonitor says we can read.
  395. c.recvMonitor.Limit(c.config.MaxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  396. // Peek into bufConnReader for debugging
  397. /*
  398. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  399. bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
  400. if err == nil {
  401. // return
  402. } else {
  403. c.Logger.Debug("Error peeking connection buffer", "err", err)
  404. // return nil
  405. }
  406. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  407. }
  408. */
  409. // Read packet type
  410. var packet Packet
  411. var _n int64
  412. var err error
  413. _n, err = cdc.UnmarshalBinaryReader(c.bufConnReader, &packet, int64(c.config.MaxPacketMsgSize))
  414. c.recvMonitor.Update(int(_n))
  415. if err != nil {
  416. if c.IsRunning() {
  417. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  418. c.stopForError(err)
  419. }
  420. break FOR_LOOP
  421. }
  422. // Read more depending on packet type.
  423. switch pkt := packet.(type) {
  424. case PacketPing:
  425. // TODO: prevent abuse, as they cause flush()'s.
  426. // https://github.com/tendermint/tendermint/issues/1190
  427. c.Logger.Debug("Receive Ping")
  428. select {
  429. case c.pong <- struct{}{}:
  430. default:
  431. // never block
  432. }
  433. case PacketPong:
  434. c.Logger.Debug("Receive Pong")
  435. select {
  436. case c.pongTimeoutCh <- false:
  437. default:
  438. // never block
  439. }
  440. case PacketMsg:
  441. channel, ok := c.channelsIdx[pkt.ChannelID]
  442. if !ok || channel == nil {
  443. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  444. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  445. c.stopForError(err)
  446. break FOR_LOOP
  447. }
  448. msgBytes, err := channel.recvPacketMsg(pkt)
  449. if err != nil {
  450. if c.IsRunning() {
  451. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  452. c.stopForError(err)
  453. }
  454. break FOR_LOOP
  455. }
  456. if msgBytes != nil {
  457. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  458. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  459. c.onReceive(pkt.ChannelID, msgBytes)
  460. }
  461. default:
  462. err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
  463. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  464. c.stopForError(err)
  465. break FOR_LOOP
  466. }
  467. }
  468. // Cleanup
  469. close(c.pong)
  470. for range c.pong {
  471. // Drain
  472. }
  473. }
  474. // not goroutine-safe
  475. func (c *MConnection) stopPongTimer() {
  476. if c.pongTimer != nil {
  477. _ = c.pongTimer.Stop()
  478. c.pongTimer = nil
  479. }
  480. }
  481. type ConnectionStatus struct {
  482. Duration time.Duration
  483. SendMonitor flow.Status
  484. RecvMonitor flow.Status
  485. Channels []ChannelStatus
  486. }
  // ChannelStatus describes a single channel inside a ConnectionStatus.
  type ChannelStatus struct {
  	ID byte // channel id

  	SendQueueCapacity int // capacity of the channel's send queue
  	SendQueueSize     int // number of messages currently queued

  	Priority     int   // configured channel priority
  	RecentlySent int64 // decaying count of recently sent bytes
  }
  494. func (c *MConnection) Status() ConnectionStatus {
  495. var status ConnectionStatus
  496. status.Duration = time.Since(c.created)
  497. status.SendMonitor = c.sendMonitor.Status()
  498. status.RecvMonitor = c.recvMonitor.Status()
  499. status.Channels = make([]ChannelStatus, len(c.channels))
  500. for i, channel := range c.channels {
  501. status.Channels[i] = ChannelStatus{
  502. ID: channel.desc.ID,
  503. SendQueueCapacity: cap(channel.sendQueue),
  504. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  505. Priority: channel.desc.Priority,
  506. RecentlySent: channel.recentlySent,
  507. }
  508. }
  509. return status
  510. }
  //-----------------------------------------------------------------------------

  // ChannelDescriptor configures one channel of an MConnection. Capacities
  // left at zero are replaced with package defaults by FillDefaults.
  type ChannelDescriptor struct {
  	ID       byte // channel id
  	Priority int  // relative send priority

  	SendQueueCapacity   int // capacity of the outbound message queue
  	RecvBufferCapacity  int // initial capacity of the receive buffer
  	RecvMessageCapacity int // maximum size of a reassembled inbound message
  }
  519. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  520. if chDesc.SendQueueCapacity == 0 {
  521. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  522. }
  523. if chDesc.RecvBufferCapacity == 0 {
  524. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  525. }
  526. if chDesc.RecvMessageCapacity == 0 {
  527. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  528. }
  529. filled = chDesc
  530. return
  531. }
  532. // TODO: lowercase.
  533. // NOTE: not goroutine-safe.
  534. type Channel struct {
  535. conn *MConnection
  536. desc ChannelDescriptor
  537. sendQueue chan []byte
  538. sendQueueSize int32 // atomic.
  539. recving []byte
  540. sending []byte
  541. recentlySent int64 // exponential moving average
  542. maxPacketMsgPayloadSize int
  543. Logger log.Logger
  544. }
  545. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  546. desc = desc.FillDefaults()
  547. if desc.Priority <= 0 {
  548. cmn.PanicSanity("Channel default priority must be a positive integer")
  549. }
  550. return &Channel{
  551. conn: conn,
  552. desc: desc,
  553. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  554. recving: make([]byte, 0, desc.RecvBufferCapacity),
  555. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgSize,
  556. }
  557. }
  558. func (ch *Channel) SetLogger(l log.Logger) {
  559. ch.Logger = l
  560. }
  561. // Queues message to send to this channel.
  562. // Goroutine-safe
  563. // Times out (and returns false) after defaultSendTimeout
  564. func (ch *Channel) sendBytes(bytes []byte) bool {
  565. select {
  566. case ch.sendQueue <- bytes:
  567. atomic.AddInt32(&ch.sendQueueSize, 1)
  568. return true
  569. case <-time.After(defaultSendTimeout):
  570. return false
  571. }
  572. }
  573. // Queues message to send to this channel.
  574. // Nonblocking, returns true if successful.
  575. // Goroutine-safe
  576. func (ch *Channel) trySendBytes(bytes []byte) bool {
  577. select {
  578. case ch.sendQueue <- bytes:
  579. atomic.AddInt32(&ch.sendQueueSize, 1)
  580. return true
  581. default:
  582. return false
  583. }
  584. }
  585. // Goroutine-safe
  586. func (ch *Channel) loadSendQueueSize() (size int) {
  587. return int(atomic.LoadInt32(&ch.sendQueueSize))
  588. }
  589. // Goroutine-safe
  590. // Use only as a heuristic.
  591. func (ch *Channel) canSend() bool {
  592. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  593. }
  594. // Returns true if any PacketMsgs are pending to be sent.
  595. // Call before calling nextPacketMsg()
  596. // Goroutine-safe
  597. func (ch *Channel) isSendPending() bool {
  598. if len(ch.sending) == 0 {
  599. if len(ch.sendQueue) == 0 {
  600. return false
  601. }
  602. ch.sending = <-ch.sendQueue
  603. }
  604. return true
  605. }
  606. // Creates a new PacketMsg to send.
  607. // Not goroutine-safe
  608. func (ch *Channel) nextPacketMsg() PacketMsg {
  609. packet := PacketMsg{}
  610. packet.ChannelID = byte(ch.desc.ID)
  611. maxSize := ch.maxPacketMsgPayloadSize - ch.conn.emptyPacketMsgSize
  612. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  613. if len(ch.sending) <= maxSize {
  614. packet.EOF = byte(0x01)
  615. ch.sending = nil
  616. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  617. } else {
  618. packet.EOF = byte(0x00)
  619. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  620. }
  621. return packet
  622. }
  623. // Writes next PacketMsg to w and updates c.recentlySent.
  624. // Not goroutine-safe
  625. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  626. var packet = ch.nextPacketMsg()
  627. n, err = cdc.MarshalBinaryWriter(w, packet)
  628. ch.recentlySent += n
  629. return
  630. }
  631. // Handles incoming PacketMsgs. It returns a message bytes if message is
  632. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  633. // Not goroutine-safe
  634. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  635. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  636. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  637. if recvCap < recvReceived {
  638. return nil, fmt.Errorf("Received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  639. }
  640. ch.recving = append(ch.recving, packet.Bytes...)
  641. if packet.EOF == byte(0x01) {
  642. msgBytes := ch.recving
  643. // clear the slice without re-allocating.
  644. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  645. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  646. // at which point the recving slice stops being used and should be garbage collected
  647. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  648. return msgBytes, nil
  649. }
  650. return nil, nil
  651. }
  652. // Call this periodically to update stats for throttling purposes.
  653. // Not goroutine-safe
  654. func (ch *Channel) updateStats() {
  655. // Exponential decay of stats.
  656. // TODO: optimize.
  657. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  658. }
  //----------------------------------------
  // Packet

  // Packet is the interface implemented by every packet type exchanged over
  // an MConnection. AssertIsPacket is a marker method with no behaviour.
  type Packet interface {
  	AssertIsPacket()
  }
  664. func RegisterPacket(cdc *amino.Codec) {
  665. cdc.RegisterInterface((*Packet)(nil), nil)
  666. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  667. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  668. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  669. }
  // PacketPing is the keep-alive request packet; it carries no data.
  type PacketPing struct{}

  // PacketPong is the reply to a PacketPing; it carries no data.
  type PacketPong struct{}

  // PacketMsg carries one fragment of a message for a channel.
  type PacketMsg struct {
  	ChannelID byte
  	EOF       byte // 1 means message ends here.
  	Bytes     []byte
  }

  // AssertIsPacket marks PacketPing as a Packet.
  func (PacketPing) AssertIsPacket() {}

  // AssertIsPacket marks PacketPong as a Packet.
  func (PacketPong) AssertIsPacket() {}

  // AssertIsPacket marks PacketMsg as a Packet.
  func (PacketMsg) AssertIsPacket() {}

  // String renders the packet as upper-case hex: channel id, payload bytes
  // and EOF flag.
  func (mp PacketMsg) String() string {
  	const layout = "PacketMsg{%X:%X T:%X}"
  	return fmt.Sprintf(layout, mp.ChannelID, mp.Bytes, mp.EOF)
  }
  685. // - Uvarint length of MustMarshalBinary(packet) = 1 or 2 bytes
  686. // (as long as it's less than 16,384 bytes)
  687. // - Prefix bytes = 4 bytes
  688. // - ChannelID field key + byte = 2 bytes
  689. // - EOF field key + byte = 2 bytes
  690. // - Bytes field key = 1 bytes
  691. // - Uvarint length of MustMarshalBinary(bytes) = 1 or 2 bytes
  692. // - Struct terminator = 1 byte
  693. // = up to 14 bytes overhead for the packet.
  694. func emptyPacketMsgSize() int {
  695. emptyPacketMsgSize := len(cdc.MustMarshalBinary(PacketMsg{
  696. ChannelID: 0x01,
  697. EOF: 1,
  698. Bytes: make([]byte, 1),
  699. }))
  700. // -1 byte of data
  701. // +1 byte because uvarint length of MustMarshalBinary(bytes) will be 2 bytes for big packets
  702. // +1 byte because uvarint length of MustMarshalBinary(packet) will be 2 bytes for big packets
  703. return emptyPacketMsgSize - 1 + 1 + 1
  704. }