You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

892 lines
24 KiB

  1. package conn
  2. import (
  3. "bufio"
  4. "runtime/debug"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/pkg/errors"
  14. amino "github.com/tendermint/go-amino"
  15. cmn "github.com/tendermint/tendermint/libs/common"
  16. flow "github.com/tendermint/tendermint/libs/flowrate"
  17. "github.com/tendermint/tendermint/libs/log"
  18. )
// Package-level defaults and fixed limits used by MConnection and Channel.
const (
	// Default for MConnConfig.MaxPacketMsgPayloadSize (see DefaultMConnConfig).
	defaultMaxPacketMsgPayloadSize = 1024
	// Maximum number of PacketMsgs written per sendSomePacketMsgs call.
	numBatchPacketMsgs = 10
	// Buffer sizes handed to bufio for the connection reader and writer.
	minReadBufferSize = 1024
	minWriteBufferSize = 65536
	// Period of the ticker that makes sendRoutine call Channel.updateStats.
	updateStats = 2 * time.Second
	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond
	// Defaults applied by ChannelDescriptor.FillDefaults to zero-valued fields.
	defaultSendQueueCapacity = 1
	defaultRecvBufferCapacity = 4096
	defaultRecvMessageCapacity = 22020096 // 21MB (21 * 1024 * 1024)
	defaultSendRate = int64(512000) // 500KB/s
	defaultRecvRate = int64(512000) // 500KB/s
	// How long Channel.sendBytes waits for room in the send queue.
	defaultSendTimeout = 10 * time.Second
	// Defaults for MConnConfig.PingInterval and MConnConfig.PongTimeout.
	defaultPingInterval = 60 * time.Second
	defaultPongTimeout = 45 * time.Second
)
// receiveCbFunc is the callback recvRoutine invokes with the channel ID and
// the bytes of each completely reassembled message.
type receiveCbFunc func(chID byte, msgBytes []byte)

// errorCbFunc is the callback stopForError invokes (at most once per
// connection) with the error or recovered panic value that stopped it.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (c *MConnection) Send(chID byte, msgBytes []byte) bool {}
	func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {}

`Send(chID, msgBytes)` is a blocking call that waits until `msgBytes` is
successfully queued for the channel with the given id byte `chID`, or until the
request times out. The message is sent as packets serialized using Go-Amino.

