You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

669 lines
18 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. cmn "github.com/tendermint/go-common"
  12. cfg "github.com/tendermint/go-config"
  13. flow "github.com/tendermint/go-flowrate/flowrate"
  14. wire "github.com/tendermint/go-wire"
  15. )
  16. const (
  17. numBatchMsgPackets = 10
  18. minReadBufferSize = 1024
  19. minWriteBufferSize = 65536
  20. idleTimeoutMinutes = 5
  21. updateStatsSeconds = 2
  22. pingTimeoutSeconds = 40
  23. flushThrottleMS = 100
  24. defaultSendQueueCapacity = 1
  25. defaultRecvBufferCapacity = 4096
  26. defaultRecvMessageCapacity = 22020096 // 21MB
  27. defaultSendTimeoutSeconds = 10
  28. )
  29. type receiveCbFunc func(chID byte, msgBytes []byte)
  30. type errorCbFunc func(interface{})
  31. /*
  32. Each peer has one `MConnection` (multiplex connection) instance.
  33. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  34. several messages along a single channel of communication.
  35. Each `MConnection` handles message transmission on multiple abstract communication
  36. `Channel`s. Each channel has a globally unique byte id.
  37. The byte id and the relative priorities of each `Channel` are configured upon
  38. initialization of the connection.
  39. There are two methods for sending messages:
  40. func (m MConnection) Send(chID byte, msg interface{}) bool {}
  41. func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
  42. `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
  43. for the channel with the given id byte `chID`, or until the request times out.
  44. The message `msg` is serialized using the `tendermint/wire` submodule's
  45. `WriteBinary()` reflection routine.
  46. `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
  47. queue is full.
  48. Inbound message bytes are handled with an onReceive callback function.
  49. */
  50. type MConnection struct {
  51. cmn.BaseService
  52. conn net.Conn
  53. bufReader *bufio.Reader
  54. bufWriter *bufio.Writer
  55. sendMonitor *flow.Monitor
  56. recvMonitor *flow.Monitor
  57. sendRate int64
  58. recvRate int64
  59. send chan struct{}
  60. pong chan struct{}
  61. channels []*Channel
  62. channelsIdx map[byte]*Channel
  63. onReceive receiveCbFunc
  64. onError errorCbFunc
  65. errored uint32
  66. quit chan struct{}
  67. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
  68. pingTimer *cmn.RepeatTimer // send pings periodically
  69. chStatsTimer *cmn.RepeatTimer // update channel stats periodically
  70. LocalAddress *NetAddress
  71. RemoteAddress *NetAddress
  72. }
  73. func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  74. mconn := &MConnection{
  75. conn: conn,
  76. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  77. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  78. sendMonitor: flow.New(0, 0),
  79. recvMonitor: flow.New(0, 0),
  80. sendRate: int64(config.GetInt(configKeySendRate)),
  81. recvRate: int64(config.GetInt(configKeyRecvRate)),
  82. send: make(chan struct{}, 1),
  83. pong: make(chan struct{}),
  84. onReceive: onReceive,
  85. onError: onError,
  86. // Initialized in Start()
  87. quit: nil,
  88. flushTimer: nil,
  89. pingTimer: nil,
  90. chStatsTimer: nil,
  91. LocalAddress: NewNetAddress(conn.LocalAddr()),
  92. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  93. }
  94. // Create channels
  95. var channelsIdx = map[byte]*Channel{}
  96. var channels = []*Channel{}
  97. for _, desc := range chDescs {
  98. descCopy := *desc // copy the desc else unsafe access across connections
  99. channel := newChannel(mconn, &descCopy)
  100. channelsIdx[channel.id] = channel
  101. channels = append(channels, channel)
  102. }
  103. mconn.channels = channels
  104. mconn.channelsIdx = channelsIdx
  105. mconn.BaseService = *cmn.NewBaseService(log, "MConnection", mconn)
  106. return mconn
  107. }
  108. func (c *MConnection) OnStart() error {
  109. c.BaseService.OnStart()
  110. c.quit = make(chan struct{})
  111. c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottleMS*time.Millisecond)
  112. c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeoutSeconds*time.Second)
  113. c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStatsSeconds*time.Second)
  114. go c.sendRoutine()
  115. go c.recvRoutine()
  116. return nil
  117. }
  118. func (c *MConnection) OnStop() {
  119. c.BaseService.OnStop()
  120. c.flushTimer.Stop()
  121. c.pingTimer.Stop()
  122. c.chStatsTimer.Stop()
  123. if c.quit != nil {
  124. close(c.quit)
  125. }
  126. c.conn.Close()
  127. // We can't close pong safely here because
  128. // recvRoutine may write to it after we've stopped.
  129. // Though it doesn't need to get closed at all,
  130. // we close it @ recvRoutine.
  131. // close(c.pong)
  132. }
  133. func (c *MConnection) String() string {
  134. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  135. }
  136. func (c *MConnection) flush() {
  137. log.Debug("Flush", "conn", c)
  138. err := c.bufWriter.Flush()
  139. if err != nil {
  140. log.Warn("MConnection flush failed", "error", err)
  141. }
  142. }
  143. // Catch panics, usually caused by remote disconnects.
  144. func (c *MConnection) _recover() {
  145. if r := recover(); r != nil {
  146. stack := debug.Stack()
  147. err := cmn.StackError{r, stack}
  148. c.stopForError(err)
  149. }
  150. }
  151. func (c *MConnection) stopForError(r interface{}) {
  152. c.Stop()
  153. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  154. if c.onError != nil {
  155. c.onError(r)
  156. }
  157. }
  158. }
  159. // Queues a message to be sent to channel.
  160. func (c *MConnection) Send(chID byte, msg interface{}) bool {
  161. if !c.IsRunning() {
  162. return false
  163. }
  164. log.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  165. // Send message to channel.
  166. channel, ok := c.channelsIdx[chID]
  167. if !ok {
  168. log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  169. return false
  170. }
  171. success := channel.sendBytes(wire.BinaryBytes(msg))
  172. if success {
  173. // Wake up sendRoutine if necessary
  174. select {
  175. case c.send <- struct{}{}:
  176. default:
  177. }
  178. } else {
  179. log.Warn("Send failed", "channel", chID, "conn", c, "msg", msg)
  180. }
  181. return success
  182. }
  183. // Queues a message to be sent to channel.
  184. // Nonblocking, returns true if successful.
  185. func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
  186. if !c.IsRunning() {
  187. return false
  188. }
  189. log.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
  190. // Send message to channel.
  191. channel, ok := c.channelsIdx[chID]
  192. if !ok {
  193. log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
  194. return false
  195. }
  196. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  197. if ok {
  198. // Wake up sendRoutine if necessary
  199. select {
  200. case c.send <- struct{}{}:
  201. default:
  202. }
  203. }
  204. return ok
  205. }
  206. func (c *MConnection) CanSend(chID byte) bool {
  207. if !c.IsRunning() {
  208. return false
  209. }
  210. channel, ok := c.channelsIdx[chID]
  211. if !ok {
  212. log.Error(cmn.Fmt("Unknown channel %X", chID))
  213. return false
  214. }
  215. return channel.canSend()
  216. }
  217. // sendRoutine polls for packets to send from channels.
  218. func (c *MConnection) sendRoutine() {
  219. defer c._recover()
  220. FOR_LOOP:
  221. for {
  222. var n int
  223. var err error
  224. select {
  225. case <-c.flushTimer.Ch:
  226. // NOTE: flushTimer.Set() must be called every time
  227. // something is written to .bufWriter.
  228. c.flush()
  229. case <-c.chStatsTimer.Ch:
  230. for _, channel := range c.channels {
  231. channel.updateStats()
  232. }
  233. case <-c.pingTimer.Ch:
  234. log.Debug("Send Ping")
  235. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  236. c.sendMonitor.Update(int(n))
  237. c.flush()
  238. case <-c.pong:
  239. log.Debug("Send Pong")
  240. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  241. c.sendMonitor.Update(int(n))
  242. c.flush()
  243. case <-c.quit:
  244. break FOR_LOOP
  245. case <-c.send:
  246. // Send some msgPackets
  247. eof := c.sendSomeMsgPackets()
  248. if !eof {
  249. // Keep sendRoutine awake.
  250. select {
  251. case c.send <- struct{}{}:
  252. default:
  253. }
  254. }
  255. }
  256. if !c.IsRunning() {
  257. break FOR_LOOP
  258. }
  259. if err != nil {
  260. log.Warn("Connection failed @ sendRoutine", "conn", c, "error", err)
  261. c.stopForError(err)
  262. break FOR_LOOP
  263. }
  264. }
  265. // Cleanup
  266. }
  267. // Returns true if messages from channels were exhausted.
  268. // Blocks in accordance to .sendMonitor throttling.
  269. func (c *MConnection) sendSomeMsgPackets() bool {
  270. // Block until .sendMonitor says we can write.
  271. // Once we're ready we send more than we asked for,
  272. // but amortized it should even out.
  273. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true)
  274. // Now send some msgPackets.
  275. for i := 0; i < numBatchMsgPackets; i++ {
  276. if c.sendMsgPacket() {
  277. return true
  278. }
  279. }
  280. return false
  281. }
  282. // Returns true if messages from channels were exhausted.
  283. func (c *MConnection) sendMsgPacket() bool {
  284. // Choose a channel to create a msgPacket from.
  285. // The chosen channel will be the one whose recentlySent/priority is the least.
  286. var leastRatio float32 = math.MaxFloat32
  287. var leastChannel *Channel
  288. for _, channel := range c.channels {
  289. // If nothing to send, skip this channel
  290. if !channel.isSendPending() {
  291. continue
  292. }
  293. // Get ratio, and keep track of lowest ratio.
  294. ratio := float32(channel.recentlySent) / float32(channel.priority)
  295. if ratio < leastRatio {
  296. leastRatio = ratio
  297. leastChannel = channel
  298. }
  299. }
  300. // Nothing to send?
  301. if leastChannel == nil {
  302. return true
  303. } else {
  304. // log.Info("Found a msgPacket to send")
  305. }
  306. // Make & send a msgPacket from this channel
  307. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  308. if err != nil {
  309. log.Warn("Failed to write msgPacket", "error", err)
  310. c.stopForError(err)
  311. return true
  312. }
  313. c.sendMonitor.Update(int(n))
  314. c.flushTimer.Set()
  315. return false
  316. }
  317. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  318. // After a whole message has been assembled, it's pushed to onReceive().
  319. // Blocks depending on how the connection is throttled.
  320. func (c *MConnection) recvRoutine() {
  321. defer c._recover()
  322. FOR_LOOP:
  323. for {
  324. // Block until .recvMonitor says we can read.
  325. c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true)
  326. /*
  327. // Peek into bufReader for debugging
  328. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  329. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  330. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  331. if err == nil {
  332. return bytes
  333. } else {
  334. log.Warn("Error peeking connection buffer", "error", err)
  335. return nil
  336. }
  337. }})
  338. }
  339. */
  340. // Read packet type
  341. var n int
  342. var err error
  343. pktType := wire.ReadByte(c.bufReader, &n, &err)
  344. c.recvMonitor.Update(int(n))
  345. if err != nil {
  346. if c.IsRunning() {
  347. log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
  348. c.stopForError(err)
  349. }
  350. break FOR_LOOP
  351. }
  352. // Read more depending on packet type.
  353. switch pktType {
  354. case packetTypePing:
  355. // TODO: prevent abuse, as they cause flush()'s.
  356. log.Debug("Receive Ping")
  357. c.pong <- struct{}{}
  358. case packetTypePong:
  359. // do nothing
  360. log.Debug("Receive Pong")
  361. case packetTypeMsg:
  362. pkt, n, err := msgPacket{}, int(0), error(nil)
  363. wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
  364. c.recvMonitor.Update(int(n))
  365. if err != nil {
  366. if c.IsRunning() {
  367. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  368. c.stopForError(err)
  369. }
  370. break FOR_LOOP
  371. }
  372. channel, ok := c.channelsIdx[pkt.ChannelID]
  373. if !ok || channel == nil {
  374. cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
  375. }
  376. msgBytes, err := channel.recvMsgPacket(pkt)
  377. if err != nil {
  378. if c.IsRunning() {
  379. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  380. c.stopForError(err)
  381. }
  382. break FOR_LOOP
  383. }
  384. if msgBytes != nil {
  385. log.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
  386. c.onReceive(pkt.ChannelID, msgBytes)
  387. }
  388. default:
  389. cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
  390. }
  391. // TODO: shouldn't this go in the sendRoutine?
  392. // Better to send a ping packet when *we* haven't sent anything for a while.
  393. c.pingTimer.Reset()
  394. }
  395. // Cleanup
  396. close(c.pong)
  397. for _ = range c.pong {
  398. // Drain
  399. }
  400. }
  401. type ConnectionStatus struct {
  402. SendMonitor flow.Status
  403. RecvMonitor flow.Status
  404. Channels []ChannelStatus
  405. }
  406. type ChannelStatus struct {
  407. ID byte
  408. SendQueueCapacity int
  409. SendQueueSize int
  410. Priority int
  411. RecentlySent int64
  412. }
  413. func (c *MConnection) Status() ConnectionStatus {
  414. var status ConnectionStatus
  415. status.SendMonitor = c.sendMonitor.Status()
  416. status.RecvMonitor = c.recvMonitor.Status()
  417. status.Channels = make([]ChannelStatus, len(c.channels))
  418. for i, channel := range c.channels {
  419. status.Channels[i] = ChannelStatus{
  420. ID: channel.id,
  421. SendQueueCapacity: cap(channel.sendQueue),
  422. SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
  423. Priority: channel.priority,
  424. RecentlySent: channel.recentlySent,
  425. }
  426. }
  427. return status
  428. }
  429. //-----------------------------------------------------------------------------
  430. type ChannelDescriptor struct {
  431. ID byte
  432. Priority int
  433. SendQueueCapacity int
  434. RecvBufferCapacity int
  435. RecvMessageCapacity int
  436. }
  437. func (chDesc *ChannelDescriptor) FillDefaults() {
  438. if chDesc.SendQueueCapacity == 0 {
  439. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  440. }
  441. if chDesc.RecvBufferCapacity == 0 {
  442. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  443. }
  444. if chDesc.RecvMessageCapacity == 0 {
  445. chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
  446. }
  447. }
  448. // TODO: lowercase.
  449. // NOTE: not goroutine-safe.
  450. type Channel struct {
  451. conn *MConnection
  452. desc *ChannelDescriptor
  453. id byte
  454. sendQueue chan []byte
  455. sendQueueSize int32 // atomic.
  456. recving []byte
  457. sending []byte
  458. priority int
  459. recentlySent int64 // exponential moving average
  460. }
  461. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  462. desc.FillDefaults()
  463. if desc.Priority <= 0 {
  464. cmn.PanicSanity("Channel default priority must be a postive integer")
  465. }
  466. return &Channel{
  467. conn: conn,
  468. desc: desc,
  469. id: desc.ID,
  470. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  471. recving: make([]byte, 0, desc.RecvBufferCapacity),
  472. priority: desc.Priority,
  473. }
  474. }
  475. // Queues message to send to this channel.
  476. // Goroutine-safe
  477. // Times out (and returns false) after defaultSendTimeoutSeconds
  478. func (ch *Channel) sendBytes(bytes []byte) bool {
  479. timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second)
  480. select {
  481. case <-timeout.C:
  482. // timeout
  483. return false
  484. case ch.sendQueue <- bytes:
  485. atomic.AddInt32(&ch.sendQueueSize, 1)
  486. return true
  487. }
  488. }
  489. // Queues message to send to this channel.
  490. // Nonblocking, returns true if successful.
  491. // Goroutine-safe
  492. func (ch *Channel) trySendBytes(bytes []byte) bool {
  493. select {
  494. case ch.sendQueue <- bytes:
  495. atomic.AddInt32(&ch.sendQueueSize, 1)
  496. return true
  497. default:
  498. return false
  499. }
  500. }
  501. // Goroutine-safe
  502. func (ch *Channel) loadSendQueueSize() (size int) {
  503. return int(atomic.LoadInt32(&ch.sendQueueSize))
  504. }
  505. // Goroutine-safe
  506. // Use only as a heuristic.
  507. func (ch *Channel) canSend() bool {
  508. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  509. }
  510. // Returns true if any msgPackets are pending to be sent.
  511. // Call before calling nextMsgPacket()
  512. // Goroutine-safe
  513. func (ch *Channel) isSendPending() bool {
  514. if len(ch.sending) == 0 {
  515. if len(ch.sendQueue) == 0 {
  516. return false
  517. }
  518. ch.sending = <-ch.sendQueue
  519. }
  520. return true
  521. }
  522. // Creates a new msgPacket to send.
  523. // Not goroutine-safe
  524. func (ch *Channel) nextMsgPacket() msgPacket {
  525. packet := msgPacket{}
  526. packet.ChannelID = byte(ch.id)
  527. packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
  528. if len(ch.sending) <= maxMsgPacketPayloadSize {
  529. packet.EOF = byte(0x01)
  530. ch.sending = nil
  531. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  532. } else {
  533. packet.EOF = byte(0x00)
  534. ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
  535. }
  536. return packet
  537. }
  538. // Writes next msgPacket to w.
  539. // Not goroutine-safe
  540. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
  541. packet := ch.nextMsgPacket()
  542. log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  543. wire.WriteByte(packetTypeMsg, w, &n, &err)
  544. wire.WriteBinary(packet, w, &n, &err)
  545. if err == nil {
  546. ch.recentlySent += int64(n)
  547. }
  548. return
  549. }
  550. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  551. // Not goroutine-safe
  552. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  553. // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  554. if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
  555. return nil, wire.ErrBinaryReadOverflow
  556. }
  557. ch.recving = append(ch.recving, packet.Bytes...)
  558. if packet.EOF == byte(0x01) {
  559. msgBytes := ch.recving
  560. // clear the slice without re-allocating.
  561. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
  562. // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
  563. // at which point the recving slice stops being used and should be garbage collected
  564. ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
  565. return msgBytes, nil
  566. }
  567. return nil, nil
  568. }
  569. // Call this periodically to update stats for throttling purposes.
  570. // Not goroutine-safe
  571. func (ch *Channel) updateStats() {
  572. // Exponential decay of stats.
  573. // TODO: optimize.
  574. ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
  575. }
  576. //-----------------------------------------------------------------------------
  577. const (
  578. maxMsgPacketPayloadSize = 1024
  579. maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
  580. maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
  581. packetTypePing = byte(0x01)
  582. packetTypePong = byte(0x02)
  583. packetTypeMsg = byte(0x03)
  584. )
  585. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  586. type msgPacket struct {
  587. ChannelID byte
  588. EOF byte // 1 means message ends here.
  589. Bytes []byte
  590. }
  591. func (p msgPacket) String() string {
  592. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
  593. }