You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

903 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "runtime/debug"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/pkg/errors"
  14. amino "github.com/tendermint/go-amino"
  15. cmn "github.com/tendermint/tendermint/libs/common"
  16. flow "github.com/tendermint/tendermint/libs/flowrate"
  17. "github.com/tendermint/tendermint/libs/log"
  18. )
  19. const (
  20. defaultMaxPacketMsgPayloadSize = 1024
  21. numBatchPacketMsgs = 10
  22. minReadBufferSize = 1024
  23. minWriteBufferSize = 65536
  24. updateStats = 2 * time.Second
  25. // some of these defaults are written in the user config
  26. // flushThrottle, sendRate, recvRate
  27. // TODO: remove values present in config
  28. defaultFlushThrottle = 100 * time.Millisecond
  29. defaultSendQueueCapacity = 1
  30. defaultRecvBufferCapacity = 4096
  31. defaultRecvMessageCapacity = 22020096 // 21MB
  32. defaultSendRate = int64(512000) // 500KB/s
  33. defaultRecvRate = int64(512000) // 500KB/s
  34. defaultSendTimeout = 10 * time.Second
  35. defaultPingInterval = 60 * time.Second
  36. defaultPongTimeout = 45 * time.Second
  37. )
  38. type receiveCbFunc func(chID byte, msgBytes []byte)
  39. type errorCbFunc func(interface{})
  40. /*
  41. Each peer has one `MConnection` (multiplex connection) instance.
  42. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  43. several messages along a single channel of communication.
  44. Each `MConnection` handles message transmission on multiple abstract communication
  45. `Channel`s. Each channel has a globally unique byte id.
  46. The byte id and the relative priorities of each `Channel` are configured upon
  47. initialization of the connection.
  48. There are two methods for sending messages:
  49. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  50. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  51. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  52. successfully queued for the channel with the given id byte `chID`, or until the
  53. request times out. The message `msg` is serialized using Go-Amino.
  54. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  55. channel's queue is full.
  56. Inbound message bytes are handled with an onReceive callback function.
  57. */
  58. type MConnection struct {
  59. cmn.BaseService
  60. conn net.Conn
  61. bufConnReader *bufio.Reader
  62. bufConnWriter *bufio.Writer
  63. sendMonitor *flow.Monitor
  64. recvMonitor *flow.Monitor
  65. send chan struct{}
  66. pong chan struct{}
  67. channels []*Channel
  68. channelsIdx map[byte]*Channel
  69. onReceive receiveCbFunc
  70. onError errorCbFunc
  71. errored uint32
  72. config MConnConfig
  73. // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
  74. // doneSendRoutine is closed when the sendRoutine actually quits.
  75. quitSendRoutine chan struct{}
  76. doneSendRoutine chan struct{}
  77. // Closing quitRecvRouting will cause the recvRouting to eventually quit.
  78. quitRecvRoutine chan struct{}
  79. // used to ensure FlushStop and OnStop
  80. // are safe to call concurrently.
  81. stopMtx sync.Mutex
  82. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  83. pingTimer *time.Ticker // send pings periodically
  84. // close conn if pong is not received in pongTimeout
  85. pongTimer *time.Timer
  86. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  87. chStatsTimer *time.Ticker // update channel stats periodically
  88. created time.Time // time of creation
  89. _maxPacketMsgSize int
  90. }
  91. // MConnConfig is a MConnection configuration.
  92. type MConnConfig struct {
  93. SendRate int64 `mapstructure:"send_rate"`
  94. RecvRate int64 `mapstructure:"recv_rate"`
  95. // Maximum payload size
  96. MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
  97. // Interval to flush writes (throttled)
  98. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  99. // Interval to send pings
  100. PingInterval time.Duration `mapstructure:"ping_interval"`
  101. // Maximum wait time for pongs
  102. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  103. }
  104. // DefaultMConnConfig returns the default config.
  105. func DefaultMConnConfig() MConnConfig {
  106. return MConnConfig{
  107. SendRate: defaultSendRate,
  108. RecvRate: defaultRecvRate,
  109. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  110. FlushThrottle: defaultFlushThrottle,
  111. PingInterval: defaultPingInterval,
  112. PongTimeout: defaultPongTimeout,
  113. }
  114. }
  115. // NewMConnection wraps net.Conn and creates multiplex connection
  116. func NewMConnection(
  117. conn net.Conn,
  118. chDescs []*ChannelDescriptor,
  119. onReceive receiveCbFunc,
  120. onError errorCbFunc,
  121. ) *MConnection {
  122. return NewMConnectionWithConfig(
  123. conn,
  124. chDescs,
  125. onReceive,
  126. onError,
  127. DefaultMConnConfig())
  128. }
  129. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  130. func NewMConnectionWithConfig(
  131. conn net.Conn,
  132. chDescs []*ChannelDescriptor,
  133. onReceive receiveCbFunc,
  134. onError errorCbFunc,
  135. config MConnConfig,
  136. ) *MConnection {
  137. if config.PongTimeout >= config.PingInterval {
  138. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  139. }
  140. mconn := &MConnection{
  141. conn: conn,
  142. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  143. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  144. sendMonitor: flow.New(0, 0),
  145. recvMonitor: flow.New(0, 0),
  146. send: make(chan struct{}, 1),
  147. pong: make(chan struct{}, 1),
  148. onReceive: onReceive,
  149. onError: onError,
  150. config: config,
  151. created: time.Now(),
  152. }
  153. // Create channels
  154. var channelsIdx = map[byte]*Channel{}
  155. var channels = []*Channel{}
  156. for _, desc := range chDescs {
  157. channel := newChannel(mconn, *desc)
  158. channelsIdx[channel.desc.ID] = channel
  159. channels = append(channels, channel)
  160. }
  161. mconn.channels = channels
  162. mconn.channelsIdx = channelsIdx
  163. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  164. // maxPacketMsgSize() is a bit heavy, so call just once
  165. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  166. return mconn
  167. }
  168. func (c *MConnection) SetLogger(l log.Logger) {
  169. c.BaseService.SetLogger(l)
  170. for _, ch := range c.channels {
  171. ch.SetLogger(l)
  172. }
  173. }
  174. // OnStart implements BaseService
  175. func (c *MConnection) OnStart() error {
  176. if err := c.BaseService.OnStart(); err != nil {
  177. return err
  178. }
  179. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  180. c.pingTimer = time.NewTicker(c.config.PingInterval)
  181. c.pongTimeoutCh = make(chan bool, 1)
  182. c.chStatsTimer = time.NewTicker(updateStats)
  183. c.quitSendRoutine = make(chan struct{})
  184. c.doneSendRoutine = make(chan struct{})
  185. c.quitRecvRoutine = make(chan struct{})
  186. go c.sendRoutine()
  187. go c.recvRoutine()
  188. return nil
  189. }
  190. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  191. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  192. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  193. func (c *MConnection) stopServices() (alreadyStopped bool) {
  194. c.stopMtx.Lock()
  195. defer c.stopMtx.Unlock()
  196. select {
  197. case <-c.quitSendRoutine:
  198. // already quit
  199. return true
  200. default:
  201. }
  202. select {
  203. case <-c.quitRecvRoutine:
  204. // already quit
  205. return true
  206. default:
  207. }
  208. c.BaseService.OnStop()
  209. c.flushTimer.Stop()
  210. c.pingTimer.Stop()
  211. c.chStatsTimer.Stop()
  212. // inform the recvRouting that we are shutting down
  213. close(c.quitRecvRoutine)
  214. close(c.quitSendRoutine)
  215. return false
  216. }
  217. // FlushStop replicates the logic of OnStop.
  218. // It additionally ensures that all successful
  219. // .Send() calls will get flushed before closing
  220. // the connection.
  221. func (c *MConnection) FlushStop() {
  222. if c.stopServices() {
  223. return
  224. }
  225. // this block is unique to FlushStop
  226. {
  227. // wait until the sendRoutine exits
  228. // so we dont race on calling sendSomePacketMsgs
  229. <-c.doneSendRoutine
  230. // Send and flush all pending msgs.
  231. // Since sendRoutine has exited, we can call this
  232. // safely
  233. eof := c.sendSomePacketMsgs()
  234. for !eof {
  235. eof = c.sendSomePacketMsgs()
  236. }
  237. c.flush()
  238. // Now we can close the connection
  239. }
  240. c.conn.Close() // nolint: errcheck
  241. // We can't close pong safely here because
  242. // recvRoutine may write to it after we've stopped.
  243. // Though it doesn't need to get closed at all,
  244. // we close it @ recvRoutine.
  245. // c.Stop()
  246. }
  247. // OnStop implements BaseService
  248. func (c *MConnection) OnStop() {
  249. if c.stopServices() {
  250. return
  251. }
  252. c.conn.Close() // nolint: errcheck
  253. // We can't close pong safely here because
  254. // recvRoutine may write to it after we've stopped.
  255. // Though it doesn't need to get closed at all,
  256. // we close it @ recvRoutine.
  257. }
  258. func (c *MConnection) String() string {
  259. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  260. }
  261. func (c *MConnection) flush() {
  262. c.Logger.Debug("Flush", "conn", c)
  263. err := c.bufConnWriter.Flush()
  264. if err != nil {
  265. c.Logger.Error("MConnection flush failed", "err", err)
  266. }
  267. }
  268. // Catch panics, usually caused by remote disconnects.
  269. func (c *MConnection) _recover() {
  270. if r := recover(); r != nil {
  271. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  272. c.stopForError(errors.Errorf("recovered from panic: %v", r))
  273. }
  274. }
  275. func (c *MConnection) stopForError(r interface{}) {
  276. c.Stop()
  277. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  278. if c.onError != nil {
  279. c.onError(r)
  280. }
  281. }
  282. }
  283. // Queues a message to be sent to channel.
  284. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  285. if !c.IsRunning() {
  286. return false
  287. }
  288. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  289. // Send message to channel.
  290. channel, ok := c.channelsIdx[chID]
  291. if !ok {
  292. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  293. return false
  294. }
  295. success := channel.sendBytes(msgBytes)
  296. if success {
  297. // Wake up sendRoutine if necessary
  298. select {
  299. case c.send <- struct{}{}:
  300. default:
  301. }
  302. } else {
  303. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  304. }
  305. return success
  306. }
  307. // Queues a message to be sent to channel.
  308. // Nonblocking, returns true if successful.
  309. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  310. if !c.IsRunning() {
  311. return false
  312. }
  313. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  314. // Send message to channel.
  315. channel, ok := c.channelsIdx[chID]
  316. if !ok {
  317. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  318. return false
  319. }
  320. ok = channel.trySendBytes(msgBytes)
  321. if ok {
  322. // Wake up sendRoutine if necessary
  323. select {
  324. case c.send <- struct{}{}:
  325. default:
  326. }
  327. }
  328. return ok
  329. }
  330. // CanSend returns true if you can send more data onto the chID, false
  331. // otherwise. Use only as a heuristic.
  332. func (c *MConnection) CanSend(chID byte) bool {
  333. if !c.IsRunning() {
  334. return false
  335. }
  336. channel, ok := c.channelsIdx[chID]
  337. if !ok {
  338. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  339. return false
  340. }
  341. return channel.canSend()
  342. }
  343. // sendRoutine polls for packets to send from channels.
  344. func (c *MConnection) sendRoutine() {
  345. defer c._recover()
  346. FOR_LOOP:
  347. for {
  348. var _n int64
  349. var err error
  350. SELECTION:
  351. select {
  352. case <-c.flushTimer.Ch:
  353. // NOTE: flushTimer.Set() must be called every time
  354. // something is written to .bufConnWriter.
  355. c.flush()
  356. case <-c.chStatsTimer.C:
  357. for _, channel := range c.channels {
  358. channel.updateStats()
  359. }
  360. case <-c.pingTimer.C:
  361. c.Logger.Debug("Send Ping")
  362. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
  363. if err != nil {
  364. break SELECTION
  365. }
  366. c.sendMonitor.Update(int(_n))
  367. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  368. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  369. select {
  370. case c.pongTimeoutCh <- true:
  371. default:
  372. }
  373. })
  374. c.flush()
  375. case timeout := <-c.pongTimeoutCh:
  376. if timeout {
  377. c.Logger.Debug("Pong timeout")
  378. err = errors.New("pong timeout")
  379. } else {
  380. c.stopPongTimer()
  381. }
  382. case <-c.pong:
  383. c.Logger.Debug("Send Pong")
  384. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
  385. if err != nil {
  386. break SELECTION
  387. }
  388. c.sendMonitor.Update(int(_n))
  389. c.flush()
  390. case <-c.quitSendRoutine:
  391. break FOR_LOOP
  392. case <-c.send:
  393. // Send some PacketMsgs
  394. eof := c.sendSomePacketMsgs()
  395. if !eof {
  396. // Keep sendRoutine awake.
  397. select {
  398. case c.send <- struct{}{}:
  399. default:
  400. }
  401. }
  402. }
  403. if !c.IsRunning() {
  404. break FOR_LOOP
  405. }
  406. if err != nil {
  407. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  408. c.stopForError(err)
  409. break FOR_LOOP
  410. }
  411. }
  412. // Cleanup
  413. c.stopPongTimer()
  414. close(c.doneSendRoutine)
  415. }
  416. // Returns true if messages from channels were exhausted.
  417. // Blocks in accordance to .sendMonitor throttling.
  418. func (c *MConnection) sendSomePacketMsgs() bool {
  419. // Block until .sendMonitor says we can write.
  420. // Once we're ready we send more than we asked for,
  421. // but amortized it should even out.
  422. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  423. // Now send some PacketMsgs.
  424. for i := 0; i < numBatchPacketMsgs; i++ {
  425. if c.sendPacketMsg() {
  426. return true
  427. }
  428. }
  429. return false
  430. }
  431. // Returns true if messages from channels were exhausted.
  432. func (c *MConnection) sendPacketMsg() bool {
  433. // Choose a channel to create a PacketMsg from.
  434. // The chosen channel will be the one whose recentlySent/priority is the least.
  435. var leastRatio float32 = math.MaxFloat32
  436. var leastChannel *Channel
  437. for _, channel := range c.channels {
  438. // If nothing to send, skip this channel
  439. if !channel.isSendPending() {
  440. continue
  441. }
  442. // Get ratio, and keep track of lowest ratio.
  443. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  444. if ratio < leastRatio {
  445. leastRatio = ratio
  446. leastChannel = channel
  447. }
  448. }
  449. // Nothing to send?
  450. if leastChannel == nil {
  451. return true
  452. }
  453. // c.Logger.Info("Found a msgPacket to send")
  454. // Make & send a PacketMsg from this channel
  455. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  456. if err != nil {
  457. c.Logger.Error("Failed to write PacketMsg", "err", err)
  458. c.stopForError(err)
  459. return true
  460. }
  461. c.sendMonitor.Update(int(_n))
  462. c.flushTimer.Set()
  463. return false
  464. }
  465. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  466. // After a whole message has been assembled, it's pushed to onReceive().
  467. // Blocks depending on how the connection is throttled.
  468. // Otherwise, it never blocks.
  469. func (c *MConnection) recvRoutine() {
  470. defer c._recover()
  471. FOR_LOOP:
  472. for {
  473. // Block until .recvMonitor says we can read.
  474. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  475. // Peek into bufConnReader for debugging
  476. /*
  477. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  478. bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
  479. if err == nil {
  480. // return
  481. } else {
  482. c.Logger.Debug("Error peeking connection buffer", "err", err)
  483. // return nil
  484. }
  485. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  486. }
  487. */
  488. // Read packet type
  489. var packet Packet
  490. var _n int64
  491. var err error
  492. _n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
  493. c.recvMonitor.Update(int(_n))
  494. if err != nil {
  495. // stopServices was invoked and we are shutting down
  496. // receiving is excpected to fail since we will close the connection
  497. select {
  498. case <-c.quitRecvRoutine:
  499. break FOR_LOOP
  500. default:
  501. }
  502. if c.IsRunning() {
  503. if err == io.EOF {
  504. c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
  505. } else {
  506. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  507. }
  508. c.stopForError(err)
  509. }
  510. break FOR_LOOP
  511. }
  512. // Read more depending on packet type.
  513. switch pkt := packet.(type) {
  514. case PacketPing:
  515. // TODO: prevent abuse, as they cause flush()'s.
  516. // https://github.com/tendermint/tendermint/issues/1190
  517. c.Logger.Debug("Receive Ping")
  518. select {
  519. case c.pong <- struct{}{}:
  520. default:
  521. // never block
  522. }
  523. case PacketPong:
  524. c.Logger.Debug("Receive Pong")
  525. select {
  526. case c.pongTimeoutCh <- false:
  527. default:
  528. // never block
  529. }
  530. case PacketMsg:
  531. channel, ok := c.channelsIdx[pkt.ChannelID]
  532. if !ok || channel == nil {
  533. err := fmt.Errorf("unknown channel %X", pkt.ChannelID)
  534. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  535. c.stopForError(err)
  536. break FOR_LOOP
  537. }
  538. msgBytes, err := channel.recvPacketMsg(pkt)
  539. if err != nil {
  540. if c.IsRunning() {
  541. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  542. c.stopForError(err)
  543. }
  544. break FOR_LOOP
  545. }
  546. if msgBytes != nil {
  547. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  548. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  549. c.onReceive(pkt.ChannelID, msgBytes)
  550. }
  551. default:
  552. err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
  553. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  554. c.stopForError(err)
  555. break FOR_LOOP
  556. }
  557. }
  558. // Cleanup
  559. close(c.pong)
  560. for range c.pong {
  561. // Drain
  562. }
  563. }
  564. // not goroutine-safe
  565. func (c *MConnection) stopPongTimer() {
  566. if c.pongTimer != nil {
  567. _ = c.pongTimer.Stop()
  568. c.pongTimer = nil
  569. }
  570. }
  571. // maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
  572. // of amino encoding.
  573. func (c *MConnection) maxPacketMsgSize() int {
  574. return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
  575. ChannelID: 0x01,
  576. EOF: 1,
  577. Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
  578. })) + 10 // leave room for changes in amino
  579. }
  580. type ConnectionStatus struct {
  581. Duration time.Duration
  582. SendMonitor flow.Status
  583. RecvMonitor flow.Status
  584. Channels []ChannelStatus
  585. }
  586. type ChannelStatus struct {
  587. ID byte
  588. SendQueueCapacity int
  589. SendQueueSize int
  590. Priority int
  591. RecentlySent int64
  592. }
  593. func (c *MConnection) Status() ConnectionStatus {
  594. var status ConnectionStatus
  595. status.Duration = time.Since(c.created)
  596. status.SendMonitor = c.sendMonitor.Status()
  597. status.RecvMonitor = c.recvMonitor.Status()
  598. status.Channels = make([]ChannelStatus, len(c.channels))
  599. for i, channel := range c.channels {
  600. status.Channels[i] = ChannelStatus{
  601. ID: channel.desc.ID,
  602. SendQueueCapacity: cap(channel.sendQueue),
  603. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  604. Priority: channel.desc.Priority,
  605. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  606. }
  607. }
  608. return status
  609. }
  610. //-----------------------------------------------------------------------------
  611. type ChannelDescriptor struct {
  612. ID byte
  613. Priority int
  614. SendQueueCapacity int
  615. RecvBufferCapacity int
  616. RecvMessageCapacity int
  617. }
  618. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  619. if chDesc.SendQueueCapacity == 0 {
  620. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  621. }
  622. if chDesc.RecvBufferCapacity == 0 {
  623. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  624. }
  625. if chDesc.RecvMessageCapacity == 0 {
  626. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  627. }
  628. filled = chDesc
  629. return
  630. }
  631. // TODO: lowercase.
  632. // NOTE: not goroutine-safe.
  633. type Channel struct {
  634. conn *MConnection
  635. desc ChannelDescriptor
  636. sendQueue chan []byte
  637. sendQueueSize int32 // atomic.
  638. recving []byte
  639. sending []byte
  640. recentlySent int64 // exponential moving average
  641. maxPacketMsgPayloadSize int
  642. Logger log.Logger
  643. }
  644. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  645. desc = desc.FillDefaults()
  646. if desc.Priority <= 0 {
  647. panic("Channel default priority must be a positive integer")
  648. }
  649. return &Channel{
  650. conn: conn,
  651. desc: desc,
  652. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  653. recving: make([]byte, 0, desc.RecvBufferCapacity),
  654. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  655. }
  656. }
  657. func (ch *Channel) SetLogger(l log.Logger) {
  658. ch.Logger = l
  659. }
  660. // Queues message to send to this channel.
  661. // Goroutine-safe
  662. // Times out (and returns false) after defaultSendTimeout
  663. func (ch *Channel) sendBytes(bytes []byte) bool {
  664. select {
  665. case ch.sendQueue <- bytes:
  666. atomic.AddInt32(&ch.sendQueueSize, 1)
  667. return true
  668. case <-time.After(defaultSendTimeout):
  669. return false
  670. }
  671. }
  672. // Queues message to send to this channel.
  673. // Nonblocking, returns true if successful.
  674. // Goroutine-safe
  675. func (ch *Channel) trySendBytes(bytes []byte) bool {
  676. select {
  677. case ch.sendQueue <- bytes:
  678. atomic.AddInt32(&ch.sendQueueSize, 1)
  679. return true
  680. default:
  681. return false
  682. }
  683. }
  684. // Goroutine-safe
  685. func (ch *Channel) loadSendQueueSize() (size int) {
  686. return int(atomic.LoadInt32(&ch.sendQueueSize))
  687. }
  688. // Goroutine-safe
  689. // Use only as a heuristic.
  690. func (ch *Channel) canSend() bool {
  691. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  692. }
  693. // Returns true if any PacketMsgs are pending to be sent.
  694. // Call before calling nextPacketMsg()
  695. // Goroutine-safe
  696. func (ch *Channel) isSendPending() bool {
  697. if len(ch.sending) == 0 {
  698. if len(ch.sendQueue) == 0 {
  699. return false
  700. }
  701. ch.sending = <-ch.sendQueue
  702. }
  703. return true
  704. }
  705. // Creates a new PacketMsg to send.
  706. // Not goroutine-safe
  707. func (ch *Channel) nextPacketMsg() PacketMsg {
  708. packet := PacketMsg{}
  709. packet.ChannelID = ch.desc.ID
  710. maxSize := ch.maxPacketMsgPayloadSize
  711. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  712. if len(ch.sending) <= maxSize {
  713. packet.EOF = byte(0x01)
  714. ch.sending = nil
  715. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  716. } else {
  717. packet.EOF = byte(0x00)
  718. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  719. }
  720. return packet
  721. }
  722. // Writes next PacketMsg to w and updates c.recentlySent.
  723. // Not goroutine-safe
  724. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  725. var packet = ch.nextPacketMsg()
  726. n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
  727. atomic.AddInt64(&ch.recentlySent, n)
  728. return
  729. }
  730. // Handles incoming PacketMsgs. It returns a message bytes if message is
  731. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  732. // Not goroutine-safe
  733. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  734. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  735. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  736. if recvCap < recvReceived {
  737. return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  738. }
  739. ch.recving = append(ch.recving, packet.Bytes...)
  740. if packet.EOF == byte(0x01) {
  741. msgBytes := ch.recving
  742. // clear the slice without re-allocating.
  743. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  744. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  745. // at which point the recving slice stops being used and should be garbage collected
  746. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  747. return msgBytes, nil
  748. }
  749. return nil, nil
  750. }
  751. // Call this periodically to update stats for throttling purposes.
  752. // Not goroutine-safe
  753. func (ch *Channel) updateStats() {
  754. // Exponential decay of stats.
  755. // TODO: optimize.
  756. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  757. }
  758. //----------------------------------------
  759. // Packet
  760. type Packet interface {
  761. AssertIsPacket()
  762. }
  763. func RegisterPacket(cdc *amino.Codec) {
  764. cdc.RegisterInterface((*Packet)(nil), nil)
  765. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  766. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  767. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  768. }
  769. func (PacketPing) AssertIsPacket() {}
  770. func (PacketPong) AssertIsPacket() {}
  771. func (PacketMsg) AssertIsPacket() {}
  772. type PacketPing struct {
  773. }
  774. type PacketPong struct {
  775. }
  776. type PacketMsg struct {
  777. ChannelID byte
  778. EOF byte // 1 means message ends here.
  779. Bytes []byte
  780. }
  781. func (mp PacketMsg) String() string {
  782. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  783. }