You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

668 lines
18 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. cmn "github.com/tendermint/go-common"
  12. cfg "github.com/tendermint/go-config"
  13. flow "github.com/tendermint/go-flowrate/flowrate"
  14. wire "github.com/tendermint/go-wire"
  15. )
  16. const (
  17. numBatchMsgPackets = 10
  18. minReadBufferSize = 1024
  19. minWriteBufferSize = 65536
  20. updateState = 2 * time.Second
  21. pingTimeout = 40 * time.Second
  22. flushThrottle = 100 * time.Millisecond
  23. defaultSendQueueCapacity = 1
  24. defaultRecvBufferCapacity = 4096
  25. defaultRecvMessageCapacity = 22020096 // 21MB
  26. defaultSendTimeout = 10 * time.Second
  27. )
  28. type receiveCbFunc func(chID byte, msgBytes []byte)
  29. type errorCbFunc func(interface{})
  30. /*
  31. Each peer has one `MConnection` (multiplex connection) instance.
  32. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  33. several messages along a single channel of communication.
  34. Each `MConnection` handles message transmission on multiple abstract communication
  35. `Channel`s. Each channel has a globally unique byte id.
  36. The byte id and the relative priorities of each `Channel` are configured upon
  37. initialization of the connection.
  38. There are two methods for sending messages:
  39. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  40. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  41. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  42. for the channel with the given id byte `chID`, or until the request times out.
  43. The message `msg` is serialized using the `tendermint/wire` submodule's
  44. `WriteBinary()` reflection routine.
  45. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  46. queue is full.
  47. Inbound message bytes are handled with an onReceive callback function.
  48. */
  49. type MConnection struct {
  50. cmn.BaseService
  51. conn net.Conn
  52. bufReader *bufio.Reader
  53. bufWriter *bufio.Writer
  54. sendMonitor *flow.Monitor
  55. recvMonitor *flow.Monitor
  56. sendRate int64
  57. recvRate int64
  58. send chan struct{}
  59. pong chan struct{}
  60. channels []*Channel
  61. channelsIdx map[byte]*Channel
  62. onReceive receiveCbFunc
  63. onError errorCbFunc
  64. errored uint32
  65. quit chan struct{}
  66. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  67. pingTimer *cmn.RepeatTimer // send pings periodically
  68. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  69. LocalAddress *NetAddress
  70. RemoteAddress *NetAddress
  71. }
  72. func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  73. mconn := &MConnection{
  74. conn: conn,
  75. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  76. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  77. sendMonitor: flow.New(0, 0),
  78. recvMonitor: flow.New(0, 0),
  79. sendRate: int64(config.GetInt(configKeySendRate)),
  80. recvRate: int64(config.GetInt(configKeyRecvRate)),
  81. send: make(chan struct{}, 1),
  82. pong: make(chan struct{}),
  83. onReceive: onReceive,
  84. onError: onError,
  85. // Initialized in Start()
  86. quit: nil,
  87. flushTimer: nil,
  88. pingTimer: nil,
  89. chStatsTimer: nil,
  90. LocalAddress: NewNetAddress(conn.LocalAddr()),
  91. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  92. }
  93. // Create channels
  94. var channelsIdx = map[byte]*Channel{}
  95. var channels = []*Channel{}
  96. for _, desc := range chDescs {
  97. descCopy := *desc // copy the desc else unsafe access across connections
  98. channel := newChannel(mconn, &descCopy)
  99. channelsIdx[channel.id] = channel
  100. channels = append(channels, channel)
  101. }
  102. mconn.channels = channels
  103. mconn.channelsIdx = channelsIdx
  104. mconn.BaseService = *cmn.NewBaseService(log, "MConnection", mconn)
  105. return mconn
  106. }
  107. func (c *MConnection) OnStart() error {
  108. c.BaseService.OnStart()
  109. c.quit = make(chan struct{})
  110. c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
  111. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
  112. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
  113. go c.sendRoutine()
  114. go c.recvRoutine()
  115. return nil
  116. }
  117. func (c *MConnection) OnStop() {
  118. c.BaseService.OnStop()
  119. c.flushTimer.Stop()
  120. c.pingTimer.Stop()
  121. c.chStatsTimer.Stop()
  122. if c.quit != nil {
  123. close(c.quit)
  124. }
  125. c.conn.Close()
  126. // We can't close pong safely here because
  127. // recvRoutine may write to it after we've stopped.
  128. // Though it doesn't need to get closed at all,
  129. // we close it @ recvRoutine.
  130. // close(c.pong)
  131. }
  132. func (c *MConnection) String() string {
  133. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  134. }
  135. func (c *MConnection) flush() {
  136. log.Debug("Flush", "conn", c)
  137. err := c.bufWriter.Flush()
  138. if err != nil {
  139. log.Warn("MConnection flush failed", "error", err)
  140. }
  141. }
  142. // Catch panics, usually caused by remote disconnects.
  143. func (c *MConnection) _recover() {
  144. if r := recover(); r != nil {
  145. stack := debug.Stack()
  146. err := cmn.StackError{r, stack}
  147. c.stopForError(err)
  148. }
  149. }
  150. func (c *MConnection) stopForError(r interface{}) {
  151. c.Stop()
  152. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  153. if c.onError != nil {
  154. c.onError(r)
  155. }
  156. }
  157. }
  158. // Queues a message to be sent to channel.
  159. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  160. if !c.IsRunning() {
  161. return false
  162. }
  163. log.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  164. // Send message to channel.
  165. channel, ok := c.channelsIdx[chID]
  166. if !ok {
  167. log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  168. return false
  169. }
  170. success := channel.sendBytes(wire.BinaryBytes(msg))
  171. if success {
  172. // Wake up sendRoutine if necessary
  173. select {
  174. case c.send <- struct{}{}:
  175. default:
  176. }
  177. } else {
  178. log.Warn("Send failed", "channel", chID, "conn", c, "msg", msg)
  179. }
  180. return success
  181. }
  182. // Queues a message to be sent to channel.
  183. // Nonblocking, returns true if successful.
  184. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  185. if !c.IsRunning() {
  186. return false
  187. }
  188. log.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  189. // Send message to channel.
  190. channel, ok := c.channelsIdx[chID]
  191. if !ok {
  192. log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  193. return false
  194. }
  195. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  196. if ok {
  197. // Wake up sendRoutine if necessary
  198. select {
  199. case c.send <- struct{}{}:
  200. default:
  201. }
  202. }
  203. return ok
  204. }
  205. func (c *MConnection) CanSend(chID byte) bool {
  206. if !c.IsRunning() {
  207. return false
  208. }
  209. channel, ok := c.channelsIdx[chID]
  210. if !ok {
  211. log.Error(cmn.Fmt("Unknown channel %X", chID))
  212. return false
  213. }
  214. return channel.canSend()
  215. }
  216. // sendRoutine polls for packets to send from channels.
  217. func (c *MConnection) sendRoutine() {
  218. defer c._recover()
  219. FOR_LOOP:
  220. for {
  221. var n int
  222. var err error
  223. select {
  224. case <-c.flushTimer.Ch:
  225. // NOTE: flushTimer.Set() must be called every time
  226. // something is written to .bufWriter.
  227. c.flush()
  228. case <-c.chStatsTimer.Ch:
  229. for _, channel := range c.channels {
  230. channel.updateStats()
  231. }
  232. case <-c.pingTimer.Ch:
  233. log.Debug("Send Ping")
  234. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  235. c.sendMonitor.Update(int(n))
  236. c.flush()
  237. case <-c.pong:
  238. log.Debug("Send Pong")
  239. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  240. c.sendMonitor.Update(int(n))
  241. c.flush()
  242. case <-c.quit:
  243. break FOR_LOOP
  244. case <-c.send:
  245. // Send some msgPackets
  246. eof := c.sendSomeMsgPackets()
  247. if !eof {
  248. // Keep sendRoutine awake.
  249. select {
  250. case c.send <- struct{}{}:
  251. default:
  252. }
  253. }
  254. }
  255. if !c.IsRunning() {
  256. break FOR_LOOP
  257. }
  258. if err != nil {
  259. log.Warn("Connection failed @ sendRoutine", "conn", c, "error", err)
  260. c.stopForError(err)
  261. break FOR_LOOP
  262. }
  263. }
  264. // Cleanup
  265. }
  266. // Returns true if messages from channels were exhausted.
  267. // Blocks in accordance to .sendMonitor throttling.
  268. func (c *MConnection) sendSomeMsgPackets() bool {
  269. // Block until .sendMonitor says we can write.
  270. // Once we're ready we send more than we asked for,
  271. // but amortized it should even out.
  272. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true)
  273. // Now send some msgPackets.
  274. for i := 0; i < numBatchMsgPackets; i++ {
  275. if c.sendMsgPacket() {
  276. return true
  277. }
  278. }
  279. return false
  280. }
  281. // Returns true if messages from channels were exhausted.
  282. func (c *MConnection) sendMsgPacket() bool {
  283. // Choose a channel to create a msgPacket from.
  284. // The chosen channel will be the one whose recentlySent/priority is the least.
  285. var leastRatio float32 = math.MaxFloat32
  286. var leastChannel *Channel
  287. for _, channel := range c.channels {
  288. // If nothing to send, skip this channel
  289. if !channel.isSendPending() {
  290. continue
  291. }
  292. // Get ratio, and keep track of lowest ratio.
  293. ratio := float32(channel.recentlySent) / float32(channel.priority)
  294. if ratio < leastRatio {
  295. leastRatio = ratio
  296. leastChannel = channel
  297. }
  298. }
  299. // Nothing to send?
  300. if leastChannel == nil {
  301. return true
  302. } else {
  303. // log.Info("Found a msgPacket to send")
  304. }
  305. // Make & send a msgPacket from this channel
  306. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  307. if err != nil {
  308. log.Warn("Failed to write msgPacket", "error", err)
  309. c.stopForError(err)
  310. return true
  311. }
  312. c.sendMonitor.Update(int(n))
  313. c.flushTimer.Set()
  314. return false
  315. }
  316. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  317. // After a whole message has been assembled, it's pushed to onReceive().
  318. // Blocks depending on how the connection is throttled.
  319. func (c *MConnection) recvRoutine() {
  320. defer c._recover()
  321. FOR_LOOP:
  322. for {
  323. // Block until .recvMonitor says we can read.
  324. c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true)
  325. /*
  326. // Peek into bufReader for debugging
  327. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  328. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  329. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  330. if err == nil {
  331. return bytes
  332. } else {
  333. log.Warn("Error peeking connection buffer", "error", err)
  334. return nil
  335. }
  336. }})
  337. }
  338. */
  339. // Read packet type
  340. var n int
  341. var err error
  342. pktType := wire.ReadByte(c.bufReader, &n, &err)
  343. c.recvMonitor.Update(int(n))
  344. if err != nil {
  345. if c.IsRunning() {
  346. log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
  347. c.stopForError(err)
  348. }
  349. break FOR_LOOP
  350. }
  351. // Read more depending on packet type.
  352. switch pktType {
  353. case packetTypePing:
  354. // TODO: prevent abuse, as they cause flush()'s.
  355. log.Debug("Receive Ping")
  356. c.pong <- struct{}{}
  357. case packetTypePong:
  358. // do nothing
  359. log.Debug("Receive Pong")
  360. case packetTypeMsg:
  361. pkt, n, err := msgPacket{}, int(0), error(nil)
  362. wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
  363. c.recvMonitor.Update(int(n))
  364. if err != nil {
  365. if c.IsRunning() {
  366. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  367. c.stopForError(err)
  368. }
  369. break FOR_LOOP
  370. }
  371. channel, ok := c.channelsIdx[pkt.ChannelID]
  372. if !ok || channel == nil {
  373. cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
  374. }
  375. msgBytes, err := channel.recvMsgPacket(pkt)
  376. if err != nil {
  377. if c.IsRunning() {
  378. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  379. c.stopForError(err)
  380. }
  381. break FOR_LOOP
  382. }
  383. if msgBytes != nil {
  384. log.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  385. c.onReceive(pkt.ChannelID, msgBytes)
  386. }
  387. default:
  388. cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
  389. }
  390. // TODO: shouldn't this go in the sendRoutine?
  391. // Better to send a ping packet when *we* haven't sent anything for a while.
  392. c.pingTimer.Reset()
  393. }
  394. // Cleanup
  395. close(c.pong)
  396. for _ = range c.pong {
  397. // Drain
  398. }
  399. }
  400. type ConnectionStatus struct {
  401. SendMonitor flow.Status
  402. RecvMonitor flow.Status
  403. Channels []ChannelStatus
  404. }
  405. type ChannelStatus struct {
  406. ID byte
  407. SendQueueCapacity int
  408. SendQueueSize int
  409. Priority int
  410. RecentlySent int64
  411. }
  412. func (c *MConnection) Status() ConnectionStatus {
  413. var status ConnectionStatus
  414. status.SendMonitor = c.sendMonitor.Status()
  415. status.RecvMonitor = c.recvMonitor.Status()
  416. status.Channels = make([]ChannelStatus, len(c.channels))
  417. for i, channel := range c.channels {
  418. status.Channels[i] = ChannelStatus{
  419. ID: channel.id,
  420. SendQueueCapacity: cap(channel.sendQueue),
  421. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  422. Priority: channel.priority,
  423. RecentlySent: channel.recentlySent,
  424. }
  425. }
  426. return status
  427. }
  428. //-----------------------------------------------------------------------------
  429. type ChannelDescriptor struct {
  430. ID byte
  431. Priority int
  432. SendQueueCapacity int
  433. RecvBufferCapacity int
  434. RecvMessageCapacity int
  435. }
  436. func (chDesc *ChannelDescriptor) FillDefaults() {
  437. if chDesc.SendQueueCapacity == 0 {
  438. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  439. }
  440. if chDesc.RecvBufferCapacity == 0 {
  441. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  442. }
  443. if chDesc.RecvMessageCapacity == 0 {
  444. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  445. }
  446. }
  447. // TODO: lowercase.
  448. // NOTE: not goroutine-safe.
  449. type Channel struct {
  450. conn *MConnection
  451. desc *ChannelDescriptor
  452. id byte
  453. sendQueue chan []byte
  454. sendQueueSize int32 // atomic.
  455. recving []byte
  456. sending []byte
  457. priority int
  458. recentlySent int64 // exponential moving average
  459. }
  460. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  461. desc.FillDefaults()
  462. if desc.Priority <= 0 {
  463. cmn.PanicSanity("Channel default priority must be a postive integer")
  464. }
  465. return &Channel{
  466. conn: conn,
  467. desc: desc,
  468. id: desc.ID,
  469. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  470. recving: make([]byte, 0, desc.RecvBufferCapacity),
  471. priority: desc.Priority,
  472. }
  473. }
  474. // Queues message to send to this channel.
  475. // Goroutine-safe
  476. // Times out (and returns false) after defaultSendTimeout
  477. func (ch *Channel) sendBytes(bytes []byte) bool {
  478. timeout := time.NewTimer(defaultSendTimeout)
  479. select {
  480. case <-timeout.C:
  481. // timeout
  482. return false
  483. case ch.sendQueue <- bytes:
  484. atomic.AddInt32(&ch.sendQueueSize, 1)
  485. return true
  486. }
  487. }
  488. // Queues message to send to this channel.
  489. // Nonblocking, returns true if successful.
  490. // Goroutine-safe
  491. func (ch *Channel) trySendBytes(bytes []byte) bool {
  492. select {
  493. case ch.sendQueue <- bytes:
  494. atomic.AddInt32(&ch.sendQueueSize, 1)
  495. return true
  496. default:
  497. return false
  498. }
  499. }
  500. // Goroutine-safe
  501. func (ch *Channel) loadSendQueueSize() (size int) {
  502. return int(atomic.LoadInt32(&ch.sendQueueSize))
  503. }
  504. // Goroutine-safe
  505. // Use only as a heuristic.
  506. func (ch *Channel) canSend() bool {
  507. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  508. }
  509. // Returns true if any msgPackets are pending to be sent.
  510. // Call before calling nextMsgPacket()
  511. // Goroutine-safe
  512. func (ch *Channel) isSendPending() bool {
  513. if len(ch.sending) == 0 {
  514. if len(ch.sendQueue) == 0 {
  515. return false
  516. }
  517. ch.sending = <-ch.sendQueue
  518. }
  519. return true
  520. }
  521. // Creates a new msgPacket to send.
  522. // Not goroutine-safe
  523. func (ch *Channel) nextMsgPacket() msgPacket {
  524. packet := msgPacket{}
  525. packet.ChannelID = byte(ch.id)
  526. packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
  527. if len(ch.sending) <= maxMsgPacketPayloadSize {
  528. packet.EOF = byte(0x01)
  529. ch.sending = nil
  530. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  531. } else {
  532. packet.EOF = byte(0x00)
  533. ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
  534. }
  535. return packet
  536. }
  537. // Writes next msgPacket to w.
  538. // Not goroutine-safe
  539. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  540. packet := ch.nextMsgPacket()
  541. log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  542. wire.WriteByte(packetTypeMsg, w, &n, &err)
  543. wire.WriteBinary(packet, w, &n, &err)
  544. if err == nil {
  545. ch.recentlySent += int64(n)
  546. }
  547. return
  548. }
  549. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  550. // Not goroutine-safe
  551. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  552. // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  553. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  554. return nil, wire.ErrBinaryReadOverflow
  555. }
  556. ch.recving = append(ch.recving, packet.Bytes...)
  557. if packet.EOF == byte(0x01) {
  558. msgBytes := ch.recving
  559. // clear the slice without re-allocating.
  560. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  561. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  562. // at which point the recving slice stops being used and should be garbage collected
  563. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  564. return msgBytes, nil
  565. }
  566. return nil, nil
  567. }
  568. // Call this periodically to update stats for throttling purposes.
  569. // Not goroutine-safe
  570. func (ch *Channel) updateStats() {
  571. // Exponential decay of stats.
  572. // TODO: optimize.
  573. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  574. }
  575. //-----------------------------------------------------------------------------
  576. const (
  577. maxMsgPacketPayloadSize = 1024
  578. maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
  579. maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  580. packetTypePing = byte(0x01)
  581. packetTypePong = byte(0x02)
  582. packetTypeMsg = byte(0x03)
  583. )
  584. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  585. type msgPacket struct {
  586. ChannelID byte
  587. EOF byte // 1 means message ends here.
  588. Bytes []byte
  589. }
  590. func (p msgPacket) String() string {
  591. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  592. }