p2p: implement new Transport interface (#5791)

This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog.

The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack.

The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely.

There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile.
package conn

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"reflect"
	"runtime/debug"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/internal/libs/flowrate"
	"github.com/tendermint/tendermint/internal/libs/protoio"
	"github.com/tendermint/tendermint/internal/libs/timer"
	"github.com/tendermint/tendermint/libs/log"
	tmmath "github.com/tendermint/tendermint/libs/math"
	"github.com/tendermint/tendermint/libs/service"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

const (
	// mirrors MaxPacketMsgPayloadSize from config/config.go
	defaultMaxPacketMsgPayloadSize = 1400

	numBatchPacketMsgs = 10
	minReadBufferSize  = 1024
	minWriteBufferSize = 65536
	updateStats        = 2 * time.Second

	// some of these defaults are written in the user config
	// flushThrottle, sendRate, recvRate
	// TODO: remove values present in config
	defaultFlushThrottle = 100 * time.Millisecond

	defaultSendQueueCapacity   = 1
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096      // 21MB
	defaultSendRate            = int64(512000) // 500KB/s
	defaultRecvRate            = int64(512000) // 500KB/s
	defaultSendTimeout         = 10 * time.Second
	defaultPingInterval        = 60 * time.Second
	defaultPongTimeout         = 90 * time.Second
)

type receiveCbFunc func(ctx context.Context, chID ChannelID, msgBytes []byte)
type errorCbFunc func(context.Context, interface{})

/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

Messages are sent with a single method:

	func (c *MConnection) Send(chID ChannelID, msgBytes []byte) bool {}

`Send(chID, msgBytes)` is a blocking call that waits until `msgBytes` is
successfully queued for the channel with the given id `chID`, or until the
request times out. The queued bytes are split into `PacketMsg` packets, which
are serialized using Protobuf.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	service.BaseService
	logger log.Logger

	conn          net.Conn
	bufConnReader *bufio.Reader
	bufConnWriter *bufio.Writer
	sendMonitor   *flowrate.Monitor
	recvMonitor   *flowrate.Monitor
	send          chan struct{}
	pong          chan struct{}
	channels      []*channel
	channelsIdx   map[ChannelID]*channel
	onReceive     receiveCbFunc
	onError       errorCbFunc
	errored       uint32
	config        MConnConfig

	// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
	// doneSendRoutine is closed when the sendRoutine actually quits.
	quitSendRoutine chan struct{}
	doneSendRoutine chan struct{}

	// Closing quitRecvRoutine will cause the recvRoutine to eventually quit.
	quitRecvRoutine chan struct{}

	// used to ensure FlushStop and OnStop
	// are safe to call concurrently.
	stopMtx sync.Mutex

	cancel context.CancelFunc

	flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer  *time.Ticker         // send pings periodically

	// close conn if pong is not received in pongTimeout
	lastMsgRecv struct {
		sync.Mutex
		at time.Time
	}

	chStatsTimer *time.Ticker // update channel stats periodically

	created time.Time // time of creation

	_maxPacketMsgSize int
}

// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// Maximum payload size
	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

	// Interval to flush writes (throttled)
	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

	// Interval to send pings
	PingInterval time.Duration `mapstructure:"ping_interval"`

	// Maximum wait time for pongs
	PongTimeout time.Duration `mapstructure:"pong_timeout"`

	// Process/Transport Start time
	StartTime time.Time `mapstructure:",omitempty"`
}

// DefaultMConnConfig returns the default config.
func DefaultMConnConfig() MConnConfig {
	return MConnConfig{
		SendRate:                defaultSendRate,
		RecvRate:                defaultRecvRate,
		MaxPacketMsgPayloadSize: defaultMaxPacketMsgPayloadSize,
		FlushThrottle:           defaultFlushThrottle,
		PingInterval:            defaultPingInterval,
		PongTimeout:             defaultPongTimeout,
		StartTime:               time.Now(),
	}
}

// NewMConnection wraps net.Conn and creates multiplex connection with a config
func NewMConnection(
	logger log.Logger,
	conn net.Conn,
	chDescs []*ChannelDescriptor,
	onReceive receiveCbFunc,
	onError errorCbFunc,
	config MConnConfig,
) *MConnection {
	mconn := &MConnection{
		logger:        logger,
		conn:          conn,
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
		sendMonitor:   flowrate.New(config.StartTime, 0, 0),
		recvMonitor:   flowrate.New(config.StartTime, 0, 0),
		send:          make(chan struct{}, 1),
		pong:          make(chan struct{}, 1),
		onReceive:     onReceive,
		onError:       onError,
		config:        config,
		created:       time.Now(),
		cancel:        func() {},
	}

	mconn.BaseService = *service.NewBaseService(logger, "MConnection", mconn)

	// Create channels
	var channelsIdx = map[ChannelID]*channel{}
	var channels = []*channel{}

	for _, desc := range chDescs {
		channel := newChannel(mconn, *desc)
		channelsIdx[channel.desc.ID] = channel
		channels = append(channels, channel)
	}
	mconn.channels = channels
	mconn.channelsIdx = channelsIdx

	// maxPacketMsgSize() is a bit heavy, so call just once
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

	return mconn
}

// OnStart implements BaseService
func (c *MConnection) OnStart(ctx context.Context) error {
	c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
	c.pingTimer = time.NewTicker(c.config.PingInterval)
	c.chStatsTimer = time.NewTicker(updateStats)
	c.quitSendRoutine = make(chan struct{})
	c.doneSendRoutine = make(chan struct{})
	c.quitRecvRoutine = make(chan struct{})
	c.setRecvLastMsgAt(time.Now())
	go c.sendRoutine(ctx)
	go c.recvRoutine(ctx)
	return nil
}

func (c *MConnection) setRecvLastMsgAt(t time.Time) {
	c.lastMsgRecv.Lock()
	defer c.lastMsgRecv.Unlock()
	c.lastMsgRecv.at = t
}

func (c *MConnection) getLastMessageAt() time.Time {
	c.lastMsgRecv.Lock()
	defer c.lastMsgRecv.Unlock()
	return c.lastMsgRecv.at
}

// stopServices stops the BaseService and timers and closes the quitSendRoutine.
// If the quitSendRoutine was already closed, it returns true, otherwise it returns false.
// It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
func (c *MConnection) stopServices() (alreadyStopped bool) {
	c.stopMtx.Lock()
	defer c.stopMtx.Unlock()

	select {
	case <-c.quitSendRoutine:
		// already quit
		return true
	default:
	}

	select {
	case <-c.quitRecvRoutine:
		// already quit
		return true
	default:
	}

	c.flushTimer.Stop()
	c.pingTimer.Stop()
	c.chStatsTimer.Stop()

	// inform the recvRoutine that we are shutting down
	close(c.quitRecvRoutine)
	close(c.quitSendRoutine)
	return false
}

// OnStop implements BaseService
func (c *MConnection) OnStop() {
	if c.stopServices() {
		return
	}

	c.conn.Close()

	// We can't close pong safely here because
	// recvRoutine may write to it after we've stopped.
	// Though it doesn't need to get closed at all,
	// we close it @ recvRoutine.
}

func (c *MConnection) String() string {
	return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
}

func (c *MConnection) flush() {
	c.logger.Debug("Flush", "conn", c)
	err := c.bufConnWriter.Flush()
	if err != nil {
		c.logger.Debug("MConnection flush failed", "err", err)
	}
}

// Catch panics, usually caused by remote disconnects.
func (c *MConnection) _recover(ctx context.Context) {
	if r := recover(); r != nil {
		c.logger.Error("MConnection panicked", "err", r, "stack", string(debug.Stack()))
		c.stopForError(ctx, fmt.Errorf("recovered from panic: %v", r))
	}
}

func (c *MConnection) stopForError(ctx context.Context, r interface{}) {
	c.Stop()

	if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
		if c.onError != nil {
			c.onError(ctx, r)
		}
	}
}

// Send queues a message to be sent on the channel with the given ID.
// It returns false if the connection is not running, the channel is
// unknown, or queueing the message times out.
func (c *MConnection) Send(chID ChannelID, msgBytes []byte) bool {
	if !c.IsRunning() {
		return false
	}

	c.logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", msgBytes)

	// Send message to channel.
	channel, ok := c.channelsIdx[chID]
	if !ok {
		c.logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
		return false
	}

	success := channel.sendBytes(msgBytes)
	if success {
		// Wake up sendRoutine if necessary
		select {
		case c.send <- struct{}{}:
		default:
		}
	} else {
		c.logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", msgBytes)
	}
	return success
}
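
/*
The wake-up step in Send relies on a channel with capacity 1 plus a
non-blocking send, so any number of Send calls leave at most one pending
signal for the send routine. A minimal, standalone sketch of that pattern
follows; the `notify` helper and its bool result are hypothetical and exist
only for illustration, they are not part of this package.

```go
package main

import "fmt"

// notify performs a non-blocking send on a wake-up channel.
// It reports whether a new signal was queued; false means a
// signal was already pending and this one was coalesced into it.
func notify(wake chan struct{}) bool {
	select {
	case wake <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	// Capacity 1 mirrors `send: make(chan struct{}, 1)` in NewMConnection.
	wake := make(chan struct{}, 1)

	fmt.Println(notify(wake)) // true: buffer was empty
	fmt.Println(notify(wake)) // false: a signal is already pending

	<-wake // the consumer drains the signal

	fmt.Println(notify(wake)) // true: buffer is empty again
}
```

Because the sender never blocks on the signal, a slow consumer cannot stall
callers; it only sees that "there is work", not how many times it was told.
*/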

// sendRoutine polls for packets to send from channels.
func (c *MConnection) sendRoutine(ctx context.Context) {
	defer c._recover(ctx)
	protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)

	pongTimeout := time.NewTicker(c.config.PongTimeout)
	defer pongTimeout.Stop()
FOR_LOOP:
	for {
		var _n int
		var err error
	SELECTION:
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufConnWriter.
			c.flush()
		case <-c.chStatsTimer.C:
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.C:
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
			if err != nil {
				c.logger.Error("Failed to send PacketPing", "err", err)
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.flush()
		case <-c.pong:
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
			if err != nil {
				c.logger.Error("Failed to send PacketPong", "err", err)
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.flush()
		case <-ctx.Done():
			break FOR_LOOP
		case <-c.quitSendRoutine:
			break FOR_LOOP
		case <-pongTimeout.C:
			// the point of the pong timer is to check to
			// see if we've seen a message recently, so we
			// want to make sure that we escape this
			// select statement on an interval to ensure
			// that we avoid hanging on to dead
			// connections for too long.
			break SELECTION
		case <-c.send:
			// Send some PacketMsgs
			eof := c.sendSomePacketMsgs(ctx)
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if time.Since(c.getLastMessageAt()) > c.config.PongTimeout {
			err = errors.New("pong timeout")
		}

		if err != nil {
			c.logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(ctx, err)
			break FOR_LOOP
		}
		if !c.IsRunning() {
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.doneSendRoutine)
}

// Returns true if messages from channels were exhausted.
// Blocks in accordance with .sendMonitor throttling.
func (c *MConnection) sendSomePacketMsgs(ctx context.Context) bool {
	// Block until .sendMonitor says we can write.
	// Once we're ready we send more than we asked for,
	// but amortized it should even out.
	c.sendMonitor.Limit(c._maxPacketMsgSize, c.config.SendRate, true)

	// Now send some PacketMsgs.
	for i := 0; i < numBatchPacketMsgs; i++ {
		if c.sendPacketMsg(ctx) {
			return true
		}
	}
	return false
}

// Returns true if messages from channels were exhausted.
func (c *MConnection) sendPacketMsg(ctx context.Context) bool {
	// Choose a channel to create a PacketMsg from.
	// The chosen channel will be the one whose recentlySent/priority is the least.
	var leastRatio float32 = math.MaxFloat32
	var leastChannel *channel
	for _, channel := range c.channels {
		// If nothing to send, skip this channel
		if !channel.isSendPending() {
			continue
		}
		// Get ratio, and keep track of lowest ratio.
		ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
		if ratio < leastRatio {
			leastRatio = ratio
			leastChannel = channel
		}
	}

	// Nothing to send?
	if leastChannel == nil {
		return true
	}
	// c.logger.Info("Found a msgPacket to send")

	// Make & send a PacketMsg from this channel
	_n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
	if err != nil {
		c.logger.Error("Failed to write PacketMsg", "err", err)
		c.stopForError(ctx, err)
		return true
	}
	c.sendMonitor.Update(_n)
	c.flushTimer.Set()
	return false
}
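
/*
The channel choice in sendPacketMsg can be tried in isolation. The sketch
below reproduces only the "lowest recentlySent/priority wins" rule; the
`chanState` type and `pickLeast` function are hypothetical stand-ins for the
fields that sendPacketMsg reads, not part of this package.

```go
package main

import (
	"fmt"
	"math"
)

// chanState is a hypothetical stand-in for the per-channel state
// consulted when choosing which channel sends next.
type chanState struct {
	name         string
	recentlySent int64 // recently sent bytes (decayed over time)
	priority     int   // must be positive
	pending      bool  // whether the channel has data to send
}

// pickLeast returns the index of the pending channel with the lowest
// recentlySent/priority ratio, or -1 if no channel has pending data.
func pickLeast(chs []chanState) int {
	var leastRatio float32 = math.MaxFloat32
	least := -1
	for i, ch := range chs {
		if !ch.pending {
			continue
		}
		ratio := float32(ch.recentlySent) / float32(ch.priority)
		if ratio < leastRatio {
			leastRatio = ratio
			least = i
		}
	}
	return least
}

func main() {
	chs := []chanState{
		{name: "a", recentlySent: 1000, priority: 1, pending: true}, // ratio 1000
		{name: "b", recentlySent: 3000, priority: 5, pending: true}, // ratio 600
		{name: "c", recentlySent: 0, priority: 1, pending: false},   // skipped
	}
	fmt.Println(chs[pickLeast(chs)].name) // b
}
```

Channel "b" has sent three times as many bytes as "a", yet it is picked
first because its priority is five times higher; an idle channel is never
picked, however low its ratio.
*/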

// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine(ctx context.Context) {
	defer c._recover(ctx)

	protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)

FOR_LOOP:
	for {
		select {
		case <-ctx.Done():
			break FOR_LOOP
		case <-c.doneSendRoutine:
			break FOR_LOOP
		default:
		}

		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c._maxPacketMsgSize, c.config.RecvRate, true)

		// Peek into bufConnReader for debugging
		/*
			if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
				bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100))
				if err == nil {
					// return
				} else {
					c.logger.Debug("error peeking connection buffer", "err", err)
					// return nil
				}
				c.logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
			}
		*/

		// Read packet type
		var packet tmp2p.Packet

		_n, err := protoReader.ReadMsg(&packet)
		c.recvMonitor.Update(_n)
		if err != nil {
			// stopServices was invoked and we are shutting down
			// receiving is expected to fail since we will close the connection
			select {
			case <-ctx.Done():
			case <-c.quitRecvRoutine:
				break FOR_LOOP
			default:
			}

			if c.IsRunning() {
				if err == io.EOF {
					c.logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
				} else {
					c.logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				}
				c.stopForError(ctx, err)
			}
			break FOR_LOOP
		}

		// record for pong/heartbeat
		c.setRecvLastMsgAt(time.Now())

		// Read more depending on packet type.
		switch pkt := packet.Sum.(type) {
		case *tmp2p.Packet_PacketPing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}
		case *tmp2p.Packet_PacketPong:
			// do nothing, we updated the "last message
			// received" timestamp above, so we can ignore
			// this message
		case *tmp2p.Packet_PacketMsg:
			channelID := ChannelID(pkt.PacketMsg.ChannelID)
			channel, ok := c.channelsIdx[channelID]
			if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
				err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
				c.logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(ctx, err)
				break FOR_LOOP
			}

			msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
			if err != nil {
				if c.IsRunning() {
					c.logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(ctx, err)
				}
				break FOR_LOOP
			}
			if msgBytes != nil {
				c.logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
				// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
				c.onReceive(ctx, channelID, msgBytes)
			}
		default:
			err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
			c.logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(ctx, err)
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.pong)
	for range c.pong {
		// Drain
	}
}

// maxPacketMsgSize returns a maximum size of PacketMsg
func (c *MConnection) maxPacketMsgSize() int {
	bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
		ChannelID: 0x01,
		EOF:       true,
		Data:      make([]byte, c.config.MaxPacketMsgPayloadSize),
	}))
	if err != nil {
		panic(err)
	}
	return len(bz)
}

type ChannelStatus struct {
	ID                byte
	SendQueueCapacity int
	SendQueueSize     int
	Priority          int
	RecentlySent      int64
}

//-----------------------------------------------------------------------------

// ChannelID is an arbitrary channel ID.
type ChannelID uint16

type ChannelDescriptor struct {
	ID       ChannelID
	Priority int

	MessageType proto.Message

	// TODO: Remove once p2p refactor is complete.
	SendQueueCapacity   int
	RecvMessageCapacity int

	// RecvBufferCapacity defines the max buffer size of inbound messages for a
	// given p2p Channel queue.
	RecvBufferCapacity int
}

func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
	if chDesc.SendQueueCapacity == 0 {
		chDesc.SendQueueCapacity = defaultSendQueueCapacity
	}
	if chDesc.RecvBufferCapacity == 0 {
		chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
	}
	if chDesc.RecvMessageCapacity == 0 {
		chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
	}
	filled = chDesc
	return
}

// NOTE: not goroutine-safe.
type channel struct {
	// Exponential moving average.
	// This field must be accessed atomically.
	// It is first in the struct to ensure correct alignment.
	// See https://github.com/tendermint/tendermint/issues/7000.
	recentlySent int64

	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte
	sendQueueSize int32 // atomic.
	recving       []byte
	sending       []byte

	maxPacketMsgPayloadSize int

	logger log.Logger
}

func newChannel(conn *MConnection, desc ChannelDescriptor) *channel {
	desc = desc.FillDefaults()
	if desc.Priority <= 0 {
		panic("Channel default priority must be a positive integer")
	}
	return &channel{
		conn:                    conn,
		desc:                    desc,
		sendQueue:               make(chan []byte, desc.SendQueueCapacity),
		recving:                 make([]byte, 0, desc.RecvBufferCapacity),
		maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
		logger:                  conn.logger,
	}
}

// Queues message to send to this channel.
// Goroutine-safe
// Times out (and returns false) after defaultSendTimeout
func (ch *channel) sendBytes(bytes []byte) bool {
	select {
	case ch.sendQueue <- bytes:
		atomic.AddInt32(&ch.sendQueueSize, 1)
		return true
	case <-time.After(defaultSendTimeout):
		return false
	}
}

// Returns true if any PacketMsgs are pending to be sent.
// Call before calling nextPacketMsg()
// Not goroutine-safe: it may move the next queued message into ch.sending
// without synchronization, so call it only from the send routine.
func (ch *channel) isSendPending() bool {
	if len(ch.sending) == 0 {
		if len(ch.sendQueue) == 0 {
			return false
		}
		ch.sending = <-ch.sendQueue
	}
	return true
}

// Creates a new PacketMsg to send.
// Not goroutine-safe
func (ch *channel) nextPacketMsg() tmp2p.PacketMsg {
	packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
	maxSize := ch.maxPacketMsgPayloadSize
	packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
	if len(ch.sending) <= maxSize {
		packet.EOF = true
		ch.sending = nil
		atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
	} else {
		packet.EOF = false
		ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
	}
	return packet
}

// Writes next PacketMsg to w and updates ch.recentlySent.
// Not goroutine-safe
func (ch *channel) writePacketMsgTo(w io.Writer) (n int, err error) {
	packet := ch.nextPacketMsg()
	n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
	atomic.AddInt64(&ch.recentlySent, int64(n))
	return
}

// Handles incoming PacketMsgs. It returns the message bytes if the message
// is complete; they are owned by the caller and will not be modified.
// Not goroutine-safe
func (ch *channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
	ch.logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
	var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
	if recvCap < recvReceived {
		return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
	}
	ch.recving = append(ch.recving, packet.Data...)
	if packet.EOF {
		msgBytes := ch.recving
		ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)
		return msgBytes, nil
	}
	return nil, nil
}
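
/*
nextPacketMsg and recvPacketMsg together form a simple fragmentation scheme:
the sender cuts a message into payloads of at most maxPacketMsgPayloadSize
bytes and marks the last one with EOF, and the receiver appends payloads
until it sees EOF. The standalone sketch below illustrates that round trip
under simplified assumptions; `packet`, `split` and `reassembler` are
hypothetical names, and Protobuf framing is left out entirely.

```go
package main

import (
	"bytes"
	"fmt"
)

// packet is a hypothetical stand-in for tmp2p.PacketMsg.
type packet struct {
	Data []byte
	EOF  bool
}

// split cuts msg into packets carrying at most maxSize bytes each.
// Only the final packet has EOF set.
func split(msg []byte, maxSize int) []packet {
	var out []packet
	for {
		n := len(msg)
		if n > maxSize {
			n = maxSize
		}
		p := packet{Data: msg[:n], EOF: len(msg) <= maxSize}
		out = append(out, p)
		if p.EOF {
			return out
		}
		msg = msg[n:]
	}
}

// reassembler buffers payloads until a packet with EOF arrives.
type reassembler struct {
	buf      []byte
	capacity int // maximum size of a whole message
}

// recv returns the complete message on EOF, nil while the message is
// still incomplete, or an error if the capacity would be exceeded.
func (r *reassembler) recv(p packet) ([]byte, error) {
	if total := len(r.buf) + len(p.Data); r.capacity < total {
		return nil, fmt.Errorf("received message exceeds available capacity: %d < %d", r.capacity, total)
	}
	r.buf = append(r.buf, p.Data...)
	if p.EOF {
		msg := r.buf
		r.buf = nil // start a fresh buffer; msg now belongs to the caller
		return msg, nil
	}
	return nil, nil
}

func main() {
	msg := []byte("hello multiplexed world") // 23 bytes
	pkts := split(msg, 8)                    // payloads of 8, 8 and 7 bytes

	r := &reassembler{capacity: 64}
	for _, p := range pkts {
		out, err := r.recv(p)
		if err != nil {
			panic(err)
		}
		if out != nil {
			fmt.Println(len(pkts), bytes.Equal(out, msg)) // 3 true
		}
	}
}
```

The capacity check runs before the append, so an oversized message is
rejected as soon as the running total would pass the limit rather than
after the whole message has been buffered.
*/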

// Call this periodically to update stats for throttling purposes.
// Not goroutine-safe
func (ch *channel) updateStats() {
	// Exponential decay of stats.
	// TODO: optimize.
	atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
}
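
/*
updateStats multiplies recentlySent by 0.8 on every tick of chStatsTimer
(every updateStats = 2 seconds), so a burst of traffic stops counting
against a channel fairly quickly once the channel goes idle. The sketch
below only illustrates the arithmetic; `decay` and `afterTicks` are
hypothetical helpers, and the atomic load/store of the real code is omitted.

```go
package main

import "fmt"

// decay applies one tick of the 0.8 decay factor, truncating toward
// zero in the same way as the int64(float64(...)*0.8) conversion.
func decay(recentlySent int64) int64 {
	return int64(float64(recentlySent) * 0.8)
}

// afterTicks applies n consecutive decay ticks with no new traffic.
func afterTicks(recentlySent int64, n int) int64 {
	for i := 0; i < n; i++ {
		recentlySent = decay(recentlySent)
	}
	return recentlySent
}

func main() {
	vals := []int64{}
	for n := 0; n <= 3; n++ {
		vals = append(vals, afterTicks(1000, n))
	}
	fmt.Println(vals) // [1000 800 640 512]
}
```

With one tick every 2 seconds, an idle channel's counter falls to roughly
half its value after three ticks, i.e. about 6 seconds.
*/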

//----------------------------------------
// Packet

// mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message.
func mustWrapPacket(pb proto.Message) *tmp2p.Packet {
	var msg tmp2p.Packet

	switch pb := pb.(type) {
	case *tmp2p.Packet: // already a packet
		msg = *pb
	case *tmp2p.PacketPing:
		msg = tmp2p.Packet{
			Sum: &tmp2p.Packet_PacketPing{
				PacketPing: pb,
			},
		}
	case *tmp2p.PacketPong:
		msg = tmp2p.Packet{
			Sum: &tmp2p.Packet_PacketPong{
				PacketPong: pb,
			},
		}
	case *tmp2p.PacketMsg:
		msg = tmp2p.Packet{
			Sum: &tmp2p.Packet_PacketMsg{
				PacketMsg: pb,
			},
		}
	default:
		panic(fmt.Errorf("unknown packet type %T", pb))
	}

	return &msg
}