You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

601 lines
14 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "sync/atomic"
  9. "time"
  10. flow "code.google.com/p/mxk/go1/flowcontrol"
  11. "github.com/op/go-logging"
  12. . "github.com/tendermint/tendermint/binary"
  13. . "github.com/tendermint/tendermint/common"
  14. )
  15. const (
  16. numBatchPackets = 10
  17. minReadBufferSize = 1024
  18. minWriteBufferSize = 1024
  19. flushThrottleMS = 50
  20. idleTimeoutMinutes = 5
  21. updateStatsSeconds = 2
  22. pingTimeoutMinutes = 2
  23. defaultSendRate = 51200 // 5Kb/s
  24. defaultRecvRate = 51200 // 5Kb/s
  25. )
  26. /*
  27. A MConnection wraps a network connection and handles buffering and multiplexing.
  28. Binary messages are sent with ".Send(channelId, msg)".
  29. Inbound ByteSlices are pushed to the designated chan<- InboundBytes.
  30. */
  31. type MConnection struct {
  32. conn net.Conn
  33. bufReader *bufio.Reader
  34. bufWriter *bufio.Writer
  35. sendMonitor *flow.Monitor
  36. recvMonitor *flow.Monitor
  37. sendRate int64
  38. recvRate int64
  39. flushTimer *ThrottleTimer // flush writes as necessary but throttled.
  40. send chan struct{}
  41. quit chan struct{}
  42. pingTimer *RepeatTimer // send pings periodically
  43. pong chan struct{}
  44. chStatsTimer *RepeatTimer // update channel stats periodically
  45. channels []*Channel
  46. channelsIdx map[byte]*Channel
  47. onError func(interface{})
  48. started uint32
  49. stopped uint32
  50. errored uint32
  51. Peer *Peer // hacky optimization, gets set by Peer
  52. LocalAddress *NetAddress
  53. RemoteAddress *NetAddress
  54. }
  55. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onError func(interface{})) *MConnection {
  56. mconn := &MConnection{
  57. conn: conn,
  58. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  59. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  60. sendMonitor: flow.New(0, 0),
  61. recvMonitor: flow.New(0, 0),
  62. sendRate: defaultSendRate,
  63. recvRate: defaultRecvRate,
  64. flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond),
  65. send: make(chan struct{}, 1),
  66. quit: make(chan struct{}),
  67. pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
  68. pong: make(chan struct{}),
  69. chStatsTimer: NewRepeatTimer(updateStatsSeconds * time.Second),
  70. onError: onError,
  71. LocalAddress: NewNetAddress(conn.LocalAddr()),
  72. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  73. }
  74. // Create channels
  75. var channelsIdx = map[byte]*Channel{}
  76. var channels = []*Channel{}
  77. for _, desc := range chDescs {
  78. channel := newChannel(mconn, desc)
  79. channelsIdx[channel.id] = channel
  80. channels = append(channels, channel)
  81. }
  82. mconn.channels = channels
  83. mconn.channelsIdx = channelsIdx
  84. return mconn
  85. }
  86. // .Start() begins multiplexing packets to and from "channels".
  87. func (c *MConnection) Start() {
  88. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  89. log.Debug("Starting %v", c)
  90. go c.sendRoutine()
  91. go c.recvRoutine()
  92. }
  93. }
  94. func (c *MConnection) Stop() {
  95. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  96. log.Debug("Stopping %v", c)
  97. close(c.quit)
  98. c.conn.Close()
  99. c.flushTimer.Stop()
  100. c.chStatsTimer.Stop()
  101. c.pingTimer.Stop()
  102. // We can't close pong safely here because
  103. // recvRoutine may write to it after we've stopped.
  104. // Though it doesn't need to get closed at all,
  105. // we close it @ recvRoutine.
  106. // close(c.pong)
  107. }
  108. }
  109. func (c *MConnection) String() string {
  110. return fmt.Sprintf("/%v/", c.conn.RemoteAddr())
  111. }
  112. func (c *MConnection) flush() {
  113. err := c.bufWriter.Flush()
  114. if err != nil {
  115. if atomic.LoadUint32(&c.stopped) != 1 {
  116. log.Warning("MConnection flush failed: %v", err)
  117. }
  118. }
  119. }
  120. // Catch panics, usually caused by remote disconnects.
  121. func (c *MConnection) _recover() {
  122. if r := recover(); r != nil {
  123. c.stopForError(r)
  124. }
  125. }
  126. func (c *MConnection) stopForError(r interface{}) {
  127. c.Stop()
  128. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  129. if c.onError != nil {
  130. c.onError(r)
  131. }
  132. }
  133. }
  134. // Queues a message to be sent to channel.
  135. func (c *MConnection) Send(chId byte, msg Binary) bool {
  136. if atomic.LoadUint32(&c.stopped) == 1 {
  137. return false
  138. }
  139. // Send message to channel.
  140. channel, ok := c.channelsIdx[chId]
  141. if !ok {
  142. log.Error("Cannot send bytes, unknown channel %X", chId)
  143. return false
  144. }
  145. channel.sendBytes(BinaryBytes(msg))
  146. // Wake up sendRoutine if necessary
  147. select {
  148. case c.send <- struct{}{}:
  149. default:
  150. }
  151. return true
  152. }
  153. // Queues a message to be sent to channel.
  154. // Nonblocking, returns true if successful.
  155. func (c *MConnection) TrySend(chId byte, msg Binary) bool {
  156. if atomic.LoadUint32(&c.stopped) == 1 {
  157. return false
  158. }
  159. // Send message to channel.
  160. channel, ok := c.channelsIdx[chId]
  161. if !ok {
  162. log.Error("Cannot send bytes, unknown channel %X", chId)
  163. return false
  164. }
  165. ok = channel.trySendBytes(BinaryBytes(msg))
  166. if ok {
  167. // Wake up sendRoutine if necessary
  168. select {
  169. case c.send <- struct{}{}:
  170. default:
  171. }
  172. }
  173. return ok
  174. }
  175. func (c *MConnection) CanSend(chId byte) bool {
  176. if atomic.LoadUint32(&c.stopped) == 1 {
  177. return false
  178. }
  179. channel, ok := c.channelsIdx[chId]
  180. if !ok {
  181. log.Error("Unknown channel %X", chId)
  182. return false
  183. }
  184. return channel.canSend()
  185. }
  186. // sendRoutine polls for packets to send from channels.
  187. func (c *MConnection) sendRoutine() {
  188. defer c._recover()
  189. FOR_LOOP:
  190. for {
  191. var err error
  192. select {
  193. case <-c.flushTimer.Ch:
  194. // NOTE: flushTimer.Set() must be called every time
  195. // something is written to .bufWriter.
  196. c.flush()
  197. case <-c.chStatsTimer.Ch:
  198. for _, channel := range c.channels {
  199. channel.updateStats()
  200. }
  201. case <-c.pingTimer.Ch:
  202. var n int64
  203. n, err = packetTypePing.WriteTo(c.bufWriter)
  204. c.sendMonitor.Update(int(n))
  205. c.flush()
  206. case <-c.pong:
  207. var n int64
  208. n, err = packetTypePong.WriteTo(c.bufWriter)
  209. c.sendMonitor.Update(int(n))
  210. c.flush()
  211. case <-c.quit:
  212. break FOR_LOOP
  213. case <-c.send:
  214. // Send some packets
  215. eof := c.sendSomePackets()
  216. if !eof {
  217. // Keep sendRoutine awake.
  218. select {
  219. case c.send <- struct{}{}:
  220. default:
  221. }
  222. }
  223. }
  224. if atomic.LoadUint32(&c.stopped) == 1 {
  225. break FOR_LOOP
  226. }
  227. if err != nil {
  228. log.Info("%v failed @ sendRoutine:\n%v", c, err)
  229. c.Stop()
  230. break FOR_LOOP
  231. }
  232. }
  233. // Cleanup
  234. }
  235. // Returns true if messages from channels were exhausted.
  236. // Blocks in accordance to .sendMonitor throttling.
  237. func (c *MConnection) sendSomePackets() bool {
  238. // Block until .sendMonitor says we can write.
  239. // Once we're ready we send more than we asked for,
  240. // but amortized it should even out.
  241. c.sendMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.sendRate), true)
  242. // Now send some packets.
  243. for i := 0; i < numBatchPackets; i++ {
  244. if c.sendPacket() {
  245. return true
  246. }
  247. }
  248. return false
  249. }
  250. // Returns true if messages from channels were exhausted.
  251. func (c *MConnection) sendPacket() bool {
  252. // Choose a channel to create a packet from.
  253. // The chosen channel will be the one whose recentlySent/priority is the least.
  254. var leastRatio float32 = math.MaxFloat32
  255. var leastChannel *Channel
  256. for _, channel := range c.channels {
  257. // If nothing to send, skip this channel
  258. if !channel.sendPending() {
  259. continue
  260. }
  261. // Get ratio, and keep track of lowest ratio.
  262. ratio := float32(channel.recentlySent) / float32(channel.priority)
  263. if ratio < leastRatio {
  264. leastRatio = ratio
  265. leastChannel = channel
  266. }
  267. }
  268. // Nothing to send?
  269. if leastChannel == nil {
  270. return true
  271. } else {
  272. log.Debug("Found a packet to send")
  273. }
  274. // Make & send a packet from this channel
  275. n, err := leastChannel.writePacketTo(c.bufWriter)
  276. if err != nil {
  277. log.Warning("Failed to write packet. Error: %v", err)
  278. c.stopForError(err)
  279. return true
  280. }
  281. c.sendMonitor.Update(int(n))
  282. c.flushTimer.Set()
  283. return false
  284. }
  285. // recvRoutine reads packets and reconstructs the message using the channels' "recving" buffer.
  286. // After a whole message has been assembled, it's pushed to the Channel's recvQueue.
  287. // Blocks depending on how the connection is throttled.
  288. func (c *MConnection) recvRoutine() {
  289. defer c._recover()
  290. FOR_LOOP:
  291. for {
  292. // Block until .recvMonitor says we can read.
  293. c.recvMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.recvRate), true)
  294. // Read packet type
  295. pktType, n, err := ReadUInt8Safe(c.bufReader)
  296. c.recvMonitor.Update(int(n))
  297. if err != nil {
  298. if atomic.LoadUint32(&c.stopped) != 1 {
  299. log.Info("%v failed @ recvRoutine with err: %v", c, err)
  300. c.Stop()
  301. }
  302. break FOR_LOOP
  303. }
  304. // Peek into bufReader for debugging
  305. if log.IsEnabledFor(logging.DEBUG) {
  306. numBytes := c.bufReader.Buffered()
  307. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  308. if err != nil {
  309. log.Debug("recvRoutine packet type %X, peeked: %X", pktType, bytes)
  310. }
  311. }
  312. // Read more depending on packet type.
  313. switch pktType {
  314. case packetTypePing:
  315. // TODO: prevent abuse, as they cause flush()'s.
  316. c.pong <- struct{}{}
  317. case packetTypePong:
  318. // do nothing
  319. case packetTypeMessage:
  320. pkt, n, err := readPacketSafe(c.bufReader)
  321. c.recvMonitor.Update(int(n))
  322. if err != nil {
  323. if atomic.LoadUint32(&c.stopped) != 1 {
  324. log.Info("%v failed @ recvRoutine", c)
  325. c.Stop()
  326. }
  327. break FOR_LOOP
  328. }
  329. channel := c.channels[pkt.ChannelId]
  330. if channel == nil {
  331. Panicf("Unknown channel %v", pkt.ChannelId)
  332. }
  333. channel.recvPacket(pkt)
  334. default:
  335. Panicf("Unknown message type %v", pktType)
  336. }
  337. // TODO: shouldn't this go in the sendRoutine?
  338. // Better to send a packet when *we* haven't sent anything for a while.
  339. c.pingTimer.Reset()
  340. }
  341. // Cleanup
  342. close(c.pong)
  343. for _ = range c.pong {
  344. // Drain
  345. }
  346. }
  347. //-----------------------------------------------------------------------------
  348. type ChannelDescriptor struct {
  349. Id byte
  350. SendQueueCapacity int // One per MConnection.
  351. RecvQueueCapacity int // Global for this channel.
  352. RecvBufferSize int
  353. DefaultPriority uint
  354. // TODO: kinda hacky.
  355. // This is created by the switch, one per channel.
  356. recvQueue chan InboundBytes
  357. }
  358. // TODO: lowercase.
  359. // NOTE: not goroutine-safe.
  360. type Channel struct {
  361. conn *MConnection
  362. desc *ChannelDescriptor
  363. id byte
  364. recvQueue chan InboundBytes
  365. sendQueue chan ByteSlice
  366. sendQueueSize uint32
  367. recving ByteSlice
  368. sending ByteSlice
  369. priority uint
  370. recentlySent int64 // exponential moving average
  371. }
  372. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  373. if desc.DefaultPriority <= 0 {
  374. panic("Channel default priority must be a postive integer")
  375. }
  376. return &Channel{
  377. conn: conn,
  378. desc: desc,
  379. id: desc.Id,
  380. recvQueue: desc.recvQueue,
  381. sendQueue: make(chan ByteSlice, desc.SendQueueCapacity),
  382. recving: make([]byte, 0, desc.RecvBufferSize),
  383. priority: desc.DefaultPriority,
  384. }
  385. }
  386. // Queues message to send to this channel.
  387. // Goroutine-safe
  388. func (ch *Channel) sendBytes(bytes ByteSlice) {
  389. ch.sendQueue <- bytes
  390. atomic.AddUint32(&ch.sendQueueSize, 1)
  391. }
  392. // Queues message to send to this channel.
  393. // Nonblocking, returns true if successful.
  394. // Goroutine-safe
  395. func (ch *Channel) trySendBytes(bytes ByteSlice) bool {
  396. select {
  397. case ch.sendQueue <- bytes:
  398. atomic.AddUint32(&ch.sendQueueSize, 1)
  399. return true
  400. default:
  401. return false
  402. }
  403. }
  404. // Goroutine-safe
  405. func (ch *Channel) loadSendQueueSize() (size int) {
  406. return int(atomic.LoadUint32(&ch.sendQueueSize))
  407. }
  408. // Goroutine-safe
  409. // Use only as a heuristic.
  410. func (ch *Channel) canSend() bool {
  411. return ch.loadSendQueueSize() < ch.desc.SendQueueCapacity
  412. }
  413. // Returns true if any packets are pending to be sent.
  414. // Call before calling nextPacket()
  415. // Goroutine-safe
  416. func (ch *Channel) sendPending() bool {
  417. if len(ch.sending) == 0 {
  418. if len(ch.sendQueue) == 0 {
  419. return false
  420. }
  421. ch.sending = <-ch.sendQueue
  422. }
  423. return true
  424. }
  425. // Creates a new packet to send.
  426. // Not goroutine-safe
  427. func (ch *Channel) nextPacket() packet {
  428. packet := packet{}
  429. packet.ChannelId = Byte(ch.id)
  430. packet.Bytes = ch.sending[:MinInt(maxPacketSize, len(ch.sending))]
  431. if len(ch.sending) <= maxPacketSize {
  432. packet.EOF = Byte(0x01)
  433. ch.sending = nil
  434. atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize
  435. } else {
  436. packet.EOF = Byte(0x00)
  437. ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):]
  438. }
  439. return packet
  440. }
  441. // Writes next packet to w.
  442. // Not goroutine-safe
  443. func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) {
  444. packet := ch.nextPacket()
  445. n, err = WriteTo(packetTypeMessage, w, n, err)
  446. n, err = WriteTo(packet, w, n, err)
  447. if err != nil {
  448. ch.recentlySent += n
  449. }
  450. return
  451. }
  452. // Handles incoming packets.
  453. // Not goroutine-safe
  454. func (ch *Channel) recvPacket(pkt packet) {
  455. ch.recving = append(ch.recving, pkt.Bytes...)
  456. if pkt.EOF == Byte(0x01) {
  457. ch.recvQueue <- InboundBytes{ch.conn, ch.recving}
  458. ch.recving = make([]byte, 0, ch.desc.RecvBufferSize)
  459. }
  460. }
  461. // Call this periodically to update stats for throttling purposes.
  462. // Not goroutine-safe
  463. func (ch *Channel) updateStats() {
  464. // Exponential decay of stats.
  465. // TODO: optimize.
  466. ch.recentlySent = int64(float64(ch.recentlySent) * 0.5)
  467. }
  468. //-----------------------------------------------------------------------------
  469. const (
  470. maxPacketSize = 1024
  471. packetTypePing = UInt8(0x00)
  472. packetTypePong = UInt8(0x01)
  473. packetTypeMessage = UInt8(0x10)
  474. )
  475. // Messages in channels are chopped into smaller packets for multiplexing.
  476. type packet struct {
  477. ChannelId Byte
  478. EOF Byte // 1 means message ends here.
  479. Bytes ByteSlice
  480. }
  481. func (p packet) WriteTo(w io.Writer) (n int64, err error) {
  482. n, err = WriteTo(p.ChannelId, w, n, err)
  483. n, err = WriteTo(p.EOF, w, n, err)
  484. n, err = WriteTo(p.Bytes, w, n, err)
  485. return
  486. }
  487. func (p packet) String() string {
  488. return fmt.Sprintf("%v:%X", p.ChannelId, p.Bytes)
  489. }
  490. func readPacketSafe(r io.Reader) (pkt packet, n int64, err error) {
  491. chId, n_, err := ReadByteSafe(r)
  492. n += n_
  493. if err != nil {
  494. return
  495. }
  496. eof, n_, err := ReadByteSafe(r)
  497. n += n_
  498. if err != nil {
  499. return
  500. }
  501. // TODO: packet length sanity check.
  502. bytes, n_, err := ReadByteSliceSafe(r)
  503. n += n_
  504. if err != nil {
  505. return
  506. }
  507. return packet{chId, eof, bytes}, n, nil
  508. }
  509. //-----------------------------------------------------------------------------
  510. type InboundBytes struct {
  511. MConn *MConnection
  512. Bytes ByteSlice
  513. }
  514. //-----------------------------------------------------------------------------
  515. // Convenience struct for writing typed messages.
  516. // Reading requires a custom decoder that switches on the first type byte of a ByteSlice.
  517. type TypedMessage struct {
  518. Type Byte
  519. Msg Binary
  520. }
  521. func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) {
  522. n, err = WriteTo(tm.Type, w, n, err)
  523. n, err = WriteTo(tm.Msg, w, n, err)
  524. return
  525. }
  526. func (tm TypedMessage) String() string {
  527. return fmt.Sprintf("<%X:%v>", tm.Type, tm.Msg)
  528. }
  529. func (tm TypedMessage) Bytes() ByteSlice {
  530. return BinaryBytes(tm)
  531. }