You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

904 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "runtime/debug"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/pkg/errors"
  14. amino "github.com/tendermint/go-amino"
  15. cmn "github.com/tendermint/tendermint/libs/common"
  16. flow "github.com/tendermint/tendermint/libs/flowrate"
  17. "github.com/tendermint/tendermint/libs/log"
  18. "github.com/tendermint/tendermint/libs/service"
  19. )
  20. const (
  21. defaultMaxPacketMsgPayloadSize = 1024
  22. numBatchPacketMsgs = 10
  23. minReadBufferSize = 1024
  24. minWriteBufferSize = 65536
  25. updateStats = 2 * time.Second
  26. // some of these defaults are written in the user config
  27. // flushThrottle, sendRate, recvRate
  28. // TODO: remove values present in config
  29. defaultFlushThrottle = 100 * time.Millisecond
  30. defaultSendQueueCapacity = 1
  31. defaultRecvBufferCapacity = 4096
  32. defaultRecvMessageCapacity = 22020096 // 21MB
  33. defaultSendRate = int64(512000) // 500KB/s
  34. defaultRecvRate = int64(512000) // 500KB/s
  35. defaultSendTimeout = 10 * time.Second
  36. defaultPingInterval = 60 * time.Second
  37. defaultPongTimeout = 45 * time.Second
  38. )
  39. type receiveCbFunc func(chID byte, msgBytes []byte)
  40. type errorCbFunc func(interface{})
  41. /*
  42. Each peer has one `MConnection` (multiplex connection) instance.
  43. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  44. several messages along a single channel of communication.
  45. Each `MConnection` handles message transmission on multiple abstract communication
  46. `Channel`s. Each channel has a globally unique byte id.
  47. The byte id and the relative priorities of each `Channel` are configured upon
  48. initialization of the connection.
  49. There are two methods for sending messages:
  50. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  51. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  52. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  53. successfully queued for the channel with the given id byte `chID`, or until the
  54. request times out. The message `msg` is serialized using Go-Amino.
  55. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  56. channel's queue is full.
  57. Inbound message bytes are handled with an onReceive callback function.
  58. */
  59. type MConnection struct {
  60. service.BaseService
  61. conn net.Conn
  62. bufConnReader *bufio.Reader
  63. bufConnWriter *bufio.Writer
  64. sendMonitor *flow.Monitor
  65. recvMonitor *flow.Monitor
  66. send chan struct{}
  67. pong chan struct{}
  68. channels []*Channel
  69. channelsIdx map[byte]*Channel
  70. onReceive receiveCbFunc
  71. onError errorCbFunc
  72. errored uint32
  73. config MConnConfig
  74. // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
  75. // doneSendRoutine is closed when the sendRoutine actually quits.
  76. quitSendRoutine chan struct{}
  77. doneSendRoutine chan struct{}
  78. // Closing quitRecvRouting will cause the recvRouting to eventually quit.
  79. quitRecvRoutine chan struct{}
  80. // used to ensure FlushStop and OnStop
  81. // are safe to call concurrently.
  82. stopMtx sync.Mutex
  83. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  84. pingTimer *time.Ticker // send pings periodically
  85. // close conn if pong is not received in pongTimeout
  86. pongTimer *time.Timer
  87. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  88. chStatsTimer *time.Ticker // update channel stats periodically
  89. created time.Time // time of creation
  90. _maxPacketMsgSize int
  91. }
  92. // MConnConfig is a MConnection configuration.
  93. type MConnConfig struct {
  94. SendRate int64 `mapstructure:"send_rate"`
  95. RecvRate int64 `mapstructure:"recv_rate"`
  96. // Maximum payload size
  97. MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
  98. // Interval to flush writes (throttled)
  99. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  100. // Interval to send pings
  101. PingInterval time.Duration `mapstructure:"ping_interval"`
  102. // Maximum wait time for pongs
  103. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  104. }
  105. // DefaultMConnConfig returns the default config.
  106. func DefaultMConnConfig() MConnConfig {
  107. return MConnConfig{
  108. SendRate: defaultSendRate,
  109. RecvRate: defaultRecvRate,
  110. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  111. FlushThrottle: defaultFlushThrottle,
  112. PingInterval: defaultPingInterval,
  113. PongTimeout: defaultPongTimeout,
  114. }
  115. }
  116. // NewMConnection wraps net.Conn and creates multiplex connection
  117. func NewMConnection(
  118. conn net.Conn,
  119. chDescs []*ChannelDescriptor,
  120. onReceive receiveCbFunc,
  121. onError errorCbFunc,
  122. ) *MConnection {
  123. return NewMConnectionWithConfig(
  124. conn,
  125. chDescs,
  126. onReceive,
  127. onError,
  128. DefaultMConnConfig())
  129. }
  130. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  131. func NewMConnectionWithConfig(
  132. conn net.Conn,
  133. chDescs []*ChannelDescriptor,
  134. onReceive receiveCbFunc,
  135. onError errorCbFunc,
  136. config MConnConfig,
  137. ) *MConnection {
  138. if config.PongTimeout >= config.PingInterval {
  139. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  140. }
  141. mconn := &MConnection{
  142. conn: conn,
  143. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  144. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  145. sendMonitor: flow.New(0, 0),
  146. recvMonitor: flow.New(0, 0),
  147. send: make(chan struct{}, 1),
  148. pong: make(chan struct{}, 1),
  149. onReceive: onReceive,
  150. onError: onError,
  151. config: config,
  152. created: time.Now(),
  153. }
  154. // Create channels
  155. var channelsIdx = map[byte]*Channel{}
  156. var channels = []*Channel{}
  157. for _, desc := range chDescs {
  158. channel := newChannel(mconn, *desc)
  159. channelsIdx[channel.desc.ID] = channel
  160. channels = append(channels, channel)
  161. }
  162. mconn.channels = channels
  163. mconn.channelsIdx = channelsIdx
  164. mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)
  165. // maxPacketMsgSize() is a bit heavy, so call just once
  166. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  167. return mconn
  168. }
  169. func (c *MConnection) SetLogger(l log.Logger) {
  170. c.BaseService.SetLogger(l)
  171. for _, ch := range c.channels {
  172. ch.SetLogger(l)
  173. }
  174. }
  175. // OnStart implements BaseService
  176. func (c *MConnection) OnStart() error {
  177. if err := c.BaseService.OnStart(); err != nil {
  178. return err
  179. }
  180. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  181. c.pingTimer = time.NewTicker(c.config.PingInterval)
  182. c.pongTimeoutCh = make(chan bool, 1)
  183. c.chStatsTimer = time.NewTicker(updateStats)
  184. c.quitSendRoutine = make(chan struct{})
  185. c.doneSendRoutine = make(chan struct{})
  186. c.quitRecvRoutine = make(chan struct{})
  187. go c.sendRoutine()
  188. go c.recvRoutine()
  189. return nil
  190. }
  191. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  192. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  193. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  194. func (c *MConnection) stopServices() (alreadyStopped bool) {
  195. c.stopMtx.Lock()
  196. defer c.stopMtx.Unlock()
  197. select {
  198. case <-c.quitSendRoutine:
  199. // already quit
  200. return true
  201. default:
  202. }
  203. select {
  204. case <-c.quitRecvRoutine:
  205. // already quit
  206. return true
  207. default:
  208. }
  209. c.BaseService.OnStop()
  210. c.flushTimer.Stop()
  211. c.pingTimer.Stop()
  212. c.chStatsTimer.Stop()
  213. // inform the recvRouting that we are shutting down
  214. close(c.quitRecvRoutine)
  215. close(c.quitSendRoutine)
  216. return false
  217. }
  218. // FlushStop replicates the logic of OnStop.
  219. // It additionally ensures that all successful
  220. // .Send() calls will get flushed before closing
  221. // the connection.
  222. func (c *MConnection) FlushStop() {
  223. if c.stopServices() {
  224. return
  225. }
  226. // this block is unique to FlushStop
  227. {
  228. // wait until the sendRoutine exits
  229. // so we dont race on calling sendSomePacketMsgs
  230. <-c.doneSendRoutine
  231. // Send and flush all pending msgs.
  232. // Since sendRoutine has exited, we can call this
  233. // safely
  234. eof := c.sendSomePacketMsgs()
  235. for !eof {
  236. eof = c.sendSomePacketMsgs()
  237. }
  238. c.flush()
  239. // Now we can close the connection
  240. }
  241. c.conn.Close() // nolint: errcheck
  242. // We can't close pong safely here because
  243. // recvRoutine may write to it after we've stopped.
  244. // Though it doesn't need to get closed at all,
  245. // we close it @ recvRoutine.
  246. // c.Stop()
  247. }
  248. // OnStop implements BaseService
  249. func (c *MConnection) OnStop() {
  250. if c.stopServices() {
  251. return
  252. }
  253. c.conn.Close() // nolint: errcheck
  254. // We can't close pong safely here because
  255. // recvRoutine may write to it after we've stopped.
  256. // Though it doesn't need to get closed at all,
  257. // we close it @ recvRoutine.
  258. }
  259. func (c *MConnection) String() string {
  260. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  261. }
  262. func (c *MConnection) flush() {
  263. c.Logger.Debug("Flush", "conn", c)
  264. err := c.bufConnWriter.Flush()
  265. if err != nil {
  266. c.Logger.Error("MConnection flush failed", "err", err)
  267. }
  268. }
  269. // Catch panics, usually caused by remote disconnects.
  270. func (c *MConnection) _recover() {
  271. if r := recover(); r != nil {
  272. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  273. c.stopForError(errors.Errorf("recovered from panic: %v", r))
  274. }
  275. }
  276. func (c *MConnection) stopForError(r interface{}) {
  277. c.Stop()
  278. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  279. if c.onError != nil {
  280. c.onError(r)
  281. }
  282. }
  283. }
  284. // Queues a message to be sent to channel.
  285. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  286. if !c.IsRunning() {
  287. return false
  288. }
  289. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  290. // Send message to channel.
  291. channel, ok := c.channelsIdx[chID]
  292. if !ok {
  293. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  294. return false
  295. }
  296. success := channel.sendBytes(msgBytes)
  297. if success {
  298. // Wake up sendRoutine if necessary
  299. select {
  300. case c.send <- struct{}{}:
  301. default:
  302. }
  303. } else {
  304. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  305. }
  306. return success
  307. }
  308. // Queues a message to be sent to channel.
  309. // Nonblocking, returns true if successful.
  310. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  311. if !c.IsRunning() {
  312. return false
  313. }
  314. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  315. // Send message to channel.
  316. channel, ok := c.channelsIdx[chID]
  317. if !ok {
  318. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  319. return false
  320. }
  321. ok = channel.trySendBytes(msgBytes)
  322. if ok {
  323. // Wake up sendRoutine if necessary
  324. select {
  325. case c.send <- struct{}{}:
  326. default:
  327. }
  328. }
  329. return ok
  330. }
  331. // CanSend returns true if you can send more data onto the chID, false
  332. // otherwise. Use only as a heuristic.
  333. func (c *MConnection) CanSend(chID byte) bool {
  334. if !c.IsRunning() {
  335. return false
  336. }
  337. channel, ok := c.channelsIdx[chID]
  338. if !ok {
  339. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  340. return false
  341. }
  342. return channel.canSend()
  343. }
  344. // sendRoutine polls for packets to send from channels.
  345. func (c *MConnection) sendRoutine() {
  346. defer c._recover()
  347. FOR_LOOP:
  348. for {
  349. var _n int64
  350. var err error
  351. SELECTION:
  352. select {
  353. case <-c.flushTimer.Ch:
  354. // NOTE: flushTimer.Set() must be called every time
  355. // something is written to .bufConnWriter.
  356. c.flush()
  357. case <-c.chStatsTimer.C:
  358. for _, channel := range c.channels {
  359. channel.updateStats()
  360. }
  361. case <-c.pingTimer.C:
  362. c.Logger.Debug("Send Ping")
  363. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
  364. if err != nil {
  365. break SELECTION
  366. }
  367. c.sendMonitor.Update(int(_n))
  368. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  369. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  370. select {
  371. case c.pongTimeoutCh <- true:
  372. default:
  373. }
  374. })
  375. c.flush()
  376. case timeout := <-c.pongTimeoutCh:
  377. if timeout {
  378. c.Logger.Debug("Pong timeout")
  379. err = errors.New("pong timeout")
  380. } else {
  381. c.stopPongTimer()
  382. }
  383. case <-c.pong:
  384. c.Logger.Debug("Send Pong")
  385. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
  386. if err != nil {
  387. break SELECTION
  388. }
  389. c.sendMonitor.Update(int(_n))
  390. c.flush()
  391. case <-c.quitSendRoutine:
  392. break FOR_LOOP
  393. case <-c.send:
  394. // Send some PacketMsgs
  395. eof := c.sendSomePacketMsgs()
  396. if !eof {
  397. // Keep sendRoutine awake.
  398. select {
  399. case c.send <- struct{}{}:
  400. default:
  401. }
  402. }
  403. }
  404. if !c.IsRunning() {
  405. break FOR_LOOP
  406. }
  407. if err != nil {
  408. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  409. c.stopForError(err)
  410. break FOR_LOOP
  411. }
  412. }
  413. // Cleanup
  414. c.stopPongTimer()
  415. close(c.doneSendRoutine)
  416. }
  417. // Returns true if messages from channels were exhausted.
  418. // Blocks in accordance to .sendMonitor throttling.
  419. func (c *MConnection) sendSomePacketMsgs() bool {
  420. // Block until .sendMonitor says we can write.
  421. // Once we're ready we send more than we asked for,
  422. // but amortized it should even out.
  423. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  424. // Now send some PacketMsgs.
  425. for i := 0; i < numBatchPacketMsgs; i++ {
  426. if c.sendPacketMsg() {
  427. return true
  428. }
  429. }
  430. return false
  431. }
  432. // Returns true if messages from channels were exhausted.
  433. func (c *MConnection) sendPacketMsg() bool {
  434. // Choose a channel to create a PacketMsg from.
  435. // The chosen channel will be the one whose recentlySent/priority is the least.
  436. var leastRatio float32 = math.MaxFloat32
  437. var leastChannel *Channel
  438. for _, channel := range c.channels {
  439. // If nothing to send, skip this channel
  440. if !channel.isSendPending() {
  441. continue
  442. }
  443. // Get ratio, and keep track of lowest ratio.
  444. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  445. if ratio < leastRatio {
  446. leastRatio = ratio
  447. leastChannel = channel
  448. }
  449. }
  450. // Nothing to send?
  451. if leastChannel == nil {
  452. return true
  453. }
  454. // c.Logger.Info("Found a msgPacket to send")
  455. // Make & send a PacketMsg from this channel
  456. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  457. if err != nil {
  458. c.Logger.Error("Failed to write PacketMsg", "err", err)
  459. c.stopForError(err)
  460. return true
  461. }
  462. c.sendMonitor.Update(int(_n))
  463. c.flushTimer.Set()
  464. return false
  465. }
  466. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  467. // After a whole message has been assembled, it's pushed to onReceive().
  468. // Blocks depending on how the connection is throttled.
  469. // Otherwise, it never blocks.
  470. func (c *MConnection) recvRoutine() {
  471. defer c._recover()
  472. FOR_LOOP:
  473. for {
  474. // Block until .recvMonitor says we can read.
  475. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  476. // Peek into bufConnReader for debugging
  477. /*
  478. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  479. bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
  480. if err == nil {
  481. // return
  482. } else {
  483. c.Logger.Debug("Error peeking connection buffer", "err", err)
  484. // return nil
  485. }
  486. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  487. }
  488. */
  489. // Read packet type
  490. var packet Packet
  491. var _n int64
  492. var err error
  493. _n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
  494. c.recvMonitor.Update(int(_n))
  495. if err != nil {
  496. // stopServices was invoked and we are shutting down
  497. // receiving is excpected to fail since we will close the connection
  498. select {
  499. case <-c.quitRecvRoutine:
  500. break FOR_LOOP
  501. default:
  502. }
  503. if c.IsRunning() {
  504. if err == io.EOF {
  505. c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
  506. } else {
  507. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  508. }
  509. c.stopForError(err)
  510. }
  511. break FOR_LOOP
  512. }
  513. // Read more depending on packet type.
  514. switch pkt := packet.(type) {
  515. case PacketPing:
  516. // TODO: prevent abuse, as they cause flush()'s.
  517. // https://github.com/tendermint/tendermint/issues/1190
  518. c.Logger.Debug("Receive Ping")
  519. select {
  520. case c.pong <- struct{}{}:
  521. default:
  522. // never block
  523. }
  524. case PacketPong:
  525. c.Logger.Debug("Receive Pong")
  526. select {
  527. case c.pongTimeoutCh <- false:
  528. default:
  529. // never block
  530. }
  531. case PacketMsg:
  532. channel, ok := c.channelsIdx[pkt.ChannelID]
  533. if !ok || channel == nil {
  534. err := fmt.Errorf("unknown channel %X", pkt.ChannelID)
  535. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  536. c.stopForError(err)
  537. break FOR_LOOP
  538. }
  539. msgBytes, err := channel.recvPacketMsg(pkt)
  540. if err != nil {
  541. if c.IsRunning() {
  542. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  543. c.stopForError(err)
  544. }
  545. break FOR_LOOP
  546. }
  547. if msgBytes != nil {
  548. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  549. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  550. c.onReceive(pkt.ChannelID, msgBytes)
  551. }
  552. default:
  553. err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
  554. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  555. c.stopForError(err)
  556. break FOR_LOOP
  557. }
  558. }
  559. // Cleanup
  560. close(c.pong)
  561. for range c.pong {
  562. // Drain
  563. }
  564. }
  565. // not goroutine-safe
  566. func (c *MConnection) stopPongTimer() {
  567. if c.pongTimer != nil {
  568. _ = c.pongTimer.Stop()
  569. c.pongTimer = nil
  570. }
  571. }
  572. // maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
  573. // of amino encoding.
  574. func (c *MConnection) maxPacketMsgSize() int {
  575. return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
  576. ChannelID: 0x01,
  577. EOF: 1,
  578. Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
  579. })) + 10 // leave room for changes in amino
  580. }
  581. type ConnectionStatus struct {
  582. Duration time.Duration
  583. SendMonitor flow.Status
  584. RecvMonitor flow.Status
  585. Channels []ChannelStatus
  586. }
  587. type ChannelStatus struct {
  588. ID byte
  589. SendQueueCapacity int
  590. SendQueueSize int
  591. Priority int
  592. RecentlySent int64
  593. }
  594. func (c *MConnection) Status() ConnectionStatus {
  595. var status ConnectionStatus
  596. status.Duration = time.Since(c.created)
  597. status.SendMonitor = c.sendMonitor.Status()
  598. status.RecvMonitor = c.recvMonitor.Status()
  599. status.Channels = make([]ChannelStatus, len(c.channels))
  600. for i, channel := range c.channels {
  601. status.Channels[i] = ChannelStatus{
  602. ID: channel.desc.ID,
  603. SendQueueCapacity: cap(channel.sendQueue),
  604. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  605. Priority: channel.desc.Priority,
  606. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  607. }
  608. }
  609. return status
  610. }
  611. //-----------------------------------------------------------------------------
  612. type ChannelDescriptor struct {
  613. ID byte
  614. Priority int
  615. SendQueueCapacity int
  616. RecvBufferCapacity int
  617. RecvMessageCapacity int
  618. }
  619. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  620. if chDesc.SendQueueCapacity == 0 {
  621. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  622. }
  623. if chDesc.RecvBufferCapacity == 0 {
  624. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  625. }
  626. if chDesc.RecvMessageCapacity == 0 {
  627. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  628. }
  629. filled = chDesc
  630. return
  631. }
  632. // TODO: lowercase.
  633. // NOTE: not goroutine-safe.
  634. type Channel struct {
  635. conn *MConnection
  636. desc ChannelDescriptor
  637. sendQueue chan []byte
  638. sendQueueSize int32 // atomic.
  639. recving []byte
  640. sending []byte
  641. recentlySent int64 // exponential moving average
  642. maxPacketMsgPayloadSize int
  643. Logger log.Logger
  644. }
  645. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  646. desc = desc.FillDefaults()
  647. if desc.Priority <= 0 {
  648. panic("Channel default priority must be a positive integer")
  649. }
  650. return &Channel{
  651. conn: conn,
  652. desc: desc,
  653. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  654. recving: make([]byte, 0, desc.RecvBufferCapacity),
  655. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  656. }
  657. }
  658. func (ch *Channel) SetLogger(l log.Logger) {
  659. ch.Logger = l
  660. }
  661. // Queues message to send to this channel.
  662. // Goroutine-safe
  663. // Times out (and returns false) after defaultSendTimeout
  664. func (ch *Channel) sendBytes(bytes []byte) bool {
  665. select {
  666. case ch.sendQueue <- bytes:
  667. atomic.AddInt32(&ch.sendQueueSize, 1)
  668. return true
  669. case <-time.After(defaultSendTimeout):
  670. return false
  671. }
  672. }
  673. // Queues message to send to this channel.
  674. // Nonblocking, returns true if successful.
  675. // Goroutine-safe
  676. func (ch *Channel) trySendBytes(bytes []byte) bool {
  677. select {
  678. case ch.sendQueue <- bytes:
  679. atomic.AddInt32(&ch.sendQueueSize, 1)
  680. return true
  681. default:
  682. return false
  683. }
  684. }
  685. // Goroutine-safe
  686. func (ch *Channel) loadSendQueueSize() (size int) {
  687. return int(atomic.LoadInt32(&ch.sendQueueSize))
  688. }
  689. // Goroutine-safe
  690. // Use only as a heuristic.
  691. func (ch *Channel) canSend() bool {
  692. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  693. }
  694. // Returns true if any PacketMsgs are pending to be sent.
  695. // Call before calling nextPacketMsg()
  696. // Goroutine-safe
  697. func (ch *Channel) isSendPending() bool {
  698. if len(ch.sending) == 0 {
  699. if len(ch.sendQueue) == 0 {
  700. return false
  701. }
  702. ch.sending = <-ch.sendQueue
  703. }
  704. return true
  705. }
  706. // Creates a new PacketMsg to send.
  707. // Not goroutine-safe
  708. func (ch *Channel) nextPacketMsg() PacketMsg {
  709. packet := PacketMsg{}
  710. packet.ChannelID = ch.desc.ID
  711. maxSize := ch.maxPacketMsgPayloadSize
  712. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  713. if len(ch.sending) <= maxSize {
  714. packet.EOF = byte(0x01)
  715. ch.sending = nil
  716. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  717. } else {
  718. packet.EOF = byte(0x00)
  719. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  720. }
  721. return packet
  722. }
  723. // Writes next PacketMsg to w and updates c.recentlySent.
  724. // Not goroutine-safe
  725. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  726. var packet = ch.nextPacketMsg()
  727. n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
  728. atomic.AddInt64(&ch.recentlySent, n)
  729. return
  730. }
  731. // Handles incoming PacketMsgs. It returns a message bytes if message is
  732. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  733. // Not goroutine-safe
  734. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  735. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  736. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  737. if recvCap < recvReceived {
  738. return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  739. }
  740. ch.recving = append(ch.recving, packet.Bytes...)
  741. if packet.EOF == byte(0x01) {
  742. msgBytes := ch.recving
  743. // clear the slice without re-allocating.
  744. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  745. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  746. // at which point the recving slice stops being used and should be garbage collected
  747. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  748. return msgBytes, nil
  749. }
  750. return nil, nil
  751. }
  752. // Call this periodically to update stats for throttling purposes.
  753. // Not goroutine-safe
  754. func (ch *Channel) updateStats() {
  755. // Exponential decay of stats.
  756. // TODO: optimize.
  757. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  758. }
  759. //----------------------------------------
  760. // Packet
  761. type Packet interface {
  762. AssertIsPacket()
  763. }
  764. func RegisterPacket(cdc *amino.Codec) {
  765. cdc.RegisterInterface((*Packet)(nil), nil)
  766. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  767. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  768. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  769. }
  770. func (PacketPing) AssertIsPacket() {}
  771. func (PacketPong) AssertIsPacket() {}
  772. func (PacketMsg) AssertIsPacket() {}
  773. type PacketPing struct {
  774. }
  775. type PacketPong struct {
  776. }
  777. type PacketMsg struct {
  778. ChannelID byte
  779. EOF byte // 1 means message ends here.
  780. Bytes []byte
  781. }
  782. func (mp PacketMsg) String() string {
  783. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  784. }