You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

681 lines
18 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. cmn "github.com/tendermint/go-common"
  12. flow "github.com/tendermint/go-flowrate/flowrate"
  13. wire "github.com/tendermint/go-wire"
  14. )
  15. const (
  16. numBatchMsgPackets = 10
  17. minReadBufferSize = 1024
  18. minWriteBufferSize = 65536
  19. updateState = 2 * time.Second
  20. pingTimeout = 40 * time.Second
  21. flushThrottle = 100 * time.Millisecond
  22. defaultSendQueueCapacity = 1
  23. defaultSendRate = int64(512000) // 500KB/s
  24. defaultRecvBufferCapacity = 4096
  25. defaultRecvMessageCapacity = 22020096 // 21MB
  26. defaultRecvRate = int64(512000) // 500KB/s
  27. defaultSendTimeout = 10 * time.Second
  28. )
  29. type receiveCbFunc func(chID byte, msgBytes []byte)
  30. type errorCbFunc func(interface{})
  31. /*
  32. Each peer has one `MConnection` (multiplex connection) instance.
  33. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  34. several messages along a single channel of communication.
  35. Each `MConnection` handles message transmission on multiple abstract communication
  36. `Channel`s. Each channel has a globally unique byte id.
  37. The byte id and the relative priorities of each `Channel` are configured upon
  38. initialization of the connection.
  39. There are two methods for sending messages:
  40. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  41. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  42. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  43. for the channel with the given id byte `chID`, or until the request times out.
  44. The message `msg` is serialized using the `tendermint/wire` submodule's
  45. `WriteBinary()` reflection routine.
  46. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  47. queue is full.
  48. Inbound message bytes are handled with an onReceive callback function.
  49. */
  50. type MConnection struct {
  51. cmn.BaseService
  52. conn net.Conn
  53. bufReader *bufio.Reader
  54. bufWriter *bufio.Writer
  55. sendMonitor *flow.Monitor
  56. recvMonitor *flow.Monitor
  57. send chan struct{}
  58. pong chan struct{}
  59. channels []*Channel
  60. channelsIdx map[byte]*Channel
  61. onReceive receiveCbFunc
  62. onError errorCbFunc
  63. errored uint32
  64. config *MConnectionConfig
  65. quit chan struct{}
  66. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  67. pingTimer *cmn.RepeatTimer // send pings periodically
  68. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  69. LocalAddress *NetAddress
  70. RemoteAddress *NetAddress
  71. }
  72. // MConnectionConfig is a MConnection configuration
  73. type MConnectionConfig struct {
  74. SendRate int64
  75. RecvRate int64
  76. }
  77. // NewMConnection wraps net.Conn and creates multiplex connection
  78. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  79. return NewMConnectionWithConfig(
  80. conn,
  81. chDescs,
  82. onReceive,
  83. onError,
  84. &MConnectionConfig{
  85. SendRate: defaultSendRate,
  86. RecvRate: defaultRecvRate,
  87. })
  88. }
  89. // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
  90. func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnectionConfig) *MConnection {
  91. mconn := &MConnection{
  92. conn: conn,
  93. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  94. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  95. sendMonitor: flow.New(0, 0),
  96. recvMonitor: flow.New(0, 0),
  97. send: make(chan struct{}, 1),
  98. pong: make(chan struct{}),
  99. onReceive: onReceive,
  100. onError: onError,
  101. config: config,
  102. LocalAddress: NewNetAddress(conn.LocalAddr()),
  103. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  104. }
  105. // Create channels
  106. var channelsIdx = map[byte]*Channel{}
  107. var channels = []*Channel{}
  108. for _, desc := range chDescs {
  109. descCopy := *desc // copy the desc else unsafe access across connections
  110. channel := newChannel(mconn, &descCopy)
  111. channelsIdx[channel.id] = channel
  112. channels = append(channels, channel)
  113. }
  114. mconn.channels = channels
  115. mconn.channelsIdx = channelsIdx
  116. mconn.BaseService = *cmn.NewBaseService(log, "MConnection", mconn)
  117. return mconn
  118. }
  119. func (c *MConnection) OnStart() error {
  120. c.BaseService.OnStart()
  121. c.quit = make(chan struct{})
  122. c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
  123. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  124. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
  125. go c.sendRoutine()
  126. go c.recvRoutine()
  127. return nil
  128. }
  129. func (c *MConnection) OnStop() {
  130. c.BaseService.OnStop()
  131. c.flushTimer.Stop()
  132. c.pingTimer.Stop()
  133. c.chStatsTimer.Stop()
  134. if c.quit != nil {
  135. close(c.quit)
  136. }
  137. c.conn.Close()
  138. // We can't close pong safely here because
  139. // recvRoutine may write to it after we've stopped.
  140. // Though it doesn't need to get closed at all,
  141. // we close it @ recvRoutine.
  142. // close(c.pong)
  143. }
  144. func (c *MConnection) String() string {
  145. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  146. }
  147. func (c *MConnection) flush() {
  148. log.Debug("Flush", "conn", c)
  149. err := c.bufWriter.Flush()
  150. if err != nil {
  151. log.Warn("MConnection flush failed", "error", err)
  152. }
  153. }
  154. // Catch panics, usually caused by remote disconnects.
  155. func (c *MConnection) _recover() {
  156. if r := recover(); r != nil {
  157. stack := debug.Stack()
  158. err := cmn.StackError{r, stack}
  159. c.stopForError(err)
  160. }
  161. }
  162. func (c *MConnection) stopForError(r interface{}) {
  163. c.Stop()
  164. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  165. if c.onError != nil {
  166. c.onError(r)
  167. }
  168. }
  169. }
  170. // Queues a message to be sent to channel.
  171. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  172. if !c.IsRunning() {
  173. return false
  174. }
  175. log.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  176. // Send message to channel.
  177. channel, ok := c.channelsIdx[chID]
  178. if !ok {
  179. log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  180. return false
  181. }
  182. success := channel.sendBytes(wire.BinaryBytes(msg))
  183. if success {
  184. // Wake up sendRoutine if necessary
  185. select {
  186. case c.send <- struct{}{}:
  187. default:
  188. }
  189. } else {
  190. log.Warn("Send failed", "channel", chID, "conn", c, "msg", msg)
  191. }
  192. return success
  193. }
  194. // Queues a message to be sent to channel.
  195. // Nonblocking, returns true if successful.
  196. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  197. if !c.IsRunning() {
  198. return false
  199. }
  200. log.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  201. // Send message to channel.
  202. channel, ok := c.channelsIdx[chID]
  203. if !ok {
  204. log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  205. return false
  206. }
  207. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  208. if ok {
  209. // Wake up sendRoutine if necessary
  210. select {
  211. case c.send <- struct{}{}:
  212. default:
  213. }
  214. }
  215. return ok
  216. }
  217. func (c *MConnection) CanSend(chID byte) bool {
  218. if !c.IsRunning() {
  219. return false
  220. }
  221. channel, ok := c.channelsIdx[chID]
  222. if !ok {
  223. log.Error(cmn.Fmt("Unknown channel %X", chID))
  224. return false
  225. }
  226. return channel.canSend()
  227. }
  228. // sendRoutine polls for packets to send from channels.
  229. func (c *MConnection) sendRoutine() {
  230. defer c._recover()
  231. FOR_LOOP:
  232. for {
  233. var n int
  234. var err error
  235. select {
  236. case <-c.flushTimer.Ch:
  237. // NOTE: flushTimer.Set() must be called every time
  238. // something is written to .bufWriter.
  239. c.flush()
  240. case <-c.chStatsTimer.Ch:
  241. for _, channel := range c.channels {
  242. channel.updateStats()
  243. }
  244. case <-c.pingTimer.Ch:
  245. log.Debug("Send Ping")
  246. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  247. c.sendMonitor.Update(int(n))
  248. c.flush()
  249. case <-c.pong:
  250. log.Debug("Send Pong")
  251. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  252. c.sendMonitor.Update(int(n))
  253. c.flush()
  254. case <-c.quit:
  255. break FOR_LOOP
  256. case <-c.send:
  257. // Send some msgPackets
  258. eof := c.sendSomeMsgPackets()
  259. if !eof {
  260. // Keep sendRoutine awake.
  261. select {
  262. case c.send <- struct{}{}:
  263. default:
  264. }
  265. }
  266. }
  267. if !c.IsRunning() {
  268. break FOR_LOOP
  269. }
  270. if err != nil {
  271. log.Warn("Connection failed @ sendRoutine", "conn", c, "error", err)
  272. c.stopForError(err)
  273. break FOR_LOOP
  274. }
  275. }
  276. // Cleanup
  277. }
  278. // Returns true if messages from channels were exhausted.
  279. // Blocks in accordance to .sendMonitor throttling.
  280. func (c *MConnection) sendSomeMsgPackets() bool {
  281. // Block until .sendMonitor says we can write.
  282. // Once we're ready we send more than we asked for,
  283. // but amortized it should even out.
  284. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
  285. // Now send some msgPackets.
  286. for i := 0; i < numBatchMsgPackets; i++ {
  287. if c.sendMsgPacket() {
  288. return true
  289. }
  290. }
  291. return false
  292. }
  293. // Returns true if messages from channels were exhausted.
  294. func (c *MConnection) sendMsgPacket() bool {
  295. // Choose a channel to create a msgPacket from.
  296. // The chosen channel will be the one whose recentlySent/priority is the least.
  297. var leastRatio float32 = math.MaxFloat32
  298. var leastChannel *Channel
  299. for _, channel := range c.channels {
  300. // If nothing to send, skip this channel
  301. if !channel.isSendPending() {
  302. continue
  303. }
  304. // Get ratio, and keep track of lowest ratio.
  305. ratio := float32(channel.recentlySent) / float32(channel.priority)
  306. if ratio < leastRatio {
  307. leastRatio = ratio
  308. leastChannel = channel
  309. }
  310. }
  311. // Nothing to send?
  312. if leastChannel == nil {
  313. return true
  314. } else {
  315. // log.Info("Found a msgPacket to send")
  316. }
  317. // Make & send a msgPacket from this channel
  318. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  319. if err != nil {
  320. log.Warn("Failed to write msgPacket", "error", err)
  321. c.stopForError(err)
  322. return true
  323. }
  324. c.sendMonitor.Update(int(n))
  325. c.flushTimer.Set()
  326. return false
  327. }
  328. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  329. // After a whole message has been assembled, it's pushed to onReceive().
  330. // Blocks depending on how the connection is throttled.
  331. func (c *MConnection) recvRoutine() {
  332. defer c._recover()
  333. FOR_LOOP:
  334. for {
  335. // Block until .recvMonitor says we can read.
  336. c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
  337. /*
  338. // Peek into bufReader for debugging
  339. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  340. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  341. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  342. if err == nil {
  343. return bytes
  344. } else {
  345. log.Warn("Error peeking connection buffer", "error", err)
  346. return nil
  347. }
  348. }})
  349. }
  350. */
  351. // Read packet type
  352. var n int
  353. var err error
  354. pktType := wire.ReadByte(c.bufReader, &n, &err)
  355. c.recvMonitor.Update(int(n))
  356. if err != nil {
  357. if c.IsRunning() {
  358. log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
  359. c.stopForError(err)
  360. }
  361. break FOR_LOOP
  362. }
  363. // Read more depending on packet type.
  364. switch pktType {
  365. case packetTypePing:
  366. // TODO: prevent abuse, as they cause flush()'s.
  367. log.Debug("Receive Ping")
  368. c.pong <- struct{}{}
  369. case packetTypePong:
  370. // do nothing
  371. log.Debug("Receive Pong")
  372. case packetTypeMsg:
  373. pkt, n, err := msgPacket{}, int(0), error(nil)
  374. wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
  375. c.recvMonitor.Update(int(n))
  376. if err != nil {
  377. if c.IsRunning() {
  378. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  379. c.stopForError(err)
  380. }
  381. break FOR_LOOP
  382. }
  383. channel, ok := c.channelsIdx[pkt.ChannelID]
  384. if !ok || channel == nil {
  385. cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
  386. }
  387. msgBytes, err := channel.recvMsgPacket(pkt)
  388. if err != nil {
  389. if c.IsRunning() {
  390. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  391. c.stopForError(err)
  392. }
  393. break FOR_LOOP
  394. }
  395. if msgBytes != nil {
  396. log.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  397. c.onReceive(pkt.ChannelID, msgBytes)
  398. }
  399. default:
  400. cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
  401. }
  402. // TODO: shouldn't this go in the sendRoutine?
  403. // Better to send a ping packet when *we* haven't sent anything for a while.
  404. c.pingTimer.Reset()
  405. }
  406. // Cleanup
  407. close(c.pong)
  408. for _ = range c.pong {
  409. // Drain
  410. }
  411. }
  412. type ConnectionStatus struct {
  413. SendMonitor flow.Status
  414. RecvMonitor flow.Status
  415. Channels []ChannelStatus
  416. }
  417. type ChannelStatus struct {
  418. ID byte
  419. SendQueueCapacity int
  420. SendQueueSize int
  421. Priority int
  422. RecentlySent int64
  423. }
  424. func (c *MConnection) Status() ConnectionStatus {
  425. var status ConnectionStatus
  426. status.SendMonitor = c.sendMonitor.Status()
  427. status.RecvMonitor = c.recvMonitor.Status()
  428. status.Channels = make([]ChannelStatus, len(c.channels))
  429. for i, channel := range c.channels {
  430. status.Channels[i] = ChannelStatus{
  431. ID: channel.id,
  432. SendQueueCapacity: cap(channel.sendQueue),
  433. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  434. Priority: channel.priority,
  435. RecentlySent: channel.recentlySent,
  436. }
  437. }
  438. return status
  439. }
  440. //-----------------------------------------------------------------------------
  441. type ChannelDescriptor struct {
  442. ID byte
  443. Priority int
  444. SendQueueCapacity int
  445. RecvBufferCapacity int
  446. RecvMessageCapacity int
  447. }
  448. func (chDesc *ChannelDescriptor) FillDefaults() {
  449. if chDesc.SendQueueCapacity == 0 {
  450. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  451. }
  452. if chDesc.RecvBufferCapacity == 0 {
  453. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  454. }
  455. if chDesc.RecvMessageCapacity == 0 {
  456. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  457. }
  458. }
  459. // TODO: lowercase.
  460. // NOTE: not goroutine-safe.
  461. type Channel struct {
  462. conn *MConnection
  463. desc *ChannelDescriptor
  464. id byte
  465. sendQueue chan []byte
  466. sendQueueSize int32 // atomic.
  467. recving []byte
  468. sending []byte
  469. priority int
  470. recentlySent int64 // exponential moving average
  471. }
  472. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  473. desc.FillDefaults()
  474. if desc.Priority <= 0 {
  475. cmn.PanicSanity("Channel default priority must be a postive integer")
  476. }
  477. return &Channel{
  478. conn: conn,
  479. desc: desc,
  480. id: desc.ID,
  481. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  482. recving: make([]byte, 0, desc.RecvBufferCapacity),
  483. priority: desc.Priority,
  484. }
  485. }
  486. // Queues message to send to this channel.
  487. // Goroutine-safe
  488. // Times out (and returns false) after defaultSendTimeout
  489. func (ch *Channel) sendBytes(bytes []byte) bool {
  490. timeout := time.NewTimer(defaultSendTimeout)
  491. select {
  492. case <-timeout.C:
  493. // timeout
  494. return false
  495. case ch.sendQueue <- bytes:
  496. atomic.AddInt32(&ch.sendQueueSize, 1)
  497. return true
  498. }
  499. }
  500. // Queues message to send to this channel.
  501. // Nonblocking, returns true if successful.
  502. // Goroutine-safe
  503. func (ch *Channel) trySendBytes(bytes []byte) bool {
  504. select {
  505. case ch.sendQueue <- bytes:
  506. atomic.AddInt32(&ch.sendQueueSize, 1)
  507. return true
  508. default:
  509. return false
  510. }
  511. }
  512. // Goroutine-safe
  513. func (ch *Channel) loadSendQueueSize() (size int) {
  514. return int(atomic.LoadInt32(&ch.sendQueueSize))
  515. }
  516. // Goroutine-safe
  517. // Use only as a heuristic.
  518. func (ch *Channel) canSend() bool {
  519. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  520. }
  521. // Returns true if any msgPackets are pending to be sent.
  522. // Call before calling nextMsgPacket()
  523. // Goroutine-safe
  524. func (ch *Channel) isSendPending() bool {
  525. if len(ch.sending) == 0 {
  526. if len(ch.sendQueue) == 0 {
  527. return false
  528. }
  529. ch.sending = <-ch.sendQueue
  530. }
  531. return true
  532. }
  533. // Creates a new msgPacket to send.
  534. // Not goroutine-safe
  535. func (ch *Channel) nextMsgPacket() msgPacket {
  536. packet := msgPacket{}
  537. packet.ChannelID = byte(ch.id)
  538. packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
  539. if len(ch.sending) <= maxMsgPacketPayloadSize {
  540. packet.EOF = byte(0x01)
  541. ch.sending = nil
  542. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  543. } else {
  544. packet.EOF = byte(0x00)
  545. ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
  546. }
  547. return packet
  548. }
  549. // Writes next msgPacket to w.
  550. // Not goroutine-safe
  551. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  552. packet := ch.nextMsgPacket()
  553. log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  554. wire.WriteByte(packetTypeMsg, w, &n, &err)
  555. wire.WriteBinary(packet, w, &n, &err)
  556. if err == nil {
  557. ch.recentlySent += int64(n)
  558. }
  559. return
  560. }
  561. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  562. // Not goroutine-safe
  563. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  564. // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  565. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  566. return nil, wire.ErrBinaryReadOverflow
  567. }
  568. ch.recving = append(ch.recving, packet.Bytes...)
  569. if packet.EOF == byte(0x01) {
  570. msgBytes := ch.recving
  571. // clear the slice without re-allocating.
  572. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  573. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  574. // at which point the recving slice stops being used and should be garbage collected
  575. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  576. return msgBytes, nil
  577. }
  578. return nil, nil
  579. }
  580. // Call this periodically to update stats for throttling purposes.
  581. // Not goroutine-safe
  582. func (ch *Channel) updateStats() {
  583. // Exponential decay of stats.
  584. // TODO: optimize.
  585. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  586. }
  587. //-----------------------------------------------------------------------------
  588. const (
  589. maxMsgPacketPayloadSize = 1024
  590. maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
  591. maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  592. packetTypePing = byte(0x01)
  593. packetTypePong = byte(0x02)
  594. packetTypeMsg = byte(0x03)
  595. )
  596. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  597. type msgPacket struct {
  598. ChannelID byte
  599. EOF byte // 1 means message ends here.
  600. Bytes []byte
  601. }
  602. func (p msgPacket) String() string {
  603. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  604. }