You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

787 lines
21 KiB

9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
7 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package conn
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "reflect"
  10. "sync/atomic"
  11. "time"
  12. amino "github.com/tendermint/go-amino"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. flow "github.com/tendermint/tmlibs/flowrate"
  15. "github.com/tendermint/tmlibs/log"
  16. )
  17. const (
  18. maxPacketMsgPayloadSizeDefault = 1024 // NOTE: Must be below 16,384 bytes for 14 below.
  19. maxPacketMsgOverheadSize = 14 // NOTE: See connection_test for derivation.
  20. numBatchPacketMsgs = 10
  21. minReadBufferSize = 1024
  22. minWriteBufferSize = 65536
  23. updateStats = 2 * time.Second
  24. // some of these defaults are written in the user config
  25. // flushThrottle, sendRate, recvRate
  26. // TODO: remove values present in config
  27. defaultFlushThrottle = 100 * time.Millisecond
  28. defaultSendQueueCapacity = 1
  29. defaultRecvBufferCapacity = 4096
  30. defaultRecvMessageCapacity = 22020096 // 21MB
  31. defaultSendRate = int64(512000) // 500KB/s
  32. defaultRecvRate = int64(512000) // 500KB/s
  33. defaultSendTimeout = 10 * time.Second
  34. defaultPingInterval = 60 * time.Second
  35. defaultPongTimeout = 45 * time.Second
  36. )
  37. type receiveCbFunc func(chID byte, msgBytes []byte)
  38. type errorCbFunc func(interface{})
  39. /*
  40. Each peer has one `MConnection` (multiplex connection) instance.
  41. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  42. several messages along a single channel of communication.
  43. Each `MConnection` handles message transmission on multiple abstract communication
  44. `Channel`s. Each channel has a globally unique byte id.
  45. The byte id and the relative priorities of each `Channel` are configured upon
  46. initialization of the connection.
  47. There are two methods for sending messages:
  48. func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
  49. func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
  50. `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
  51. successfully queued for the channel with the given id byte `chID`, or until the
  52. request times out. The message `msg` is serialized using Go-Amino.
  53. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
  54. channel's queue is full.
  55. Inbound message bytes are handled with an onReceive callback function.
  56. */
  57. type MConnection struct {
  58. cmn.BaseService
  59. conn net.Conn
  60. bufConnReader *bufio.Reader
  61. bufConnWriter *bufio.Writer
  62. sendMonitor *flow.Monitor
  63. recvMonitor *flow.Monitor
  64. send chan struct{}
  65. pong chan struct{}
  66. channels []*Channel
  67. channelsIdx map[byte]*Channel
  68. onReceive receiveCbFunc
  69. onError errorCbFunc
  70. errored uint32
  71. config *MConnConfig
  72. quit chan struct{}
  73. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  74. pingTimer *cmn.RepeatTimer // send pings periodically
  75. // close conn if pong is not received in pongTimeout
  76. pongTimer *time.Timer
  77. pongTimeoutCh chan bool // true - timeout, false - peer sent pong
  78. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  79. created time.Time // time of creation
  80. }
  81. // MConnConfig is a MConnection configuration.
  82. type MConnConfig struct {
  83. SendRate int64 `mapstructure:"send_rate"`
  84. RecvRate int64 `mapstructure:"recv_rate"`
  85. // Maximum payload size
  86. MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
  87. // Interval to flush writes (throttled)
  88. FlushThrottle time.Duration `mapstructure:"flush_throttle"`
  89. // Interval to send pings
  90. PingInterval time.Duration `mapstructure:"ping_interval"`
  91. // Maximum wait time for pongs
  92. PongTimeout time.Duration `mapstructure:"pong_timeout"`
  93. }
  94. func (cfg *MConnConfig) maxPacketMsgTotalSize() int {
  95. return cfg.MaxPacketMsgPayloadSize + maxPacketMsgOverheadSize
  96. }
  97. // DefaultMConnConfig returns the default config.
  98. func DefaultMConnConfig() *MConnConfig {
  99. return &MConnConfig{
  100. SendRate: defaultSendRate,
  101. RecvRate: defaultRecvRate,
  102. MaxPacketMsgPayloadSize: maxPacketMsgPayloadSizeDefault,
  103. FlushThrottle: defaultFlushThrottle,
  104. PingInterval: defaultPingInterval,
  105. PongTimeout: defaultPongTimeout,
  106. }
  107. }
  108. // NewMConnection wraps net.Conn and creates multiplex connection
  109. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  110. return NewMConnectionWithConfig(
  111. conn,
  112. chDescs,
  113. onReceive,
  114. onError,
  115. DefaultMConnConfig())
  116. }
  117. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  118. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
  119. if config.PongTimeout >= config.PingInterval {
  120. panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
  121. }
  122. mconn := &MConnection{
  123. conn: conn,
  124. bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
  125. bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  126. sendMonitor: flow.New(0, 0),
  127. recvMonitor: flow.New(0, 0),
  128. send: make(chan struct{}, 1),
  129. pong: make(chan struct{}, 1),
  130. onReceive: onReceive,
  131. onError: onError,
  132. config: config,
  133. }
  134. // Create channels
  135. var channelsIdx = map[byte]*Channel{}
  136. var channels = []*Channel{}
  137. for _, desc := range chDescs {
  138. channel := newChannel(mconn, *desc)
  139. channelsIdx[channel.desc.ID] = channel
  140. channels = append(channels, channel)
  141. }
  142. mconn.channels = channels
  143. mconn.channelsIdx = channelsIdx
  144. mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
  145. return mconn
  146. }
  147. func (c *MConnection) SetLogger(l log.Logger) {
  148. c.BaseService.SetLogger(l)
  149. for _, ch := range c.channels {
  150. ch.SetLogger(l)
  151. }
  152. }
  153. // OnStart implements BaseService
  154. func (c *MConnection) OnStart() error {
  155. if err := c.BaseService.OnStart(); err != nil {
  156. return err
  157. }
  158. c.quit = make(chan struct{})
  159. c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
  160. c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
  161. c.pongTimeoutCh = make(chan bool, 1)
  162. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
  163. go c.sendRoutine()
  164. go c.recvRoutine()
  165. return nil
  166. }
  167. // OnStop implements BaseService
  168. func (c *MConnection) OnStop() {
  169. c.BaseService.OnStop()
  170. c.flushTimer.Stop()
  171. c.pingTimer.Stop()
  172. c.chStatsTimer.Stop()
  173. if c.quit != nil {
  174. close(c.quit)
  175. }
  176. c.conn.Close() // nolint: errcheck
  177. // We can't close pong safely here because
  178. // recvRoutine may write to it after we've stopped.
  179. // Though it doesn't need to get closed at all,
  180. // we close it @ recvRoutine.
  181. }
  182. func (c *MConnection) String() string {
  183. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  184. }
  185. func (c *MConnection) flush() {
  186. c.Logger.Debug("Flush", "conn", c)
  187. err := c.bufConnWriter.Flush()
  188. if err != nil {
  189. c.Logger.Error("MConnection flush failed", "err", err)
  190. }
  191. }
  192. // Catch panics, usually caused by remote disconnects.
  193. func (c *MConnection) _recover() {
  194. if r := recover(); r != nil {
  195. err := cmn.ErrorWrap(r, "recovered panic in MConnection")
  196. c.stopForError(err)
  197. }
  198. }
  199. func (c *MConnection) stopForError(r interface{}) {
  200. c.Stop()
  201. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  202. if c.onError != nil {
  203. c.onError(r)
  204. }
  205. }
  206. }
  207. // Queues a message to be sent to channel.
  208. func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
  209. if !c.IsRunning() {
  210. return false
  211. }
  212. c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  213. // Send message to channel.
  214. channel, ok := c.channelsIdx[chID]
  215. if !ok {
  216. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  217. return false
  218. }
  219. success := channel.sendBytes(msgBytes)
  220. if success {
  221. // Wake up sendRoutine if necessary
  222. select {
  223. case c.send <- struct{}{}:
  224. default:
  225. }
  226. } else {
  227. c.Logger.Error("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  228. }
  229. return success
  230. }
  231. // Queues a message to be sent to channel.
  232. // Nonblocking, returns true if successful.
  233. func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
  234. if !c.IsRunning() {
  235. return false
  236. }
  237. c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
  238. // Send message to channel.
  239. channel, ok := c.channelsIdx[chID]
  240. if !ok {
  241. c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  242. return false
  243. }
  244. ok = channel.trySendBytes(msgBytes)
  245. if ok {
  246. // Wake up sendRoutine if necessary
  247. select {
  248. case c.send <- struct{}{}:
  249. default:
  250. }
  251. }
  252. return ok
  253. }
  254. // CanSend returns true if you can send more data onto the chID, false
  255. // otherwise. Use only as a heuristic.
  256. func (c *MConnection) CanSend(chID byte) bool {
  257. if !c.IsRunning() {
  258. return false
  259. }
  260. channel, ok := c.channelsIdx[chID]
  261. if !ok {
  262. c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
  263. return false
  264. }
  265. return channel.canSend()
  266. }
  267. // sendRoutine polls for packets to send from channels.
  268. func (c *MConnection) sendRoutine() {
  269. defer c._recover()
  270. FOR_LOOP:
  271. for {
  272. var _n int64
  273. var err error
  274. SELECTION:
  275. select {
  276. case <-c.flushTimer.Ch:
  277. // NOTE: flushTimer.Set() must be called every time
  278. // something is written to .bufConnWriter.
  279. c.flush()
  280. case <-c.chStatsTimer.Chan():
  281. for _, channel := range c.channels {
  282. channel.updateStats()
  283. }
  284. case <-c.pingTimer.Chan():
  285. c.Logger.Debug("Send Ping")
  286. _n, err = cdc.MarshalBinaryWriter(c.bufConnWriter, PacketPing{})
  287. if err != nil {
  288. break SELECTION
  289. }
  290. c.sendMonitor.Update(int(_n))
  291. c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
  292. c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
  293. select {
  294. case c.pongTimeoutCh <- true:
  295. default:
  296. }
  297. })
  298. c.flush()
  299. case timeout := <-c.pongTimeoutCh:
  300. if timeout {
  301. c.Logger.Debug("Pong timeout")
  302. err = errors.New("pong timeout")
  303. } else {
  304. c.stopPongTimer()
  305. }
  306. case <-c.pong:
  307. c.Logger.Debug("Send Pong")
  308. _n, err = cdc.MarshalBinaryWriter(c.bufConnWriter, PacketPong{})
  309. if err != nil {
  310. break SELECTION
  311. }
  312. c.sendMonitor.Update(int(_n))
  313. c.flush()
  314. case <-c.quit:
  315. break FOR_LOOP
  316. case <-c.send:
  317. // Send some PacketMsgs
  318. eof := c.sendSomePacketMsgs()
  319. if !eof {
  320. // Keep sendRoutine awake.
  321. select {
  322. case c.send <- struct{}{}:
  323. default:
  324. }
  325. }
  326. }
  327. if !c.IsRunning() {
  328. break FOR_LOOP
  329. }
  330. if err != nil {
  331. c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
  332. c.stopForError(err)
  333. break FOR_LOOP
  334. }
  335. }
  336. // Cleanup
  337. c.stopPongTimer()
  338. }
  339. // Returns true if messages from channels were exhausted.
  340. // Blocks in accordance to .sendMonitor throttling.
  341. func (c *MConnection) sendSomePacketMsgs() bool {
  342. // Block until .sendMonitor says we can write.
  343. // Once we're ready we send more than we asked for,
  344. // but amortized it should even out.
  345. c.sendMonitor.Limit(c.config.maxPacketMsgTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
  346. // Now send some PacketMsgs.
  347. for i := 0; i < numBatchPacketMsgs; i++ {
  348. if c.sendPacketMsg() {
  349. return true
  350. }
  351. }
  352. return false
  353. }
  354. // Returns true if messages from channels were exhausted.
  355. func (c *MConnection) sendPacketMsg() bool {
  356. // Choose a channel to create a PacketMsg from.
  357. // The chosen channel will be the one whose recentlySent/priority is the least.
  358. var leastRatio float32 = math.MaxFloat32
  359. var leastChannel *Channel
  360. for _, channel := range c.channels {
  361. // If nothing to send, skip this channel
  362. if !channel.isSendPending() {
  363. continue
  364. }
  365. // Get ratio, and keep track of lowest ratio.
  366. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
  367. if ratio < leastRatio {
  368. leastRatio = ratio
  369. leastChannel = channel
  370. }
  371. }
  372. // Nothing to send?
  373. if leastChannel == nil {
  374. return true
  375. } else {
  376. // c.Logger.Info("Found a PacketMsg to send")
  377. }
  378. // Make & send a PacketMsg from this channel
  379. _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
  380. if err != nil {
  381. c.Logger.Error("Failed to write PacketMsg", "err", err)
  382. c.stopForError(err)
  383. return true
  384. }
  385. c.sendMonitor.Update(int(_n))
  386. c.flushTimer.Set()
  387. return false
  388. }
  389. // recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
  390. // After a whole message has been assembled, it's pushed to onReceive().
  391. // Blocks depending on how the connection is throttled.
  392. // Otherwise, it never blocks.
  393. func (c *MConnection) recvRoutine() {
  394. defer c._recover()
  395. FOR_LOOP:
  396. for {
  397. // Block until .recvMonitor says we can read.
  398. c.recvMonitor.Limit(c.config.maxPacketMsgTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
  399. // Peek into bufConnReader for debugging
  400. /*
  401. if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
  402. bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
  403. if err == nil {
  404. // return
  405. } else {
  406. c.Logger.Debug("Error peeking connection buffer", "err", err)
  407. // return nil
  408. }
  409. c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
  410. }
  411. */
  412. // Read packet type
  413. var packet Packet
  414. var _n int64
  415. var err error
  416. _n, err = cdc.UnmarshalBinaryReader(c.bufConnReader, &packet, int64(c.config.maxPacketMsgTotalSize()))
  417. c.recvMonitor.Update(int(_n))
  418. if err != nil {
  419. if c.IsRunning() {
  420. c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
  421. c.stopForError(err)
  422. }
  423. break FOR_LOOP
  424. }
  425. // Read more depending on packet type.
  426. switch pkt := packet.(type) {
  427. case PacketPing:
  428. // TODO: prevent abuse, as they cause flush()'s.
  429. // https://github.com/tendermint/tendermint/issues/1190
  430. c.Logger.Debug("Receive Ping")
  431. select {
  432. case c.pong <- struct{}{}:
  433. default:
  434. // never block
  435. }
  436. case PacketPong:
  437. c.Logger.Debug("Receive Pong")
  438. select {
  439. case c.pongTimeoutCh <- false:
  440. default:
  441. // never block
  442. }
  443. case PacketMsg:
  444. channel, ok := c.channelsIdx[pkt.ChannelID]
  445. if !ok || channel == nil {
  446. err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
  447. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  448. c.stopForError(err)
  449. break FOR_LOOP
  450. }
  451. msgBytes, err := channel.recvPacketMsg(pkt)
  452. if err != nil {
  453. if c.IsRunning() {
  454. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  455. c.stopForError(err)
  456. }
  457. break FOR_LOOP
  458. }
  459. if msgBytes != nil {
  460. c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  461. // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
  462. c.onReceive(pkt.ChannelID, msgBytes)
  463. }
  464. default:
  465. err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
  466. c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
  467. c.stopForError(err)
  468. break FOR_LOOP
  469. }
  470. }
  471. // Cleanup
  472. close(c.pong)
  473. for range c.pong {
  474. // Drain
  475. }
  476. }
  477. // not goroutine-safe
  478. func (c *MConnection) stopPongTimer() {
  479. if c.pongTimer != nil {
  480. if !c.pongTimer.Stop() {
  481. <-c.pongTimer.C
  482. }
  483. c.pongTimer = nil
  484. }
  485. }
  486. type ConnectionStatus struct {
  487. Duration time.Duration
  488. SendMonitor flow.Status
  489. RecvMonitor flow.Status
  490. Channels []ChannelStatus
  491. }
  492. type ChannelStatus struct {
  493. ID byte
  494. SendQueueCapacity int
  495. SendQueueSize int
  496. Priority int
  497. RecentlySent int64
  498. }
  499. func (c *MConnection) Status() ConnectionStatus {
  500. var status ConnectionStatus
  501. status.Duration = time.Since(c.created)
  502. status.SendMonitor = c.sendMonitor.Status()
  503. status.RecvMonitor = c.recvMonitor.Status()
  504. status.Channels = make([]ChannelStatus, len(c.channels))
  505. for i, channel := range c.channels {
  506. status.Channels[i] = ChannelStatus{
  507. ID: channel.desc.ID,
  508. SendQueueCapacity: cap(channel.sendQueue),
  509. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  510. Priority: channel.desc.Priority,
  511. RecentlySent: channel.recentlySent,
  512. }
  513. }
  514. return status
  515. }
  516. //-----------------------------------------------------------------------------
  517. type ChannelDescriptor struct {
  518. ID byte
  519. Priority int
  520. SendQueueCapacity int
  521. RecvBufferCapacity int
  522. RecvMessageCapacity int
  523. }
  524. func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
  525. if chDesc.SendQueueCapacity == 0 {
  526. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  527. }
  528. if chDesc.RecvBufferCapacity == 0 {
  529. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  530. }
  531. if chDesc.RecvMessageCapacity == 0 {
  532. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  533. }
  534. filled = chDesc
  535. return
  536. }
  537. // TODO: lowercase.
  538. // NOTE: not goroutine-safe.
  539. type Channel struct {
  540. conn *MConnection
  541. desc ChannelDescriptor
  542. sendQueue chan []byte
  543. sendQueueSize int32 // atomic.
  544. recving []byte
  545. sending []byte
  546. recentlySent int64 // exponential moving average
  547. maxPacketMsgPayloadSize int
  548. Logger log.Logger
  549. }
  550. func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
  551. desc = desc.FillDefaults()
  552. if desc.Priority <= 0 {
  553. cmn.PanicSanity("Channel default priority must be a positive integer")
  554. }
  555. return &Channel{
  556. conn: conn,
  557. desc: desc,
  558. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  559. recving: make([]byte, 0, desc.RecvBufferCapacity),
  560. maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
  561. }
  562. }
  563. func (ch *Channel) SetLogger(l log.Logger) {
  564. ch.Logger = l
  565. }
  566. // Queues message to send to this channel.
  567. // Goroutine-safe
  568. // Times out (and returns false) after defaultSendTimeout
  569. func (ch *Channel) sendBytes(bytes []byte) bool {
  570. select {
  571. case ch.sendQueue <- bytes:
  572. atomic.AddInt32(&ch.sendQueueSize, 1)
  573. return true
  574. case <-time.After(defaultSendTimeout):
  575. return false
  576. }
  577. }
  578. // Queues message to send to this channel.
  579. // Nonblocking, returns true if successful.
  580. // Goroutine-safe
  581. func (ch *Channel) trySendBytes(bytes []byte) bool {
  582. select {
  583. case ch.sendQueue <- bytes:
  584. atomic.AddInt32(&ch.sendQueueSize, 1)
  585. return true
  586. default:
  587. return false
  588. }
  589. }
  590. // Goroutine-safe
  591. func (ch *Channel) loadSendQueueSize() (size int) {
  592. return int(atomic.LoadInt32(&ch.sendQueueSize))
  593. }
  594. // Goroutine-safe
  595. // Use only as a heuristic.
  596. func (ch *Channel) canSend() bool {
  597. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  598. }
  599. // Returns true if any PacketMsgs are pending to be sent.
  600. // Call before calling nextPacketMsg()
  601. // Goroutine-safe
  602. func (ch *Channel) isSendPending() bool {
  603. if len(ch.sending) == 0 {
  604. if len(ch.sendQueue) == 0 {
  605. return false
  606. }
  607. ch.sending = <-ch.sendQueue
  608. }
  609. return true
  610. }
  611. // Creates a new PacketMsg to send.
  612. // Not goroutine-safe
  613. func (ch *Channel) nextPacketMsg() PacketMsg {
  614. packet := PacketMsg{}
  615. packet.ChannelID = byte(ch.desc.ID)
  616. maxSize := ch.maxPacketMsgPayloadSize
  617. packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
  618. if len(ch.sending) <= maxSize {
  619. packet.EOF = byte(0x01)
  620. ch.sending = nil
  621. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  622. } else {
  623. packet.EOF = byte(0x00)
  624. ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
  625. }
  626. return packet
  627. }
  628. // Writes next PacketMsg to w and updates c.recentlySent.
  629. // Not goroutine-safe
  630. func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
  631. var packet = ch.nextPacketMsg()
  632. n, err = cdc.MarshalBinaryWriter(w, packet)
  633. ch.recentlySent += n
  634. return
  635. }
  636. // Handles incoming PacketMsgs. It returns a message bytes if message is
  637. // complete. NOTE message bytes may change on next call to recvPacketMsg.
  638. // Not goroutine-safe
  639. func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
  640. ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
  641. var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
  642. if recvCap < recvReceived {
  643. return nil, fmt.Errorf("Received message exceeds available capacity: %v < %v", recvCap, recvReceived)
  644. }
  645. ch.recving = append(ch.recving, packet.Bytes...)
  646. if packet.EOF == byte(0x01) {
  647. msgBytes := ch.recving
  648. // clear the slice without re-allocating.
  649. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  650. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  651. // at which point the recving slice stops being used and should be garbage collected
  652. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  653. return msgBytes, nil
  654. }
  655. return nil, nil
  656. }
  657. // Call this periodically to update stats for throttling purposes.
  658. // Not goroutine-safe
  659. func (ch *Channel) updateStats() {
  660. // Exponential decay of stats.
  661. // TODO: optimize.
  662. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  663. }
  664. //----------------------------------------
  665. // Packet
  666. type Packet interface {
  667. AssertIsPacket()
  668. }
  669. func RegisterPacket(cdc *amino.Codec) {
  670. cdc.RegisterInterface((*Packet)(nil), nil)
  671. cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
  672. cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
  673. cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
  674. }
  675. func (_ PacketPing) AssertIsPacket() {}
  676. func (_ PacketPong) AssertIsPacket() {}
  677. func (_ PacketMsg) AssertIsPacket() {}
  678. type PacketPing struct {
  679. }
  680. type PacketPong struct {
  681. }
  682. type PacketMsg struct {
  683. ChannelID byte
  684. EOF byte // 1 means message ends here.
  685. Bytes []byte
  686. }
  687. func (mp PacketMsg) String() string {
  688. return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
  689. }