You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

845 lines
23 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync/atomic"
  11. "time"
  12. amino "github.com/tendermint/go-amino"
  13. cmn "github.com/tendermint/tendermint/libs/common"
  14. flow "github.com/tendermint/tendermint/libs/flowrate"
  15. "github.com/tendermint/tendermint/libs/log"
  16. )
  17. const (
  18. defaultMaxPacketMsgPayloadSize = 1024
  19. numBatchPacketMsgs = 10
  20. minReadBufferSize = 1024
  21. minWriteBufferSize = 65536
  22. updateStats = 2 * time.Second
  23. // some of these defaults are written in the user config
  24. // flushThrottle, sendRate, recvRate
  25. // TODO: remove values present in config
  26. defaultFlushThrottle = 100 * time.Millisecond
  27. defaultSendQueueCapacity = 1
  28. defaultRecvBufferCapacity = 4096
  29. defaultRecvMessageCapacity = 22020096 // 21MB
  30. defaultSendRate = int64(512000) // 500KB/s
  31. defaultRecvRate = int64(512000) // 500KB/s
  32. defaultSendTimeout = 10 * time.Second
  33. defaultPingInterval = 60 * time.Second
  34. defaultPongTimeout = 45 * time.Second
  35. )
  36. type receiveCbFunc func(chID byte, msgBytes []byte)
  37. type errorCbFunc func(interface{})
  38. /*
  39. Each peer has one `MConnection` (multiplex connection) instance.
  40. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  41. several messages along a single channel of communication.
  42. Each `MConnection` handles message transmission on multiple abstract communication
  43. `Channel`s. Each channel has a globally unique byte id.
  44. The byte id and the relative priorities of each `Channel` are configured upon
  45. initialization of the connection.
  46. There are two methods for sending messages:
  47. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  48. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  49. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  50. successfully queued for the channel with the given id byte `chID`, or until the
  51. request times out. The message `msg` is serialized using Go-Amino.
  52. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  53. channel's queue is full.
  54. Inbound message bytes are handled with an onReceive callback function.
  55. */
  56. type MConnection struct {
  57. cmn.BaseService
  58. conn net.Conn
  59. bufConnReader *bufio.Reader
  60. bufConnWriter *bufio.Writer
  61. sendMonitor *flow.Monitor
  62. recvMonitor *flow.Monitor
  63. send chan struct{}
  64. pong chan struct{}
  65. channels []*Channel
  66. channelsIdx map[byte]*Channel
  67. onReceive receiveCbFunc
  68. onError errorCbFunc
  69. errored uint32
  70. config MConnConfig
  71. // Closing quitSendRoutine will cause
  72. // doneSendRoutine to close.
  73. quitSendRoutine chan struct{}
  74. doneSendRoutine chan struct{}
  75. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  76. pingTimer *cmn.RepeatTimer // send pings periodically
  77. // close conn if pong is not received in pongTimeout
  78. pongTimer *time.Timer
  79. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  80. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  81. created time.Time // time of creation
  82. _maxPacketMsgSize int
  83. }
  84. // MConnConfig is a MConnection configuration.
  85. type MConnConfig struct {
  86. SendRate int64 `mapstructure:"send_rate"`
  87. RecvRate int64 `mapstructure:"recv_rate"`
  88. // Maximum payload size
  89. MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
  90. // Interval to flush writes (throttled)
  91. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  92. // Interval to send pings
  93. PingInterval time.Duration `mapstructure:"ping_interval"`
  94. // Maximum wait time for pongs
  95. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  96. }
  97. // DefaultMConnConfig returns the default config.
  98. func DefaultMConnConfig() MConnConfig {
  99. return MConnConfig{
  100. SendRate: defaultSendRate,
  101. RecvRate: defaultRecvRate,
  102. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  103. FlushThrottle: defaultFlushThrottle,
  104. PingInterval: defaultPingInterval,
  105. PongTimeout: defaultPongTimeout,
  106. }
  107. }
  108. // NewMConnection wraps net.Conn and creates multiplex connection
  109. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  110. return NewMConnectionWithConfig(
  111. conn,
  112. chDescs,
  113. onReceive,
  114. onError,
  115. DefaultMConnConfig())
  116. }
  117. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  118. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
  119. if config.PongTimeout >= config.PingInterval {
  120. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  121. }
  122. mconn := &MConnection{
  123. conn: conn,
  124. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  125. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  126. sendMonitor: flow.New(0, 0),
  127. recvMonitor: flow.New(0, 0),
  128. send: make(chan struct{}, 1),
  129. pong: make(chan struct{}, 1),
  130. onReceive: onReceive,
  131. onError: onError,
  132. config: config,
  133. created: time.Now(),
  134. }
  135. // Create channels
  136. var channelsIdx = map[byte]*Channel{}
  137. var channels = []*Channel{}
  138. for _, desc := range chDescs {
  139. channel := newChannel(mconn, *desc)
  140. channelsIdx[channel.desc.ID] = channel
  141. channels = append(channels, channel)
  142. }
  143. mconn.channels = channels
  144. mconn.channelsIdx = channelsIdx
  145. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  146. // maxPacketMsgSize() is a bit heavy, so call just once
  147. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  148. return mconn
  149. }
  150. func (c *MConnection) SetLogger(l log.Logger) {
  151. c.BaseService.SetLogger(l)
  152. for _, ch := range c.channels {
  153. ch.SetLogger(l)
  154. }
  155. }
  156. // OnStart implements BaseService
  157. func (c *MConnection) OnStart() error {
  158. if err := c.BaseService.OnStart(); err != nil {
  159. return err
  160. }
  161. c.quitSendRoutine = make(chan struct{})
  162. c.doneSendRoutine = make(chan struct{})
  163. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  164. c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
  165. c.pongTimeoutCh = make(chan bool, 1)
  166. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  167. go c.sendRoutine()
  168. go c.recvRoutine()
  169. return nil
  170. }
  171. // FlushStop replicates the logic of OnStop.
  172. // It additionally ensures that all successful
  173. // .Send() calls will get flushed before closing
  174. // the connection.
  175. // NOTE: it is not safe to call this method more than once.
  176. func (c *MConnection) FlushStop() {
  177. c.BaseService.OnStop()
  178. c.flushTimer.Stop()
  179. c.pingTimer.Stop()
  180. c.chStatsTimer.Stop()
  181. if c.quitSendRoutine != nil {
  182. close(c.quitSendRoutine)
  183. // wait until the sendRoutine exits
  184. // so we dont race on calling sendSomePacketMsgs
  185. <-c.doneSendRoutine
  186. }
  187. // Send and flush all pending msgs.
  188. // By now, IsRunning == false,
  189. // so any concurrent attempts to send will fail.
  190. // Since sendRoutine has exited, we can call this
  191. // safely
  192. eof := c.sendSomePacketMsgs()
  193. for !eof {
  194. eof = c.sendSomePacketMsgs()
  195. }
  196. c.flush()
  197. // Now we can close the connection
  198. c.conn.Close() // nolint: errcheck
  199. // We can't close pong safely here because
  200. // recvRoutine may write to it after we've stopped.
  201. // Though it doesn't need to get closed at all,
  202. // we close it @ recvRoutine.
  203. // c.Stop()
  204. }
  205. // OnStop implements BaseService
  206. func (c *MConnection) OnStop() {
  207. select {
  208. case <-c.quitSendRoutine:
  209. // already quit via FlushStop
  210. return
  211. default:
  212. }
  213. c.BaseService.OnStop()
  214. c.flushTimer.Stop()
  215. c.pingTimer.Stop()
  216. c.chStatsTimer.Stop()
  217. close(c.quitSendRoutine)
  218. c.conn.Close() // nolint: errcheck
  219. // We can't close pong safely here because
  220. // recvRoutine may write to it after we've stopped.
  221. // Though it doesn't need to get closed at all,
  222. // we close it @ recvRoutine.
  223. }
  224. func (c *MConnection) String() string {
  225. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  226. }
  227. func (c *MConnection) flush() {
  228. c.Logger.Debug("Flush", "conn", c)
  229. err := c.bufConnWriter.Flush()
  230. if err != nil {
  231. c.Logger.Error("MConnection flush failed", "err", err)
  232. }
  233. }
  234. // Catch panics, usually caused by remote disconnects.
  235. func (c *MConnection) _recover() {
  236. if r := recover(); r != nil {
  237. err := cmn.ErrorWrap(r, "recovered panic in MConnection")
  238. c.stopForError(err)
  239. }
  240. }
  241. func (c *MConnection) stopForError(r interface{}) {
  242. c.Stop()
  243. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  244. if c.onError != nil {
  245. c.onError(r)
  246. }
  247. }
  248. }
  249. // Queues a message to be sent to channel.
  250. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  251. if !c.IsRunning() {
  252. return false
  253. }
  254. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  255. // Send message to channel.
  256. channel, ok := c.channelsIdx[chID]
  257. if !ok {
  258. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  259. return false
  260. }
  261. success := channel.sendBytes(msgBytes)
  262. if success {
  263. // Wake up sendRoutine if necessary
  264. select {
  265. case c.send <- struct{}{}:
  266. default:
  267. }
  268. } else {
  269. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  270. }
  271. return success
  272. }
  273. // Queues a message to be sent to channel.
  274. // Nonblocking, returns true if successful.
  275. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  276. if !c.IsRunning() {
  277. return false
  278. }
  279. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  280. // Send message to channel.
  281. channel, ok := c.channelsIdx[chID]
  282. if !ok {
  283. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  284. return false
  285. }
  286. ok = channel.trySendBytes(msgBytes)
  287. if ok {
  288. // Wake up sendRoutine if necessary
  289. select {
  290. case c.send <- struct{}{}:
  291. default:
  292. }
  293. }
  294. return ok
  295. }
  296. // CanSend returns true if you can send more data onto the chID, false
  297. // otherwise. Use only as a heuristic.
  298. func (c *MConnection) CanSend(chID byte) bool {
  299. if !c.IsRunning() {
  300. return false
  301. }
  302. channel, ok := c.channelsIdx[chID]
  303. if !ok {
  304. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  305. return false
  306. }
  307. return channel.canSend()
  308. }
  309. // sendRoutine polls for packets to send from channels.
  310. func (c *MConnection) sendRoutine() {
  311. defer c._recover()
  312. FOR_LOOP:
  313. for {
  314. var _n int64
  315. var err error
  316. SELECTION:
  317. select {
  318. case <-c.flushTimer.Ch:
  319. // NOTE: flushTimer.Set() must be called every time
  320. // something is written to .bufConnWriter.
  321. c.flush()
  322. case <-c.chStatsTimer.Chan():
  323. for _, channel := range c.channels {
  324. channel.updateStats()
  325. }
  326. case <-c.pingTimer.Chan():
  327. c.Logger.Debug("Send Ping")
  328. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
  329. if err != nil {
  330. break SELECTION
  331. }
  332. c.sendMonitor.Update(int(_n))
  333. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  334. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  335. select {
  336. case c.pongTimeoutCh <- true:
  337. default:
  338. }
  339. })
  340. c.flush()
  341. case timeout := <-c.pongTimeoutCh:
  342. if timeout {
  343. c.Logger.Debug("Pong timeout")
  344. err = errors.New("pong timeout")
  345. } else {
  346. c.stopPongTimer()
  347. }
  348. case <-c.pong:
  349. c.Logger.Debug("Send Pong")
  350. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
  351. if err != nil {
  352. break SELECTION
  353. }
  354. c.sendMonitor.Update(int(_n))
  355. c.flush()
  356. case <-c.quitSendRoutine:
  357. close(c.doneSendRoutine)
  358. break FOR_LOOP
  359. case <-c.send:
  360. // Send some PacketMsgs
  361. eof := c.sendSomePacketMsgs()
  362. if !eof {
  363. // Keep sendRoutine awake.
  364. select {
  365. case c.send <- struct{}{}:
  366. default:
  367. }
  368. }
  369. }
  370. if !c.IsRunning() {
  371. break FOR_LOOP
  372. }
  373. if err != nil {
  374. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  375. c.stopForError(err)
  376. break FOR_LOOP
  377. }
  378. }
  379. // Cleanup
  380. c.stopPongTimer()
  381. }
  382. // Returns true if messages from channels were exhausted.
  383. // Blocks in accordance to .sendMonitor throttling.
  384. func (c *MConnection) sendSomePacketMsgs() bool {
  385. // Block until .sendMonitor says we can write.
  386. // Once we're ready we send more than we asked for,
  387. // but amortized it should even out.
  388. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  389. // Now send some PacketMsgs.
  390. for i := 0; i < numBatchPacketMsgs; i++ {
  391. if c.sendPacketMsg() {
  392. return true
  393. }
  394. }
  395. return false
  396. }
  397. // Returns true if messages from channels were exhausted.
  398. func (c *MConnection) sendPacketMsg() bool {
  399. // Choose a channel to create a PacketMsg from.
  400. // The chosen channel will be the one whose recentlySent/priority is the least.
  401. var leastRatio float32 = math.MaxFloat32
  402. var leastChannel *Channel
  403. for _, channel := range c.channels {
  404. // If nothing to send, skip this channel
  405. if !channel.isSendPending() {
  406. continue
  407. }
  408. // Get ratio, and keep track of lowest ratio.
  409. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  410. if ratio < leastRatio {
  411. leastRatio = ratio
  412. leastChannel = channel
  413. }
  414. }
  415. // Nothing to send?
  416. if leastChannel == nil {
  417. return true
  418. }
  419. // c.Logger.Info("Found a msgPacket to send")
  420. // Make & send a PacketMsg from this channel
  421. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  422. if err != nil {
  423. c.Logger.Error("Failed to write PacketMsg", "err", err)
  424. c.stopForError(err)
  425. return true
  426. }
  427. c.sendMonitor.Update(int(_n))
  428. c.flushTimer.Set()
  429. return false
  430. }
  431. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  432. // After a whole message has been assembled, it's pushed to onReceive().
  433. // Blocks depending on how the connection is throttled.
  434. // Otherwise, it never blocks.
  435. func (c *MConnection) recvRoutine() {
  436. defer c._recover()
  437. FOR_LOOP:
  438. for {
  439. // Block until .recvMonitor says we can read.
  440. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  441. // Peek into bufConnReader for debugging
  442. /*
  443. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  444. bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
  445. if err == nil {
  446. // return
  447. } else {
  448. c.Logger.Debug("Error peeking connection buffer", "err", err)
  449. // return nil
  450. }
  451. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  452. }
  453. */
  454. // Read packet type
  455. var packet Packet
  456. var _n int64
  457. var err error
  458. _n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
  459. c.recvMonitor.Update(int(_n))
  460. if err != nil {
  461. if c.IsRunning() {
  462. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  463. c.stopForError(err)
  464. }
  465. break FOR_LOOP
  466. }
  467. // Read more depending on packet type.
  468. switch pkt := packet.(type) {
  469. case PacketPing:
  470. // TODO: prevent abuse, as they cause flush()'s.
  471. // https://github.com/tendermint/tendermint/issues/1190
  472. c.Logger.Debug("Receive Ping")
  473. select {
  474. case c.pong <- struct{}{}:
  475. default:
  476. // never block
  477. }
  478. case PacketPong:
  479. c.Logger.Debug("Receive Pong")
  480. select {
  481. case c.pongTimeoutCh <- false:
  482. default:
  483. // never block
  484. }
  485. case PacketMsg:
  486. channel, ok := c.channelsIdx[pkt.ChannelID]
  487. if !ok || channel == nil {
  488. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  489. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  490. c.stopForError(err)
  491. break FOR_LOOP
  492. }
  493. msgBytes, err := channel.recvPacketMsg(pkt)
  494. if err != nil {
  495. if c.IsRunning() {
  496. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  497. c.stopForError(err)
  498. }
  499. break FOR_LOOP
  500. }
  501. if msgBytes != nil {
  502. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  503. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  504. c.onReceive(pkt.ChannelID, msgBytes)
  505. }
  506. default:
  507. err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
  508. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  509. c.stopForError(err)
  510. break FOR_LOOP
  511. }
  512. }
  513. // Cleanup
  514. close(c.pong)
  515. for range c.pong {
  516. // Drain
  517. }
  518. }
  519. // not goroutine-safe
  520. func (c *MConnection) stopPongTimer() {
  521. if c.pongTimer != nil {
  522. _ = c.pongTimer.Stop()
  523. c.pongTimer = nil
  524. }
  525. }
  526. // maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
  527. // of amino encoding.
  528. func (c *MConnection) maxPacketMsgSize() int {
  529. return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
  530. ChannelID: 0x01,
  531. EOF: 1,
  532. Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
  533. })) + 10 // leave room for changes in amino
  534. }
  535. type ConnectionStatus struct {
  536. Duration time.Duration
  537. SendMonitor flow.Status
  538. RecvMonitor flow.Status
  539. Channels []ChannelStatus
  540. }
  541. type ChannelStatus struct {
  542. ID byte
  543. SendQueueCapacity int
  544. SendQueueSize int
  545. Priority int
  546. RecentlySent int64
  547. }
  548. func (c *MConnection) Status() ConnectionStatus {
  549. var status ConnectionStatus
  550. status.Duration = time.Since(c.created)
  551. status.SendMonitor = c.sendMonitor.Status()
  552. status.RecvMonitor = c.recvMonitor.Status()
  553. status.Channels = make([]ChannelStatus, len(c.channels))
  554. for i, channel := range c.channels {
  555. status.Channels[i] = ChannelStatus{
  556. ID: channel.desc.ID,
  557. SendQueueCapacity: cap(channel.sendQueue),
  558. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  559. Priority: channel.desc.Priority,
  560. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  561. }
  562. }
  563. return status
  564. }
  565. //-----------------------------------------------------------------------------
  566. type ChannelDescriptor struct {
  567. ID byte
  568. Priority int
  569. SendQueueCapacity int
  570. RecvBufferCapacity int
  571. RecvMessageCapacity int
  572. }
  573. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  574. if chDesc.SendQueueCapacity == 0 {
  575. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  576. }
  577. if chDesc.RecvBufferCapacity == 0 {
  578. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  579. }
  580. if chDesc.RecvMessageCapacity == 0 {
  581. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  582. }
  583. filled = chDesc
  584. return
  585. }
  586. // TODO: lowercase.
  587. // NOTE: not goroutine-safe.
  588. type Channel struct {
  589. conn *MConnection
  590. desc ChannelDescriptor
  591. sendQueue chan []byte
  592. sendQueueSize int32 // atomic.
  593. recving []byte
  594. sending []byte
  595. recentlySent int64 // exponential moving average
  596. maxPacketMsgPayloadSize int
  597. Logger log.Logger
  598. }
  599. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  600. desc = desc.FillDefaults()
  601. if desc.Priority <= 0 {
  602. cmn.PanicSanity("Channel default priority must be a positive integer")
  603. }
  604. return &Channel{
  605. conn: conn,
  606. desc: desc,
  607. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  608. recving: make([]byte, 0, desc.RecvBufferCapacity),
  609. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  610. }
  611. }
  612. func (ch *Channel) SetLogger(l log.Logger) {
  613. ch.Logger = l
  614. }
  615. // Queues message to send to this channel.
  616. // Goroutine-safe
  617. // Times out (and returns false) after defaultSendTimeout
  618. func (ch *Channel) sendBytes(bytes []byte) bool {
  619. select {
  620. case ch.sendQueue <- bytes:
  621. atomic.AddInt32(&ch.sendQueueSize, 1)
  622. return true
  623. case <-time.After(defaultSendTimeout):
  624. return false
  625. }
  626. }
  627. // Queues message to send to this channel.
  628. // Nonblocking, returns true if successful.
  629. // Goroutine-safe
  630. func (ch *Channel) trySendBytes(bytes []byte) bool {
  631. select {
  632. case ch.sendQueue <- bytes:
  633. atomic.AddInt32(&ch.sendQueueSize, 1)
  634. return true
  635. default:
  636. return false
  637. }
  638. }
  639. // Goroutine-safe
  640. func (ch *Channel) loadSendQueueSize() (size int) {
  641. return int(atomic.LoadInt32(&ch.sendQueueSize))
  642. }
  643. // Goroutine-safe
  644. // Use only as a heuristic.
  645. func (ch *Channel) canSend() bool {
  646. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  647. }
  648. // Returns true if any PacketMsgs are pending to be sent.
  649. // Call before calling nextPacketMsg()
  650. // Goroutine-safe
  651. func (ch *Channel) isSendPending() bool {
  652. if len(ch.sending) == 0 {
  653. if len(ch.sendQueue) == 0 {
  654. return false
  655. }
  656. ch.sending = <-ch.sendQueue
  657. }
  658. return true
  659. }
  660. // Creates a new PacketMsg to send.
  661. // Not goroutine-safe
  662. func (ch *Channel) nextPacketMsg() PacketMsg {
  663. packet := PacketMsg{}
  664. packet.ChannelID = byte(ch.desc.ID)
  665. maxSize := ch.maxPacketMsgPayloadSize
  666. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  667. if len(ch.sending) <= maxSize {
  668. packet.EOF = byte(0x01)
  669. ch.sending = nil
  670. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  671. } else {
  672. packet.EOF = byte(0x00)
  673. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  674. }
  675. return packet
  676. }
  677. // Writes next PacketMsg to w and updates c.recentlySent.
  678. // Not goroutine-safe
  679. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  680. var packet = ch.nextPacketMsg()
  681. n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
  682. atomic.AddInt64(&ch.recentlySent, n)
  683. return
  684. }
  685. // Handles incoming PacketMsgs. It returns a message bytes if message is
  686. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  687. // Not goroutine-safe
  688. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  689. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  690. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  691. if recvCap < recvReceived {
  692. return nil, fmt.Errorf("Received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  693. }
  694. ch.recving = append(ch.recving, packet.Bytes...)
  695. if packet.EOF == byte(0x01) {
  696. msgBytes := ch.recving
  697. // clear the slice without re-allocating.
  698. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  699. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  700. // at which point the recving slice stops being used and should be garbage collected
  701. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  702. return msgBytes, nil
  703. }
  704. return nil, nil
  705. }
  706. // Call this periodically to update stats for throttling purposes.
  707. // Not goroutine-safe
  708. func (ch *Channel) updateStats() {
  709. // Exponential decay of stats.
  710. // TODO: optimize.
  711. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  712. }
  713. //----------------------------------------
  714. // Packet
  715. type Packet interface {
  716. AssertIsPacket()
  717. }
  718. func RegisterPacket(cdc *amino.Codec) {
  719. cdc.RegisterInterface((*Packet)(nil), nil)
  720. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  721. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  722. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  723. }
  724. func (_ PacketPing) AssertIsPacket() {}
  725. func (_ PacketPong) AssertIsPacket() {}
  726. func (_ PacketMsg) AssertIsPacket() {}
  727. type PacketPing struct {
  728. }
  729. type PacketPong struct {
  730. }
  731. type PacketMsg struct {
  732. ChannelID byte
  733. EOF byte // 1 means message ends here.
  734. Bytes []byte
  735. }
  736. func (mp PacketMsg) String() string {
  737. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  738. }