You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

905 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "runtime/debug"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "math"
  9. "net"
  10. "reflect"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. amino "github.com/tendermint/go-amino"
  15. flow "github.com/tendermint/tendermint/libs/flowrate"
  16. "github.com/tendermint/tendermint/libs/log"
  17. tmmath "github.com/tendermint/tendermint/libs/math"
  18. "github.com/tendermint/tendermint/libs/service"
  19. "github.com/tendermint/tendermint/libs/timer"
  20. )
  21. const (
  22. defaultMaxPacketMsgPayloadSize = 1024
  23. numBatchPacketMsgs = 10
  24. minReadBufferSize = 1024
  25. minWriteBufferSize = 65536
  26. updateStats = 2 * time.Second
  27. // some of these defaults are written in the user config
  28. // flushThrottle, sendRate, recvRate
  29. // TODO: remove values present in config
  30. defaultFlushThrottle = 100 * time.Millisecond
  31. defaultSendQueueCapacity = 1
  32. defaultRecvBufferCapacity = 4096
  33. defaultRecvMessageCapacity = 22020096 // 21MB
  34. defaultSendRate = int64(512000) // 500KB/s
  35. defaultRecvRate = int64(512000) // 500KB/s
  36. defaultSendTimeout = 10 * time.Second
  37. defaultPingInterval = 60 * time.Second
  38. defaultPongTimeout = 45 * time.Second
  39. )
  40. type receiveCbFunc func(chID byte, msgBytes []byte)
  41. type errorCbFunc func(interface{})
  42. /*
  43. Each peer has one `MConnection` (multiplex connection) instance.
  44. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  45. several messages along a single channel of communication.
  46. Each `MConnection` handles message transmission on multiple abstract communication
  47. `Channel`s. Each channel has a globally unique byte id.
  48. The byte id and the relative priorities of each `Channel` are configured upon
  49. initialization of the connection.
  50. There are two methods for sending messages:
  51. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  52. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  53. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  54. successfully queued for the channel with the given id byte `chID`, or until the
  55. request times out. The message `msg` is serialized using Go-Amino.
  56. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  57. channel's queue is full.
  58. Inbound message bytes are handled with an onReceive callback function.
  59. */
  60. type MConnection struct {
  61. service.BaseService
  62. conn net.Conn
  63. bufConnReader *bufio.Reader
  64. bufConnWriter *bufio.Writer
  65. sendMonitor *flow.Monitor
  66. recvMonitor *flow.Monitor
  67. send chan struct{}
  68. pong chan struct{}
  69. channels []*Channel
  70. channelsIdx map[byte]*Channel
  71. onReceive receiveCbFunc
  72. onError errorCbFunc
  73. errored uint32
  74. config MConnConfig
  75. // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
  76. // doneSendRoutine is closed when the sendRoutine actually quits.
  77. quitSendRoutine chan struct{}
  78. doneSendRoutine chan struct{}
  79. // Closing quitRecvRouting will cause the recvRouting to eventually quit.
  80. quitRecvRoutine chan struct{}
  81. // used to ensure FlushStop and OnStop
  82. // are safe to call concurrently.
  83. stopMtx sync.Mutex
  84. flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
  85. pingTimer *time.Ticker // send pings periodically
  86. // close conn if pong is not received in pongTimeout
  87. pongTimer *time.Timer
  88. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  89. chStatsTimer *time.Ticker // update channel stats periodically
  90. created time.Time // time of creation
  91. _maxPacketMsgSize int
  92. }
  93. // MConnConfig is a MConnection configuration.
  94. type MConnConfig struct {
  95. SendRate int64 `mapstructure:"send_rate"`
  96. RecvRate int64 `mapstructure:"recv_rate"`
  97. // Maximum payload size
  98. MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
  99. // Interval to flush writes (throttled)
  100. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  101. // Interval to send pings
  102. PingInterval time.Duration `mapstructure:"ping_interval"`
  103. // Maximum wait time for pongs
  104. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  105. }
  106. // DefaultMConnConfig returns the default config.
  107. func DefaultMConnConfig() MConnConfig {
  108. return MConnConfig{
  109. SendRate: defaultSendRate,
  110. RecvRate: defaultRecvRate,
  111. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  112. FlushThrottle: defaultFlushThrottle,
  113. PingInterval: defaultPingInterval,
  114. PongTimeout: defaultPongTimeout,
  115. }
  116. }
  117. // NewMConnection wraps net.Conn and creates multiplex connection
  118. func NewMConnection(
  119. conn net.Conn,
  120. chDescs []*ChannelDescriptor,
  121. onReceive receiveCbFunc,
  122. onError errorCbFunc,
  123. ) *MConnection {
  124. return NewMConnectionWithConfig(
  125. conn,
  126. chDescs,
  127. onReceive,
  128. onError,
  129. DefaultMConnConfig())
  130. }
  131. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  132. func NewMConnectionWithConfig(
  133. conn net.Conn,
  134. chDescs []*ChannelDescriptor,
  135. onReceive receiveCbFunc,
  136. onError errorCbFunc,
  137. config MConnConfig,
  138. ) *MConnection {
  139. if config.PongTimeout >= config.PingInterval {
  140. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  141. }
  142. mconn := &MConnection{
  143. conn: conn,
  144. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  145. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  146. sendMonitor: flow.New(0, 0),
  147. recvMonitor: flow.New(0, 0),
  148. send: make(chan struct{}, 1),
  149. pong: make(chan struct{}, 1),
  150. onReceive: onReceive,
  151. onError: onError,
  152. config: config,
  153. created: time.Now(),
  154. }
  155. // Create channels
  156. var channelsIdx = map[byte]*Channel{}
  157. var channels = []*Channel{}
  158. for _, desc := range chDescs {
  159. channel := newChannel(mconn, *desc)
  160. channelsIdx[channel.desc.ID] = channel
  161. channels = append(channels, channel)
  162. }
  163. mconn.channels = channels
  164. mconn.channelsIdx = channelsIdx
  165. mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)
  166. // maxPacketMsgSize() is a bit heavy, so call just once
  167. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  168. return mconn
  169. }
  170. func (c *MConnection) SetLogger(l log.Logger) {
  171. c.BaseService.SetLogger(l)
  172. for _, ch := range c.channels {
  173. ch.SetLogger(l)
  174. }
  175. }
  176. // OnStart implements BaseService
  177. func (c *MConnection) OnStart() error {
  178. if err := c.BaseService.OnStart(); err != nil {
  179. return err
  180. }
  181. c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
  182. c.pingTimer = time.NewTicker(c.config.PingInterval)
  183. c.pongTimeoutCh = make(chan bool, 1)
  184. c.chStatsTimer = time.NewTicker(updateStats)
  185. c.quitSendRoutine = make(chan struct{})
  186. c.doneSendRoutine = make(chan struct{})
  187. c.quitRecvRoutine = make(chan struct{})
  188. go c.sendRoutine()
  189. go c.recvRoutine()
  190. return nil
  191. }
  192. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  193. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  194. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  195. func (c *MConnection) stopServices() (alreadyStopped bool) {
  196. c.stopMtx.Lock()
  197. defer c.stopMtx.Unlock()
  198. select {
  199. case <-c.quitSendRoutine:
  200. // already quit
  201. return true
  202. default:
  203. }
  204. select {
  205. case <-c.quitRecvRoutine:
  206. // already quit
  207. return true
  208. default:
  209. }
  210. c.BaseService.OnStop()
  211. c.flushTimer.Stop()
  212. c.pingTimer.Stop()
  213. c.chStatsTimer.Stop()
  214. // inform the recvRouting that we are shutting down
  215. close(c.quitRecvRoutine)
  216. close(c.quitSendRoutine)
  217. return false
  218. }
  219. // FlushStop replicates the logic of OnStop.
  220. // It additionally ensures that all successful
  221. // .Send() calls will get flushed before closing
  222. // the connection.
  223. func (c *MConnection) FlushStop() {
  224. if c.stopServices() {
  225. return
  226. }
  227. // this block is unique to FlushStop
  228. {
  229. // wait until the sendRoutine exits
  230. // so we dont race on calling sendSomePacketMsgs
  231. <-c.doneSendRoutine
  232. // Send and flush all pending msgs.
  233. // Since sendRoutine has exited, we can call this
  234. // safely
  235. eof := c.sendSomePacketMsgs()
  236. for !eof {
  237. eof = c.sendSomePacketMsgs()
  238. }
  239. c.flush()
  240. // Now we can close the connection
  241. }
  242. c.conn.Close() // nolint: errcheck
  243. // We can't close pong safely here because
  244. // recvRoutine may write to it after we've stopped.
  245. // Though it doesn't need to get closed at all,
  246. // we close it @ recvRoutine.
  247. // c.Stop()
  248. }
  249. // OnStop implements BaseService
  250. func (c *MConnection) OnStop() {
  251. if c.stopServices() {
  252. return
  253. }
  254. c.conn.Close() // nolint: errcheck
  255. // We can't close pong safely here because
  256. // recvRoutine may write to it after we've stopped.
  257. // Though it doesn't need to get closed at all,
  258. // we close it @ recvRoutine.
  259. }
  260. func (c *MConnection) String() string {
  261. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  262. }
  263. func (c *MConnection) flush() {
  264. c.Logger.Debug("Flush", "conn", c)
  265. err := c.bufConnWriter.Flush()
  266. if err != nil {
  267. c.Logger.Error("MConnection flush failed", "err", err)
  268. }
  269. }
  270. // Catch panics, usually caused by remote disconnects.
  271. func (c *MConnection) _recover() {
  272. if r := recover(); r != nil {
  273. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  274. c.stopForError(fmt.Errorf("recovered from panic: %v", r))
  275. }
  276. }
  277. func (c *MConnection) stopForError(r interface{}) {
  278. c.Stop()
  279. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  280. if c.onError != nil {
  281. c.onError(r)
  282. }
  283. }
  284. }
  285. // Queues a message to be sent to channel.
  286. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  287. if !c.IsRunning() {
  288. return false
  289. }
  290. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  291. // Send message to channel.
  292. channel, ok := c.channelsIdx[chID]
  293. if !ok {
  294. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  295. return false
  296. }
  297. success := channel.sendBytes(msgBytes)
  298. if success {
  299. // Wake up sendRoutine if necessary
  300. select {
  301. case c.send <- struct{}{}:
  302. default:
  303. }
  304. } else {
  305. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  306. }
  307. return success
  308. }
  309. // Queues a message to be sent to channel.
  310. // Nonblocking, returns true if successful.
  311. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  312. if !c.IsRunning() {
  313. return false
  314. }
  315. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  316. // Send message to channel.
  317. channel, ok := c.channelsIdx[chID]
  318. if !ok {
  319. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  320. return false
  321. }
  322. ok = channel.trySendBytes(msgBytes)
  323. if ok {
  324. // Wake up sendRoutine if necessary
  325. select {
  326. case c.send <- struct{}{}:
  327. default:
  328. }
  329. }
  330. return ok
  331. }
  332. // CanSend returns true if you can send more data onto the chID, false
  333. // otherwise. Use only as a heuristic.
  334. func (c *MConnection) CanSend(chID byte) bool {
  335. if !c.IsRunning() {
  336. return false
  337. }
  338. channel, ok := c.channelsIdx[chID]
  339. if !ok {
  340. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  341. return false
  342. }
  343. return channel.canSend()
  344. }
  345. // sendRoutine polls for packets to send from channels.
  346. func (c *MConnection) sendRoutine() {
  347. defer c._recover()
  348. FOR_LOOP:
  349. for {
  350. var _n int64
  351. var err error
  352. SELECTION:
  353. select {
  354. case <-c.flushTimer.Ch:
  355. // NOTE: flushTimer.Set() must be called every time
  356. // something is written to .bufConnWriter.
  357. c.flush()
  358. case <-c.chStatsTimer.C:
  359. for _, channel := range c.channels {
  360. channel.updateStats()
  361. }
  362. case <-c.pingTimer.C:
  363. c.Logger.Debug("Send Ping")
  364. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
  365. if err != nil {
  366. break SELECTION
  367. }
  368. c.sendMonitor.Update(int(_n))
  369. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  370. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  371. select {
  372. case c.pongTimeoutCh <- true:
  373. default:
  374. }
  375. })
  376. c.flush()
  377. case timeout := <-c.pongTimeoutCh:
  378. if timeout {
  379. c.Logger.Debug("Pong timeout")
  380. err = errors.New("pong timeout")
  381. } else {
  382. c.stopPongTimer()
  383. }
  384. case <-c.pong:
  385. c.Logger.Debug("Send Pong")
  386. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
  387. if err != nil {
  388. break SELECTION
  389. }
  390. c.sendMonitor.Update(int(_n))
  391. c.flush()
  392. case <-c.quitSendRoutine:
  393. break FOR_LOOP
  394. case <-c.send:
  395. // Send some PacketMsgs
  396. eof := c.sendSomePacketMsgs()
  397. if !eof {
  398. // Keep sendRoutine awake.
  399. select {
  400. case c.send <- struct{}{}:
  401. default:
  402. }
  403. }
  404. }
  405. if !c.IsRunning() {
  406. break FOR_LOOP
  407. }
  408. if err != nil {
  409. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  410. c.stopForError(err)
  411. break FOR_LOOP
  412. }
  413. }
  414. // Cleanup
  415. c.stopPongTimer()
  416. close(c.doneSendRoutine)
  417. }
  418. // Returns true if messages from channels were exhausted.
  419. // Blocks in accordance to .sendMonitor throttling.
  420. func (c *MConnection) sendSomePacketMsgs() bool {
  421. // Block until .sendMonitor says we can write.
  422. // Once we're ready we send more than we asked for,
  423. // but amortized it should even out.
  424. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  425. // Now send some PacketMsgs.
  426. for i := 0; i < numBatchPacketMsgs; i++ {
  427. if c.sendPacketMsg() {
  428. return true
  429. }
  430. }
  431. return false
  432. }
  433. // Returns true if messages from channels were exhausted.
  434. func (c *MConnection) sendPacketMsg() bool {
  435. // Choose a channel to create a PacketMsg from.
  436. // The chosen channel will be the one whose recentlySent/priority is the least.
  437. var leastRatio float32 = math.MaxFloat32
  438. var leastChannel *Channel
  439. for _, channel := range c.channels {
  440. // If nothing to send, skip this channel
  441. if !channel.isSendPending() {
  442. continue
  443. }
  444. // Get ratio, and keep track of lowest ratio.
  445. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  446. if ratio < leastRatio {
  447. leastRatio = ratio
  448. leastChannel = channel
  449. }
  450. }
  451. // Nothing to send?
  452. if leastChannel == nil {
  453. return true
  454. }
  455. // c.Logger.Info("Found a msgPacket to send")
  456. // Make & send a PacketMsg from this channel
  457. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  458. if err != nil {
  459. c.Logger.Error("Failed to write PacketMsg", "err", err)
  460. c.stopForError(err)
  461. return true
  462. }
  463. c.sendMonitor.Update(int(_n))
  464. c.flushTimer.Set()
  465. return false
  466. }
  467. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  468. // After a whole message has been assembled, it's pushed to onReceive().
  469. // Blocks depending on how the connection is throttled.
  470. // Otherwise, it never blocks.
  471. func (c *MConnection) recvRoutine() {
  472. defer c._recover()
  473. FOR_LOOP:
  474. for {
  475. // Block until .recvMonitor says we can read.
  476. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  477. // Peek into bufConnReader for debugging
  478. /*
  479. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  480. bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100))
  481. if err == nil {
  482. // return
  483. } else {
  484. c.Logger.Debug("Error peeking connection buffer", "err", err)
  485. // return nil
  486. }
  487. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  488. }
  489. */
  490. // Read packet type
  491. var packet Packet
  492. var _n int64
  493. var err error
  494. _n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
  495. c.recvMonitor.Update(int(_n))
  496. if err != nil {
  497. // stopServices was invoked and we are shutting down
  498. // receiving is excpected to fail since we will close the connection
  499. select {
  500. case <-c.quitRecvRoutine:
  501. break FOR_LOOP
  502. default:
  503. }
  504. if c.IsRunning() {
  505. if err == io.EOF {
  506. c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
  507. } else {
  508. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  509. }
  510. c.stopForError(err)
  511. }
  512. break FOR_LOOP
  513. }
  514. // Read more depending on packet type.
  515. switch pkt := packet.(type) {
  516. case PacketPing:
  517. // TODO: prevent abuse, as they cause flush()'s.
  518. // https://github.com/tendermint/tendermint/issues/1190
  519. c.Logger.Debug("Receive Ping")
  520. select {
  521. case c.pong <- struct{}{}:
  522. default:
  523. // never block
  524. }
  525. case PacketPong:
  526. c.Logger.Debug("Receive Pong")
  527. select {
  528. case c.pongTimeoutCh <- false:
  529. default:
  530. // never block
  531. }
  532. case PacketMsg:
  533. channel, ok := c.channelsIdx[pkt.ChannelID]
  534. if !ok || channel == nil {
  535. err := fmt.Errorf("unknown channel %X", pkt.ChannelID)
  536. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  537. c.stopForError(err)
  538. break FOR_LOOP
  539. }
  540. msgBytes, err := channel.recvPacketMsg(pkt)
  541. if err != nil {
  542. if c.IsRunning() {
  543. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  544. c.stopForError(err)
  545. }
  546. break FOR_LOOP
  547. }
  548. if msgBytes != nil {
  549. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  550. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  551. c.onReceive(pkt.ChannelID, msgBytes)
  552. }
  553. default:
  554. err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
  555. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  556. c.stopForError(err)
  557. break FOR_LOOP
  558. }
  559. }
  560. // Cleanup
  561. close(c.pong)
  562. for range c.pong {
  563. // Drain
  564. }
  565. }
  566. // not goroutine-safe
  567. func (c *MConnection) stopPongTimer() {
  568. if c.pongTimer != nil {
  569. _ = c.pongTimer.Stop()
  570. c.pongTimer = nil
  571. }
  572. }
  573. // maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
  574. // of amino encoding.
  575. func (c *MConnection) maxPacketMsgSize() int {
  576. return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
  577. ChannelID: 0x01,
  578. EOF: 1,
  579. Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
  580. })) + 10 // leave room for changes in amino
  581. }
  582. type ConnectionStatus struct {
  583. Duration time.Duration
  584. SendMonitor flow.Status
  585. RecvMonitor flow.Status
  586. Channels []ChannelStatus
  587. }
  588. type ChannelStatus struct {
  589. ID byte
  590. SendQueueCapacity int
  591. SendQueueSize int
  592. Priority int
  593. RecentlySent int64
  594. }
  595. func (c *MConnection) Status() ConnectionStatus {
  596. var status ConnectionStatus
  597. status.Duration = time.Since(c.created)
  598. status.SendMonitor = c.sendMonitor.Status()
  599. status.RecvMonitor = c.recvMonitor.Status()
  600. status.Channels = make([]ChannelStatus, len(c.channels))
  601. for i, channel := range c.channels {
  602. status.Channels[i] = ChannelStatus{
  603. ID: channel.desc.ID,
  604. SendQueueCapacity: cap(channel.sendQueue),
  605. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  606. Priority: channel.desc.Priority,
  607. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  608. }
  609. }
  610. return status
  611. }
  612. //-----------------------------------------------------------------------------
  613. type ChannelDescriptor struct {
  614. ID byte
  615. Priority int
  616. SendQueueCapacity int
  617. RecvBufferCapacity int
  618. RecvMessageCapacity int
  619. }
  620. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  621. if chDesc.SendQueueCapacity == 0 {
  622. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  623. }
  624. if chDesc.RecvBufferCapacity == 0 {
  625. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  626. }
  627. if chDesc.RecvMessageCapacity == 0 {
  628. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  629. }
  630. filled = chDesc
  631. return
  632. }
  633. // TODO: lowercase.
  634. // NOTE: not goroutine-safe.
  635. type Channel struct {
  636. conn *MConnection
  637. desc ChannelDescriptor
  638. sendQueue chan []byte
  639. sendQueueSize int32 // atomic.
  640. recving []byte
  641. sending []byte
  642. recentlySent int64 // exponential moving average
  643. maxPacketMsgPayloadSize int
  644. Logger log.Logger
  645. }
  646. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  647. desc = desc.FillDefaults()
  648. if desc.Priority <= 0 {
  649. panic("Channel default priority must be a positive integer")
  650. }
  651. return &Channel{
  652. conn: conn,
  653. desc: desc,
  654. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  655. recving: make([]byte, 0, desc.RecvBufferCapacity),
  656. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  657. }
  658. }
  659. func (ch *Channel) SetLogger(l log.Logger) {
  660. ch.Logger = l
  661. }
  662. // Queues message to send to this channel.
  663. // Goroutine-safe
  664. // Times out (and returns false) after defaultSendTimeout
  665. func (ch *Channel) sendBytes(bytes []byte) bool {
  666. select {
  667. case ch.sendQueue <- bytes:
  668. atomic.AddInt32(&ch.sendQueueSize, 1)
  669. return true
  670. case <-time.After(defaultSendTimeout):
  671. return false
  672. }
  673. }
  674. // Queues message to send to this channel.
  675. // Nonblocking, returns true if successful.
  676. // Goroutine-safe
  677. func (ch *Channel) trySendBytes(bytes []byte) bool {
  678. select {
  679. case ch.sendQueue <- bytes:
  680. atomic.AddInt32(&ch.sendQueueSize, 1)
  681. return true
  682. default:
  683. return false
  684. }
  685. }
  686. // Goroutine-safe
  687. func (ch *Channel) loadSendQueueSize() (size int) {
  688. return int(atomic.LoadInt32(&ch.sendQueueSize))
  689. }
  690. // Goroutine-safe
  691. // Use only as a heuristic.
  692. func (ch *Channel) canSend() bool {
  693. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  694. }
  695. // Returns true if any PacketMsgs are pending to be sent.
  696. // Call before calling nextPacketMsg()
  697. // Goroutine-safe
  698. func (ch *Channel) isSendPending() bool {
  699. if len(ch.sending) == 0 {
  700. if len(ch.sendQueue) == 0 {
  701. return false
  702. }
  703. ch.sending = <-ch.sendQueue
  704. }
  705. return true
  706. }
  707. // Creates a new PacketMsg to send.
  708. // Not goroutine-safe
  709. func (ch *Channel) nextPacketMsg() PacketMsg {
  710. packet := PacketMsg{}
  711. packet.ChannelID = ch.desc.ID
  712. maxSize := ch.maxPacketMsgPayloadSize
  713. packet.Bytes = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
  714. if len(ch.sending) <= maxSize {
  715. packet.EOF = byte(0x01)
  716. ch.sending = nil
  717. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  718. } else {
  719. packet.EOF = byte(0x00)
  720. ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
  721. }
  722. return packet
  723. }
  724. // Writes next PacketMsg to w and updates c.recentlySent.
  725. // Not goroutine-safe
  726. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  727. var packet = ch.nextPacketMsg()
  728. n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
  729. atomic.AddInt64(&ch.recentlySent, n)
  730. return
  731. }
  732. // Handles incoming PacketMsgs. It returns a message bytes if message is
  733. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  734. // Not goroutine-safe
  735. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  736. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  737. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  738. if recvCap < recvReceived {
  739. return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  740. }
  741. ch.recving = append(ch.recving, packet.Bytes...)
  742. if packet.EOF == byte(0x01) {
  743. msgBytes := ch.recving
  744. // clear the slice without re-allocating.
  745. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  746. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  747. // at which point the recving slice stops being used and should be garbage collected
  748. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  749. return msgBytes, nil
  750. }
  751. return nil, nil
  752. }
  753. // Call this periodically to update stats for throttling purposes.
  754. // Not goroutine-safe
  755. func (ch *Channel) updateStats() {
  756. // Exponential decay of stats.
  757. // TODO: optimize.
  758. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  759. }
  760. //----------------------------------------
  761. // Packet
  762. type Packet interface {
  763. AssertIsPacket()
  764. }
  765. func RegisterPacket(cdc *amino.Codec) {
  766. cdc.RegisterInterface((*Packet)(nil), nil)
  767. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  768. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  769. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  770. }
  771. func (PacketPing) AssertIsPacket() {}
  772. func (PacketPong) AssertIsPacket() {}
  773. func (PacketMsg) AssertIsPacket() {}
  774. type PacketPing struct {
  775. }
  776. type PacketPong struct {
  777. }
  778. type PacketMsg struct {
  779. ChannelID byte
  780. EOF byte // 1 means message ends here.
  781. Bytes []byte
  782. }
  783. func (mp PacketMsg) String() string {
  784. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  785. }