You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

865 lines
23 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. amino "github.com/tendermint/go-amino"
  14. cmn "github.com/tendermint/tendermint/libs/common"
  15. flow "github.com/tendermint/tendermint/libs/flowrate"
  16. "github.com/tendermint/tendermint/libs/log"
  17. )
  18. const (
  19. defaultMaxPacketMsgPayloadSize = 1024
  20. numBatchPacketMsgs = 10
  21. minReadBufferSize = 1024
  22. minWriteBufferSize = 65536
  23. updateStats = 2 * time.Second
  24. // some of these defaults are written in the user config
  25. // flushThrottle, sendRate, recvRate
  26. // TODO: remove values present in config
  27. defaultFlushThrottle = 100 * time.Millisecond
  28. defaultSendQueueCapacity = 1
  29. defaultRecvBufferCapacity = 4096
  30. defaultRecvMessageCapacity = 22020096 // 21MB
  31. defaultSendRate = int64(512000) // 500KB/s
  32. defaultRecvRate = int64(512000) // 500KB/s
  33. defaultSendTimeout = 10 * time.Second
  34. defaultPingInterval = 60 * time.Second
  35. defaultPongTimeout = 45 * time.Second
  36. )
  37. type receiveCbFunc func(chID byte, msgBytes []byte)
  38. type errorCbFunc func(interface{})
  39. /*
  40. Each peer has one `MConnection` (multiplex connection) instance.
  41. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  42. several messages along a single channel of communication.
  43. Each `MConnection` handles message transmission on multiple abstract communication
  44. `Channel`s. Each channel has a globally unique byte id.
  45. The byte id and the relative priorities of each `Channel` are configured upon
  46. initialization of the connection.
  47. There are two methods for sending messages:
  48. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  49. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  50. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  51. successfully queued for the channel with the given id byte `chID`, or until the
  52. request times out. The message `msg` is serialized using Go-Amino.
  53. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  54. channel's queue is full.
  55. Inbound message bytes are handled with an onReceive callback function.
  56. */
  57. type MConnection struct {
  58. cmn.BaseService
  59. conn net.Conn
  60. bufConnReader *bufio.Reader
  61. bufConnWriter *bufio.Writer
  62. sendMonitor *flow.Monitor
  63. recvMonitor *flow.Monitor
  64. send chan struct{}
  65. pong chan struct{}
  66. channels []*Channel
  67. channelsIdx map[byte]*Channel
  68. onReceive receiveCbFunc
  69. onError errorCbFunc
  70. errored uint32
  71. config MConnConfig
  72. // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
  73. // doneSendRoutine is closed when the sendRoutine actually quits.
  74. quitSendRoutine chan struct{}
  75. doneSendRoutine chan struct{}
  76. // used to ensure FlushStop and OnStop
  77. // are safe to call concurrently.
  78. stopMtx sync.Mutex
  79. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  80. pingTimer *time.Ticker // send pings periodically
  81. // close conn if pong is not received in pongTimeout
  82. pongTimer *time.Timer
  83. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  84. chStatsTimer *time.Ticker // update channel stats periodically
  85. created time.Time // time of creation
  86. _maxPacketMsgSize int
  87. }
  88. // MConnConfig is a MConnection configuration.
  89. type MConnConfig struct {
  90. SendRate int64 `mapstructure:"send_rate"`
  91. RecvRate int64 `mapstructure:"recv_rate"`
  92. // Maximum payload size
  93. MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
  94. // Interval to flush writes (throttled)
  95. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  96. // Interval to send pings
  97. PingInterval time.Duration `mapstructure:"ping_interval"`
  98. // Maximum wait time for pongs
  99. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  100. }
  101. // DefaultMConnConfig returns the default config.
  102. func DefaultMConnConfig() MConnConfig {
  103. return MConnConfig{
  104. SendRate: defaultSendRate,
  105. RecvRate: defaultRecvRate,
  106. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  107. FlushThrottle: defaultFlushThrottle,
  108. PingInterval: defaultPingInterval,
  109. PongTimeout: defaultPongTimeout,
  110. }
  111. }
  112. // NewMConnection wraps net.Conn and creates multiplex connection
  113. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  114. return NewMConnectionWithConfig(
  115. conn,
  116. chDescs,
  117. onReceive,
  118. onError,
  119. DefaultMConnConfig())
  120. }
  121. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  122. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
  123. if config.PongTimeout >= config.PingInterval {
  124. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  125. }
  126. mconn := &MConnection{
  127. conn: conn,
  128. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  129. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  130. sendMonitor: flow.New(0, 0),
  131. recvMonitor: flow.New(0, 0),
  132. send: make(chan struct{}, 1),
  133. pong: make(chan struct{}, 1),
  134. onReceive: onReceive,
  135. onError: onError,
  136. config: config,
  137. created: time.Now(),
  138. }
  139. // Create channels
  140. var channelsIdx = map[byte]*Channel{}
  141. var channels = []*Channel{}
  142. for _, desc := range chDescs {
  143. channel := newChannel(mconn, *desc)
  144. channelsIdx[channel.desc.ID] = channel
  145. channels = append(channels, channel)
  146. }
  147. mconn.channels = channels
  148. mconn.channelsIdx = channelsIdx
  149. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  150. // maxPacketMsgSize() is a bit heavy, so call just once
  151. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  152. return mconn
  153. }
  154. func (c *MConnection) SetLogger(l log.Logger) {
  155. c.BaseService.SetLogger(l)
  156. for _, ch := range c.channels {
  157. ch.SetLogger(l)
  158. }
  159. }
  160. // OnStart implements BaseService
  161. func (c *MConnection) OnStart() error {
  162. if err := c.BaseService.OnStart(); err != nil {
  163. return err
  164. }
  165. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  166. c.pingTimer = time.NewTicker(c.config.PingInterval)
  167. c.pongTimeoutCh = make(chan bool, 1)
  168. c.chStatsTimer = time.NewTicker(updateStats)
  169. c.quitSendRoutine = make(chan struct{})
  170. c.doneSendRoutine = make(chan struct{})
  171. go c.sendRoutine()
  172. go c.recvRoutine()
  173. return nil
  174. }
  175. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  176. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  177. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  178. func (c *MConnection) stopServices() (alreadyStopped bool) {
  179. c.stopMtx.Lock()
  180. defer c.stopMtx.Unlock()
  181. select {
  182. case <-c.quitSendRoutine:
  183. // already quit via FlushStop or OnStop
  184. return true
  185. default:
  186. }
  187. c.BaseService.OnStop()
  188. c.flushTimer.Stop()
  189. c.pingTimer.Stop()
  190. c.chStatsTimer.Stop()
  191. close(c.quitSendRoutine)
  192. return false
  193. }
  194. // FlushStop replicates the logic of OnStop.
  195. // It additionally ensures that all successful
  196. // .Send() calls will get flushed before closing
  197. // the connection.
  198. func (c *MConnection) FlushStop() {
  199. if c.stopServices() {
  200. return
  201. }
  202. // this block is unique to FlushStop
  203. {
  204. // wait until the sendRoutine exits
  205. // so we dont race on calling sendSomePacketMsgs
  206. <-c.doneSendRoutine
  207. // Send and flush all pending msgs.
  208. // By now, IsRunning == false,
  209. // so any concurrent attempts to send will fail.
  210. // Since sendRoutine has exited, we can call this
  211. // safely
  212. eof := c.sendSomePacketMsgs()
  213. for !eof {
  214. eof = c.sendSomePacketMsgs()
  215. }
  216. c.flush()
  217. // Now we can close the connection
  218. }
  219. c.conn.Close() // nolint: errcheck
  220. // We can't close pong safely here because
  221. // recvRoutine may write to it after we've stopped.
  222. // Though it doesn't need to get closed at all,
  223. // we close it @ recvRoutine.
  224. // c.Stop()
  225. }
  226. // OnStop implements BaseService
  227. func (c *MConnection) OnStop() {
  228. if c.stopServices() {
  229. return
  230. }
  231. c.conn.Close() // nolint: errcheck
  232. // We can't close pong safely here because
  233. // recvRoutine may write to it after we've stopped.
  234. // Though it doesn't need to get closed at all,
  235. // we close it @ recvRoutine.
  236. }
  237. func (c *MConnection) String() string {
  238. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  239. }
  240. func (c *MConnection) flush() {
  241. c.Logger.Debug("Flush", "conn", c)
  242. err := c.bufConnWriter.Flush()
  243. if err != nil {
  244. c.Logger.Error("MConnection flush failed", "err", err)
  245. }
  246. }
  247. // Catch panics, usually caused by remote disconnects.
  248. func (c *MConnection) _recover() {
  249. if r := recover(); r != nil {
  250. err := cmn.ErrorWrap(r, "recovered panic in MConnection")
  251. c.stopForError(err)
  252. }
  253. }
  254. func (c *MConnection) stopForError(r interface{}) {
  255. c.Stop()
  256. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  257. if c.onError != nil {
  258. c.onError(r)
  259. }
  260. }
  261. }
  262. // Queues a message to be sent to channel.
  263. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  264. if !c.IsRunning() {
  265. return false
  266. }
  267. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  268. // Send message to channel.
  269. channel, ok := c.channelsIdx[chID]
  270. if !ok {
  271. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  272. return false
  273. }
  274. success := channel.sendBytes(msgBytes)
  275. if success {
  276. // Wake up sendRoutine if necessary
  277. select {
  278. case c.send <- struct{}{}:
  279. default:
  280. }
  281. } else {
  282. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  283. }
  284. return success
  285. }
  286. // Queues a message to be sent to channel.
  287. // Nonblocking, returns true if successful.
  288. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  289. if !c.IsRunning() {
  290. return false
  291. }
  292. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  293. // Send message to channel.
  294. channel, ok := c.channelsIdx[chID]
  295. if !ok {
  296. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  297. return false
  298. }
  299. ok = channel.trySendBytes(msgBytes)
  300. if ok {
  301. // Wake up sendRoutine if necessary
  302. select {
  303. case c.send <- struct{}{}:
  304. default:
  305. }
  306. }
  307. return ok
  308. }
  309. // CanSend returns true if you can send more data onto the chID, false
  310. // otherwise. Use only as a heuristic.
  311. func (c *MConnection) CanSend(chID byte) bool {
  312. if !c.IsRunning() {
  313. return false
  314. }
  315. channel, ok := c.channelsIdx[chID]
  316. if !ok {
  317. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  318. return false
  319. }
  320. return channel.canSend()
  321. }
  322. // sendRoutine polls for packets to send from channels.
  323. func (c *MConnection) sendRoutine() {
  324. defer c._recover()
  325. FOR_LOOP:
  326. for {
  327. var _n int64
  328. var err error
  329. SELECTION:
  330. select {
  331. case <-c.flushTimer.Ch:
  332. // NOTE: flushTimer.Set() must be called every time
  333. // something is written to .bufConnWriter.
  334. c.flush()
  335. case <-c.chStatsTimer.C:
  336. for _, channel := range c.channels {
  337. channel.updateStats()
  338. }
  339. case <-c.pingTimer.C:
  340. c.Logger.Debug("Send Ping")
  341. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
  342. if err != nil {
  343. break SELECTION
  344. }
  345. c.sendMonitor.Update(int(_n))
  346. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  347. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  348. select {
  349. case c.pongTimeoutCh <- true:
  350. default:
  351. }
  352. })
  353. c.flush()
  354. case timeout := <-c.pongTimeoutCh:
  355. if timeout {
  356. c.Logger.Debug("Pong timeout")
  357. err = errors.New("pong timeout")
  358. } else {
  359. c.stopPongTimer()
  360. }
  361. case <-c.pong:
  362. c.Logger.Debug("Send Pong")
  363. _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
  364. if err != nil {
  365. break SELECTION
  366. }
  367. c.sendMonitor.Update(int(_n))
  368. c.flush()
  369. case <-c.quitSendRoutine:
  370. break FOR_LOOP
  371. case <-c.send:
  372. // Send some PacketMsgs
  373. eof := c.sendSomePacketMsgs()
  374. if !eof {
  375. // Keep sendRoutine awake.
  376. select {
  377. case c.send <- struct{}{}:
  378. default:
  379. }
  380. }
  381. }
  382. if !c.IsRunning() {
  383. break FOR_LOOP
  384. }
  385. if err != nil {
  386. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  387. c.stopForError(err)
  388. break FOR_LOOP
  389. }
  390. }
  391. // Cleanup
  392. c.stopPongTimer()
  393. close(c.doneSendRoutine)
  394. }
  395. // Returns true if messages from channels were exhausted.
  396. // Blocks in accordance to .sendMonitor throttling.
  397. func (c *MConnection) sendSomePacketMsgs() bool {
  398. // Block until .sendMonitor says we can write.
  399. // Once we're ready we send more than we asked for,
  400. // but amortized it should even out.
  401. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  402. // Now send some PacketMsgs.
  403. for i := 0; i < numBatchPacketMsgs; i++ {
  404. if c.sendPacketMsg() {
  405. return true
  406. }
  407. }
  408. return false
  409. }
  410. // Returns true if messages from channels were exhausted.
  411. func (c *MConnection) sendPacketMsg() bool {
  412. // Choose a channel to create a PacketMsg from.
  413. // The chosen channel will be the one whose recentlySent/priority is the least.
  414. var leastRatio float32 = math.MaxFloat32
  415. var leastChannel *Channel
  416. for _, channel := range c.channels {
  417. // If nothing to send, skip this channel
  418. if !channel.isSendPending() {
  419. continue
  420. }
  421. // Get ratio, and keep track of lowest ratio.
  422. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  423. if ratio < leastRatio {
  424. leastRatio = ratio
  425. leastChannel = channel
  426. }
  427. }
  428. // Nothing to send?
  429. if leastChannel == nil {
  430. return true
  431. }
  432. // c.Logger.Info("Found a msgPacket to send")
  433. // Make & send a PacketMsg from this channel
  434. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  435. if err != nil {
  436. c.Logger.Error("Failed to write PacketMsg", "err", err)
  437. c.stopForError(err)
  438. return true
  439. }
  440. c.sendMonitor.Update(int(_n))
  441. c.flushTimer.Set()
  442. return false
  443. }
  444. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  445. // After a whole message has been assembled, it's pushed to onReceive().
  446. // Blocks depending on how the connection is throttled.
  447. // Otherwise, it never blocks.
  448. func (c *MConnection) recvRoutine() {
  449. defer c._recover()
  450. FOR_LOOP:
  451. for {
  452. // Block until .recvMonitor says we can read.
  453. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
  454. // Peek into bufConnReader for debugging
  455. /*
  456. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  457. bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
  458. if err == nil {
  459. // return
  460. } else {
  461. c.Logger.Debug("Error peeking connection buffer", "err", err)
  462. // return nil
  463. }
  464. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  465. }
  466. */
  467. // Read packet type
  468. var packet Packet
  469. var _n int64
  470. var err error
  471. _n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
  472. c.recvMonitor.Update(int(_n))
  473. if err != nil {
  474. if c.IsRunning() {
  475. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  476. c.stopForError(err)
  477. }
  478. break FOR_LOOP
  479. }
  480. // Read more depending on packet type.
  481. switch pkt := packet.(type) {
  482. case PacketPing:
  483. // TODO: prevent abuse, as they cause flush()'s.
  484. // https://github.com/tendermint/tendermint/issues/1190
  485. c.Logger.Debug("Receive Ping")
  486. select {
  487. case c.pong <- struct{}{}:
  488. default:
  489. // never block
  490. }
  491. case PacketPong:
  492. c.Logger.Debug("Receive Pong")
  493. select {
  494. case c.pongTimeoutCh <- false:
  495. default:
  496. // never block
  497. }
  498. case PacketMsg:
  499. channel, ok := c.channelsIdx[pkt.ChannelID]
  500. if !ok || channel == nil {
  501. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  502. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  503. c.stopForError(err)
  504. break FOR_LOOP
  505. }
  506. msgBytes, err := channel.recvPacketMsg(pkt)
  507. if err != nil {
  508. if c.IsRunning() {
  509. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  510. c.stopForError(err)
  511. }
  512. break FOR_LOOP
  513. }
  514. if msgBytes != nil {
  515. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  516. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  517. c.onReceive(pkt.ChannelID, msgBytes)
  518. }
  519. default:
  520. err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
  521. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  522. c.stopForError(err)
  523. break FOR_LOOP
  524. }
  525. }
  526. // Cleanup
  527. close(c.pong)
  528. for range c.pong {
  529. // Drain
  530. }
  531. }
  532. // not goroutine-safe
  533. func (c *MConnection) stopPongTimer() {
  534. if c.pongTimer != nil {
  535. _ = c.pongTimer.Stop()
  536. c.pongTimer = nil
  537. }
  538. }
  539. // maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
  540. // of amino encoding.
  541. func (c *MConnection) maxPacketMsgSize() int {
  542. return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
  543. ChannelID: 0x01,
  544. EOF: 1,
  545. Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
  546. })) + 10 // leave room for changes in amino
  547. }
  548. type ConnectionStatus struct {
  549. Duration time.Duration
  550. SendMonitor flow.Status
  551. RecvMonitor flow.Status
  552. Channels []ChannelStatus
  553. }
  554. type ChannelStatus struct {
  555. ID byte
  556. SendQueueCapacity int
  557. SendQueueSize int
  558. Priority int
  559. RecentlySent int64
  560. }
  561. func (c *MConnection) Status() ConnectionStatus {
  562. var status ConnectionStatus
  563. status.Duration = time.Since(c.created)
  564. status.SendMonitor = c.sendMonitor.Status()
  565. status.RecvMonitor = c.recvMonitor.Status()
  566. status.Channels = make([]ChannelStatus, len(c.channels))
  567. for i, channel := range c.channels {
  568. status.Channels[i] = ChannelStatus{
  569. ID: channel.desc.ID,
  570. SendQueueCapacity: cap(channel.sendQueue),
  571. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  572. Priority: channel.desc.Priority,
  573. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  574. }
  575. }
  576. return status
  577. }
  578. //-----------------------------------------------------------------------------
  579. type ChannelDescriptor struct {
  580. ID byte
  581. Priority int
  582. SendQueueCapacity int
  583. RecvBufferCapacity int
  584. RecvMessageCapacity int
  585. }
  586. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  587. if chDesc.SendQueueCapacity == 0 {
  588. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  589. }
  590. if chDesc.RecvBufferCapacity == 0 {
  591. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  592. }
  593. if chDesc.RecvMessageCapacity == 0 {
  594. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  595. }
  596. filled = chDesc
  597. return
  598. }
  599. // TODO: lowercase.
  600. // NOTE: not goroutine-safe.
  601. type Channel struct {
  602. conn *MConnection
  603. desc ChannelDescriptor
  604. sendQueue chan []byte
  605. sendQueueSize int32 // atomic.
  606. recving []byte
  607. sending []byte
  608. recentlySent int64 // exponential moving average
  609. maxPacketMsgPayloadSize int
  610. Logger log.Logger
  611. }
  612. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  613. desc = desc.FillDefaults()
  614. if desc.Priority <= 0 {
  615. panic("Channel default priority must be a positive integer")
  616. }
  617. return &Channel{
  618. conn: conn,
  619. desc: desc,
  620. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  621. recving: make([]byte, 0, desc.RecvBufferCapacity),
  622. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  623. }
  624. }
  625. func (ch *Channel) SetLogger(l log.Logger) {
  626. ch.Logger = l
  627. }
  628. // Queues message to send to this channel.
  629. // Goroutine-safe
  630. // Times out (and returns false) after defaultSendTimeout
  631. func (ch *Channel) sendBytes(bytes []byte) bool {
  632. select {
  633. case ch.sendQueue <- bytes:
  634. atomic.AddInt32(&ch.sendQueueSize, 1)
  635. return true
  636. case <-time.After(defaultSendTimeout):
  637. return false
  638. }
  639. }
  640. // Queues message to send to this channel.
  641. // Nonblocking, returns true if successful.
  642. // Goroutine-safe
  643. func (ch *Channel) trySendBytes(bytes []byte) bool {
  644. select {
  645. case ch.sendQueue <- bytes:
  646. atomic.AddInt32(&ch.sendQueueSize, 1)
  647. return true
  648. default:
  649. return false
  650. }
  651. }
  652. // Goroutine-safe
  653. func (ch *Channel) loadSendQueueSize() (size int) {
  654. return int(atomic.LoadInt32(&ch.sendQueueSize))
  655. }
  656. // Goroutine-safe
  657. // Use only as a heuristic.
  658. func (ch *Channel) canSend() bool {
  659. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  660. }
  661. // Returns true if any PacketMsgs are pending to be sent.
  662. // Call before calling nextPacketMsg()
  663. // Goroutine-safe
  664. func (ch *Channel) isSendPending() bool {
  665. if len(ch.sending) == 0 {
  666. if len(ch.sendQueue) == 0 {
  667. return false
  668. }
  669. ch.sending = <-ch.sendQueue
  670. }
  671. return true
  672. }
  673. // Creates a new PacketMsg to send.
  674. // Not goroutine-safe
  675. func (ch *Channel) nextPacketMsg() PacketMsg {
  676. packet := PacketMsg{}
  677. packet.ChannelID = byte(ch.desc.ID)
  678. maxSize := ch.maxPacketMsgPayloadSize
  679. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  680. if len(ch.sending) <= maxSize {
  681. packet.EOF = byte(0x01)
  682. ch.sending = nil
  683. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  684. } else {
  685. packet.EOF = byte(0x00)
  686. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  687. }
  688. return packet
  689. }
  690. // Writes next PacketMsg to w and updates c.recentlySent.
  691. // Not goroutine-safe
  692. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  693. var packet = ch.nextPacketMsg()
  694. n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
  695. atomic.AddInt64(&ch.recentlySent, n)
  696. return
  697. }
  698. // Handles incoming PacketMsgs. It returns a message bytes if message is
  699. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  700. // Not goroutine-safe
  701. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  702. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  703. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  704. if recvCap < recvReceived {
  705. return nil, fmt.Errorf("Received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  706. }
  707. ch.recving = append(ch.recving, packet.Bytes...)
  708. if packet.EOF == byte(0x01) {
  709. msgBytes := ch.recving
  710. // clear the slice without re-allocating.
  711. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  712. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  713. // at which point the recving slice stops being used and should be garbage collected
  714. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  715. return msgBytes, nil
  716. }
  717. return nil, nil
  718. }
  719. // Call this periodically to update stats for throttling purposes.
  720. // Not goroutine-safe
  721. func (ch *Channel) updateStats() {
  722. // Exponential decay of stats.
  723. // TODO: optimize.
  724. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  725. }
  726. //----------------------------------------
  727. // Packet
  728. type Packet interface {
  729. AssertIsPacket()
  730. }
  731. func RegisterPacket(cdc *amino.Codec) {
  732. cdc.RegisterInterface((*Packet)(nil), nil)
  733. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  734. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  735. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  736. }
  737. func (_ PacketPing) AssertIsPacket() {}
  738. func (_ PacketPong) AssertIsPacket() {}
  739. func (_ PacketMsg) AssertIsPacket() {}
  740. type PacketPing struct {
  741. }
  742. type PacketPong struct {
  743. }
  744. type PacketMsg struct {
  745. ChannelID byte
  746. EOF byte // 1 means message ends here.
  747. Bytes []byte
  748. }
  749. func (mp PacketMsg) String() string {
  750. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  751. }