`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
channel's queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService

	conn net.Conn
	// Buffered reader/writer wrapped around conn.
	bufConnReader *bufio.Reader
	bufConnWriter *bufio.Writer
	// Flow monitors used to throttle and account for sent/received bytes.
	sendMonitor *flow.Monitor
	recvMonitor *flow.Monitor
	// send wakes up sendRoutine when a channel has queued data.
	send chan struct{}
	// pong is signalled by recvRoutine when a ping arrives, so that
	// sendRoutine answers with a pong.
	pong chan struct{}
	// channels in descriptor order, and the same channels indexed by ID.
	channels []*Channel
	channelsIdx map[byte]*Channel
	onReceive receiveCbFunc
	onError errorCbFunc
	// errored is flipped 0 -> 1 atomically by stopForError so that onError
	// is called only once.
	errored uint32
	config MConnConfig

	// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
	// doneSendRoutine is closed when the sendRoutine actually quits.
	quitSendRoutine chan struct{}
	doneSendRoutine chan struct{}

	// Closing quitRecvRoutine will cause the recvRoutine to eventually quit.
	quitRecvRoutine chan struct{}

	// used to ensure FlushStop and OnStop
	// are safe to call concurrently.
	stopMtx sync.Mutex

	flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer *time.Ticker // send pings periodically

	// close conn if pong is not received in pongTimeout
	pongTimer *time.Timer
	pongTimeoutCh chan bool // true - timeout, false - peer sent pong

	chStatsTimer *time.Ticker // update channel stats periodically

	created time.Time // time of creation

	// Cached result of maxPacketMsgSize(), computed once in the constructor.
	_maxPacketMsgSize int
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// Rate limits passed to the send and receive flow monitors.
	// NOTE(review): presumably bytes per second (the defaults are annotated
	// "500KB/s") — confirm against the flowrate package.
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// Maximum payload size
	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

	// Interval to flush writes (throttled)
	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

	// Interval to send pings
	PingInterval time.Duration `mapstructure:"ping_interval"`

	// Maximum wait time for pongs
	// (must be shorter than PingInterval; NewMConnectionWithConfig panics otherwise)
	PongTimeout time.Duration `mapstructure:"pong_timeout"`
}
  104. // DefaultMConnConfig returns the default config.
  105. func DefaultMConnConfig() MConnConfig {
  106. return MConnConfig{
  107. SendRate: defaultSendRate,
  108. RecvRate: defaultRecvRate,
  109. MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
  110. FlushThrottle: defaultFlushThrottle,
  111. PingInterval: defaultPingInterval,
  112. PongTimeout: defaultPongTimeout,
  113. }
  114. }
  115. // NewMConnection wraps net.Conn and creates multiplex connection
  116. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  117. return NewMConnectionWithConfig(
  118. conn,
  119. chDescs,
  120. onReceive,
  121. onError,
  122. DefaultMConnConfig())
  123. }
  124. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  125. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
  126. if config.PongTimeout >= config.PingInterval {
  127. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  128. }
  129. mconn := &MConnection{
  130. conn: conn,
  131. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  132. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  133. sendMonitor: flow.New(0, 0),
  134. recvMonitor: flow.New(0, 0),
  135. send: make(chan struct{}, 1),
  136. pong: make(chan struct{}, 1),
  137. onReceive: onReceive,
  138. onError: onError,
  139. config: config,
  140. created: time.Now(),
  141. }
  142. // Create channels
  143. var channelsIdx = map[byte]*Channel{}
  144. var channels = []*Channel{}
  145. for _, desc := range chDescs {
  146. channel := newChannel(mconn, *desc)
  147. channelsIdx[channel.desc.ID] = channel
  148. channels = append(channels, channel)
  149. }
  150. mconn.channels = channels
  151. mconn.channelsIdx = channelsIdx
  152. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  153. // maxPacketMsgSize() is a bit heavy, so call just once
  154. mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
  155. return mconn
  156. }
  157. func (c *MConnection) SetLogger(l log.Logger) {
  158. c.BaseService.SetLogger(l)
  159. for _, ch := range c.channels {
  160. ch.SetLogger(l)
  161. }
  162. }
  163. // OnStart implements BaseService
  164. func (c *MConnection) OnStart() error {
  165. if err := c.BaseService.OnStart(); err != nil {
  166. return err
  167. }
  168. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  169. c.pingTimer = time.NewTicker(c.config.PingInterval)
  170. c.pongTimeoutCh = make(chan bool, 1)
  171. c.chStatsTimer = time.NewTicker(updateStats)
  172. c.quitSendRoutine = make(chan struct{})
  173. c.doneSendRoutine = make(chan struct{})
  174. c.quitRecvRoutine = make(chan struct{})
  175. go c.sendRoutine()
  176. go c.recvRoutine()
  177. return nil
  178. }
  179. // stopServices stops the BaseService and timers and closes the quitSendRoutine.
  180. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
  181. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
  182. func (c *MConnection) stopServices() (alreadyStopped bool) {
  183. c.stopMtx.Lock()
  184. defer c.stopMtx.Unlock()
  185. select {
  186. case <-c.quitSendRoutine:
  187. // already quit
  188. return true
  189. default:
  190. }
  191. select {
  192. case <-c.quitRecvRoutine:
  193. // already quit
  194. return true
  195. default:
  196. }
  197. c.BaseService.OnStop()
  198. c.flushTimer.Stop()
  199. c.pingTimer.Stop()
  200. c.chStatsTimer.Stop()
  201. // inform the recvRouting that we are shutting down
  202. close(c.quitRecvRoutine)
  203. close(c.quitSendRoutine)
  204. return false
  205. }
  206. // FlushStop replicates the logic of OnStop.
  207. // It additionally ensures that all successful
  208. // .Send() calls will get flushed before closing
  209. // the connection.
  210. func (c *MConnection) FlushStop() {
  211. if c.stopServices() {
  212. return
  213. }
  214. // this block is unique to FlushStop
  215. {
  216. // wait until the sendRoutine exits
  217. // so we dont race on calling sendSomePacketMsgs
  218. <-c.doneSendRoutine
  219. // Send and flush all pending msgs.
  220. // Since sendRoutine has exited, we can call this
  221. // safely
  222. eof := c.sendSomePacketMsgs()
  223. for !eof {
  224. eof = c.sendSomePacketMsgs()
  225. }
  226. c.flush()
  227. // Now we can close the connection
  228. }
  229. c.conn.Close() // nolint: errcheck
  230. // We can't close pong safely here because
  231. // recvRoutine may write to it after we've stopped.
  232. // Though it doesn't need to get closed at all,
  233. // we close it @ recvRoutine.
  234. // c.Stop()
  235. }
  236. // OnStop implements BaseService
  237. func (c *MConnection) OnStop() {
  238. if c.stopServices() {
  239. return
  240. }
  241. c.conn.Close() // nolint: errcheck
  242. // We can't close pong safely here because
  243. // recvRoutine may write to it after we've stopped.
  244. // Though it doesn't need to get closed at all,
  245. // we close it @ recvRoutine.
  246. }
  247. func (c *MConnection) String() string {
  248. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  249. }
  250. func (c *MConnection) flush() {
  251. c.Logger.Debug("Flush", "conn", c)
  252. err := c.bufConnWriter.Flush()
  253. if err != nil {
  254. c.Logger.Error("MConnection flush failed", "err", err)
  255. }
  256. }
  257. // Catch panics, usually caused by remote disconnects.
  258. func (c *MConnection) _recover() {
  259. if r := recover(); r != nil {
  260. c.Logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
  261. c.stopForError(errors.Errorf("recovered from panic: %v", r))
  262. }
  263. }
  264. func (c *MConnection) stopForError(r interface{}) {
  265. c.Stop()
  266. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  267. if c.onError != nil {
  268. c.onError(r)
  269. }
  270. }
  271. }
  272. // Queues a message to be sent to channel.
  273. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  274. if !c.IsRunning() {
  275. return false
  276. }
  277. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  278. // Send message to channel.
  279. channel, ok := c.channelsIdx[chID]
  280. if !ok {
  281. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  282. return false
  283. }
  284. success := channel.sendBytes(msgBytes)
  285. if success {
  286. // Wake up sendRoutine if necessary
  287. select {
  288. case c.send <- struct{}{}:
  289. default:
  290. }
  291. } else {
  292. c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  293. }
  294. return success
  295. }
  296. // Queues a message to be sent to channel.
  297. // Nonblocking, returns true if successful.
  298. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  299. if !c.IsRunning() {
  300. return false
  301. }
  302. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  303. // Send message to channel.
  304. channel, ok := c.channelsIdx[chID]
  305. if !ok {
  306. c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
  307. return false
  308. }
  309. ok = channel.trySendBytes(msgBytes)
  310. if ok {
  311. // Wake up sendRoutine if necessary
  312. select {
  313. case c.send <- struct{}{}:
  314. default:
  315. }
  316. }
  317. return ok
  318. }
  319. // CanSend returns true if you can send more data onto the chID, false
  320. // otherwise. Use only as a heuristic.
  321. func (c *MConnection) CanSend(chID byte) bool {
  322. if !c.IsRunning() {
  323. return false
  324. }
  325. channel, ok := c.channelsIdx[chID]
  326. if !ok {
  327. c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
  328. return false
  329. }
  330. return channel.canSend()
  331. }
// sendRoutine polls for packets to send from channels.
// It is the single goroutine that writes to bufConnWriter while the
// connection is running: it flushes on the throttle timer, decays channel
// stats, sends pings and pongs, tracks the pong timeout, and writes queued
// PacketMsgs. On exit it stops the pong timer and closes doneSendRoutine.
func (c *MConnection) sendRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// _n and err are reset each iteration; err is inspected after the select.
		var _n int64
		var err error
	SELECTION:
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufConnWriter.
			c.flush()
		case <-c.chStatsTimer.C:
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.C:
			c.Logger.Debug("Send Ping")
			_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
			if err != nil {
				// Leave the select; the error is handled below.
				break SELECTION
			}
			c.sendMonitor.Update(int(_n))
			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
			// When the timer fires, report the timeout without blocking.
			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
				select {
				case c.pongTimeoutCh <- true:
				default:
				}
			})
			c.flush()
		case timeout := <-c.pongTimeoutCh:
			// true: the pong timer fired; false: recvRoutine saw a pong.
			if timeout {
				c.Logger.Debug("Pong timeout")
				err = errors.New("pong timeout")
			} else {
				c.stopPongTimer()
			}
		case <-c.pong:
			// recvRoutine received a ping; answer it.
			c.Logger.Debug("Send Pong")
			_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
			if err != nil {
				break SELECTION
			}
			c.sendMonitor.Update(int(_n))
			c.flush()
		case <-c.quitSendRoutine:
			break FOR_LOOP
		case <-c.send:
			// Send some PacketMsgs
			eof := c.sendSomePacketMsgs()
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	c.stopPongTimer()
	// Signals FlushStop that sendRoutine no longer touches the writer.
	close(c.doneSendRoutine)
}
  405. // Returns true if messages from channels were exhausted.
  406. // Blocks in accordance to .sendMonitor throttling.
  407. func (c *MConnection) sendSomePacketMsgs() bool {
  408. // Block until .sendMonitor says we can write.
  409. // Once we're ready we send more than we asked for,
  410. // but amortized it should even out.
  411. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)
  412. // Now send some PacketMsgs.
  413. for i := 0; i < numBatchPacketMsgs; i++ {
  414. if c.sendPacketMsg() {
  415. return true
  416. }
  417. }
  418. return false
  419. }
  420. // Returns true if messages from channels were exhausted.
  421. func (c *MConnection) sendPacketMsg() bool {
  422. // Choose a channel to create a PacketMsg from.
  423. // The chosen channel will be the one whose recentlySent/priority is the least.
  424. var leastRatio float32 = math.MaxFloat32
  425. var leastChannel *Channel
  426. for _, channel := range c.channels {
  427. // If nothing to send, skip this channel
  428. if !channel.isSendPending() {
  429. continue
  430. }
  431. // Get ratio, and keep track of lowest ratio.
  432. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  433. if ratio < leastRatio {
  434. leastRatio = ratio
  435. leastChannel = channel
  436. }
  437. }
  438. // Nothing to send?
  439. if leastChannel == nil {
  440. return true
  441. }
  442. // c.Logger.Info("Found a msgPacket to send")
  443. // Make & send a PacketMsg from this channel
  444. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  445. if err != nil {
  446. c.Logger.Error("Failed to write PacketMsg", "err", err)
  447. c.stopForError(err)
  448. return true
  449. }
  450. c.sendMonitor.Update(int(_n))
  451. c.flushTimer.Set()
  452. return false
  453. }
// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
// Pings are forwarded to sendRoutine via c.pong and pongs via c.pongTimeoutCh,
// both with non-blocking sends. On exit it closes and drains c.pong.
func (c *MConnection) recvRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)

		// Peek into bufConnReader for debugging
		/*
			if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
				bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
				if err == nil {
					// return
				} else {
					c.Logger.Debug("Error peeking connection buffer", "err", err)
					// return nil
				}
				c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
			}
		*/

		// Read packet type
		var packet Packet
		var _n int64
		var err error
		_n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
		c.recvMonitor.Update(int(_n))
		if err != nil {
			// stopServices was invoked and we are shutting down
			// receiving is expected to fail since we will close the connection
			select {
			case <-c.quitRecvRoutine:
				break FOR_LOOP
			default:
			}

			// Only report the failure if nobody stopped the connection yet.
			if c.IsRunning() {
				if err == io.EOF {
					c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
				} else {
					c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				}
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		// Read more depending on packet type.
		switch pkt := packet.(type) {
		case PacketPing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			c.Logger.Debug("Receive Ping")
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}
		case PacketPong:
			c.Logger.Debug("Receive Pong")
			// false tells sendRoutine that the peer answered (no timeout).
			select {
			case c.pongTimeoutCh <- false:
			default:
				// never block
			}
		case PacketMsg:
			channel, ok := c.channelsIdx[pkt.ChannelID]
			if !ok || channel == nil {
				err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
				c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			msgBytes, err := channel.recvPacketMsg(pkt)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			// msgBytes is nil until the packet carrying EOF has arrived.
			if msgBytes != nil {
				c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
				// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
				c.onReceive(pkt.ChannelID, msgBytes)
			}
		default:
			err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.pong)
	for range c.pong {
		// Drain
	}
}
  553. // not goroutine-safe
  554. func (c *MConnection) stopPongTimer() {
  555. if c.pongTimer != nil {
  556. _ = c.pongTimer.Stop()
  557. c.pongTimer = nil
  558. }
  559. }
  560. // maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
  561. // of amino encoding.
  562. func (c *MConnection) maxPacketMsgSize() int {
  563. return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
  564. ChannelID: 0x01,
  565. EOF: 1,
  566. Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
  567. })) + 10 // leave room for changes in amino
  568. }
// ConnectionStatus is the snapshot returned by MConnection.Status.
type ConnectionStatus struct {
	// Time elapsed since the connection was created.
	Duration time.Duration
	// Status of the send and receive flow monitors.
	SendMonitor flow.Status
	RecvMonitor flow.Status
	// One entry per channel, in the connection's channel order.
	Channels []ChannelStatus
}
// ChannelStatus is the per-channel part of ConnectionStatus.
type ChannelStatus struct {
	// ID and Priority are copied from the channel's descriptor.
	ID byte
	// Capacity of the channel's send queue and the number of messages
	// currently counted in it.
	SendQueueCapacity int
	SendQueueSize int
	Priority int
	// Decaying count of recently sent bytes (see Channel.updateStats).
	RecentlySent int64
}
  582. func (c *MConnection) Status() ConnectionStatus {
  583. var status ConnectionStatus
  584. status.Duration = time.Since(c.created)
  585. status.SendMonitor = c.sendMonitor.Status()
  586. status.RecvMonitor = c.recvMonitor.Status()
  587. status.Channels = make([]ChannelStatus, len(c.channels))
  588. for i, channel := range c.channels {
  589. status.Channels[i] = ChannelStatus{
  590. ID: channel.desc.ID,
  591. SendQueueCapacity: cap(channel.sendQueue),
  592. SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
  593. Priority: channel.desc.Priority,
  594. RecentlySent: atomic.LoadInt64(&channel.recentlySent),
  595. }
  596. }
  597. return status
  598. }
  599. //-----------------------------------------------------------------------------
// ChannelDescriptor configures one Channel of an MConnection.
// Zero-valued capacities are replaced with package defaults by FillDefaults.
type ChannelDescriptor struct {
	// ID identifies the channel; it is carried in every PacketMsg.
	ID byte
	// Priority weights the channel when sendPacketMsg picks the next channel
	// to send from; newChannel panics unless it is positive.
	Priority int
	// Capacity of the channel's queue of outgoing messages.
	SendQueueCapacity int
	// Initial capacity of the buffer used to reassemble incoming messages.
	RecvBufferCapacity int
	// Maximum size of a reassembled incoming message, in bytes.
	RecvMessageCapacity int
}
  607. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  608. if chDesc.SendQueueCapacity == 0 {
  609. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  610. }
  611. if chDesc.RecvBufferCapacity == 0 {
  612. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  613. }
  614. if chDesc.RecvMessageCapacity == 0 {
  615. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  616. }
  617. filled = chDesc
  618. return
  619. }
// Channel is one logical channel multiplexed over an MConnection. It queues
// outgoing messages, cuts them into PacketMsgs, and reassembles incoming
// PacketMsgs into messages.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn *MConnection
	desc ChannelDescriptor
	// Outgoing messages waiting to be sent.
	sendQueue chan []byte
	sendQueueSize int32 // atomic.
	// Bytes of the incoming message received so far.
	recving []byte
	// Remainder of the outgoing message currently being sent.
	sending []byte
	recentlySent int64 // exponential moving average

	// Copied from the connection's config when the channel is created.
	maxPacketMsgPayloadSize int

	Logger log.Logger
}
  633. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  634. desc = desc.FillDefaults()
  635. if desc.Priority <= 0 {
  636. panic("Channel default priority must be a positive integer")
  637. }
  638. return &Channel{
  639. conn: conn,
  640. desc: desc,
  641. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  642. recving: make([]byte, 0, desc.RecvBufferCapacity),
  643. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  644. }
  645. }
  646. func (ch *Channel) SetLogger(l log.Logger) {
  647. ch.Logger = l
  648. }
  649. // Queues message to send to this channel.
  650. // Goroutine-safe
  651. // Times out (and returns false) after defaultSendTimeout
  652. func (ch *Channel) sendBytes(bytes []byte) bool {
  653. select {
  654. case ch.sendQueue <- bytes:
  655. atomic.AddInt32(&ch.sendQueueSize, 1)
  656. return true
  657. case <-time.After(defaultSendTimeout):
  658. return false
  659. }
  660. }
  661. // Queues message to send to this channel.
  662. // Nonblocking, returns true if successful.
  663. // Goroutine-safe
  664. func (ch *Channel) trySendBytes(bytes []byte) bool {
  665. select {
  666. case ch.sendQueue <- bytes:
  667. atomic.AddInt32(&ch.sendQueueSize, 1)
  668. return true
  669. default:
  670. return false
  671. }
  672. }
  673. // Goroutine-safe
  674. func (ch *Channel) loadSendQueueSize() (size int) {
  675. return int(atomic.LoadInt32(&ch.sendQueueSize))
  676. }
  677. // Goroutine-safe
  678. // Use only as a heuristic.
  679. func (ch *Channel) canSend() bool {
  680. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  681. }
  682. // Returns true if any PacketMsgs are pending to be sent.
  683. // Call before calling nextPacketMsg()
  684. // Goroutine-safe
  685. func (ch *Channel) isSendPending() bool {
  686. if len(ch.sending) == 0 {
  687. if len(ch.sendQueue) == 0 {
  688. return false
  689. }
  690. ch.sending = <-ch.sendQueue
  691. }
  692. return true
  693. }
  694. // Creates a new PacketMsg to send.
  695. // Not goroutine-safe
  696. func (ch *Channel) nextPacketMsg() PacketMsg {
  697. packet := PacketMsg{}
  698. packet.ChannelID = ch.desc.ID
  699. maxSize := ch.maxPacketMsgPayloadSize
  700. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  701. if len(ch.sending) <= maxSize {
  702. packet.EOF = byte(0x01)
  703. ch.sending = nil
  704. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  705. } else {
  706. packet.EOF = byte(0x00)
  707. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  708. }
  709. return packet
  710. }
  711. // Writes next PacketMsg to w and updates c.recentlySent.
  712. // Not goroutine-safe
  713. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  714. var packet = ch.nextPacketMsg()
  715. n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
  716. atomic.AddInt64(&ch.recentlySent, n)
  717. return
  718. }
  719. // Handles incoming PacketMsgs. It returns a message bytes if message is
  720. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  721. // Not goroutine-safe
  722. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  723. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  724. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  725. if recvCap < recvReceived {
  726. return nil, fmt.Errorf("Received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  727. }
  728. ch.recving = append(ch.recving, packet.Bytes...)
  729. if packet.EOF == byte(0x01) {
  730. msgBytes := ch.recving
  731. // clear the slice without re-allocating.
  732. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  733. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  734. // at which point the recving slice stops being used and should be garbage collected
  735. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  736. return msgBytes, nil
  737. }
  738. return nil, nil
  739. }
  740. // Call this periodically to update stats for throttling purposes.
  741. // Not goroutine-safe
  742. func (ch *Channel) updateStats() {
  743. // Exponential decay of stats.
  744. // TODO: optimize.
  745. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
  746. }
  747. //----------------------------------------
  748. // Packet
// Packet is the interface implemented by every packet type exchanged over an
// MConnection (PacketPing, PacketPong and PacketMsg).
type Packet interface {
	// AssertIsPacket is a marker method with no behavior.
	AssertIsPacket()
}
  752. func RegisterPacket(cdc *amino.Codec) {
  753. cdc.RegisterInterface((*Packet)(nil), nil)
  754. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  755. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  756. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  757. }
  758. func (_ PacketPing) AssertIsPacket() {}
  759. func (_ PacketPong) AssertIsPacket() {}
  760. func (_ PacketMsg) AssertIsPacket() {}
// PacketPing is the keep-alive packet sendRoutine writes on every ping tick.
// It carries no data.
type PacketPing struct {
}
// PacketPong is the packet sendRoutine writes in response to a received
// PacketPing. It carries no data.
type PacketPong struct {
}
// PacketMsg carries one chunk of a message on a given channel.
type PacketMsg struct {
	// ID of the channel the chunk belongs to.
	ChannelID byte
	EOF byte // 1 means message ends here.
	// The chunk's payload.
	Bytes []byte
}
  770. func (mp PacketMsg) String() string {
  771. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  772. }