You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

561 lines
14 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "sync/atomic"
  9. "time"
  10. flow "code.google.com/p/mxk/go1/flowcontrol"
  11. "github.com/op/go-logging"
  12. . "github.com/tendermint/tendermint/binary"
  13. . "github.com/tendermint/tendermint/common"
  14. )
  // Tunables for MConnection buffering, timers and throttling.
  const (
  	numBatchPackets    = 10   // upper bound on packets written per sendSomePackets call
  	minReadBufferSize  = 1024 // size handed to bufio.NewReaderSize
  	minWriteBufferSize = 1024 // size handed to bufio.NewWriterSize
  	flushThrottleMS    = 50   // flushTimer interval, in milliseconds
  	idleTimeoutMinutes = 5    // not referenced by the code visible in this file
  	updateStatsSeconds = 2    // chStatsTimer interval, in seconds
  	pingTimeoutMinutes = 2    // pingTimer interval, in minutes

  	// Default rate limits handed to flow.Monitor.Limit.
  	// 51200 = 50 * 1024; the monitor presumably interprets the rate as
  	// bytes per second, i.e. 50 KiB/s (an earlier note here said "5Kb/s",
  	// which did not match the value).
  	defaultSendRate = 51200
  	defaultRecvRate = 51200
  )
  26. /*
  27. A MConnection wraps a network connection and handles buffering and multiplexing.
  28. ByteSlices are sent with ".Send(channelId, bytes)".
  29. Inbound ByteSlices are pushed to the designated chan<- InboundBytes.
  30. */
  31. type MConnection struct {
  32. conn net.Conn
  33. bufReader *bufio.Reader
  34. bufWriter *bufio.Writer
  35. sendMonitor *flow.Monitor
  36. recvMonitor *flow.Monitor
  37. sendRate int64
  38. recvRate int64
  39. flushTimer *ThrottleTimer // flush writes as necessary but throttled.
  40. canSend chan struct{}
  41. quit chan struct{}
  42. pingTimer *RepeatTimer // send pings periodically
  43. pong chan struct{}
  44. chStatsTimer *RepeatTimer // update channel stats periodically
  45. channels []*Channel
  46. channelsIdx map[byte]*Channel
  47. onError func(interface{})
  48. started uint32
  49. stopped uint32
  50. errored uint32
  51. _peer *Peer // hacky optimization
  52. }
  53. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onError func(interface{})) *MConnection {
  54. mconn := &MConnection{
  55. conn: conn,
  56. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  57. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  58. sendMonitor: flow.New(0, 0),
  59. recvMonitor: flow.New(0, 0),
  60. sendRate: defaultSendRate,
  61. recvRate: defaultRecvRate,
  62. flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond),
  63. canSend: make(chan struct{}, 1),
  64. quit: make(chan struct{}),
  65. pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
  66. pong: make(chan struct{}),
  67. chStatsTimer: NewRepeatTimer(updateStatsSeconds * time.Second),
  68. onError: onError,
  69. }
  70. // Create channels
  71. var channelsIdx = map[byte]*Channel{}
  72. var channels = []*Channel{}
  73. for _, desc := range chDescs {
  74. channel := newChannel(mconn, desc)
  75. channelsIdx[channel.id] = channel
  76. channels = append(channels, channel)
  77. }
  78. mconn.channels = channels
  79. mconn.channelsIdx = channelsIdx
  80. return mconn
  81. }
  82. // .Start() begins multiplexing packets to and from "channels".
  83. func (c *MConnection) Start() {
  84. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  85. log.Debug("Starting %v", c)
  86. go c.sendHandler()
  87. go c.recvHandler()
  88. }
  89. }
  90. func (c *MConnection) Stop() {
  91. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  92. log.Debug("Stopping %v", c)
  93. close(c.quit)
  94. c.conn.Close()
  95. c.flushTimer.Stop()
  96. c.chStatsTimer.Stop()
  97. c.pingTimer.Stop()
  98. // We can't close pong safely here because
  99. // recvHandler may write to it after we've stopped.
  100. // Though it doesn't need to get closed at all,
  101. // we close it @ recvHandler.
  102. // close(c.pong)
  103. }
  104. }
  105. func (c *MConnection) LocalAddress() *NetAddress {
  106. return NewNetAddress(c.conn.LocalAddr())
  107. }
  108. func (c *MConnection) RemoteAddress() *NetAddress {
  109. return NewNetAddress(c.conn.RemoteAddr())
  110. }
  111. func (c *MConnection) String() string {
  112. return fmt.Sprintf("/%v/", c.conn.RemoteAddr())
  113. }
  114. func (c *MConnection) flush() {
  115. err := c.bufWriter.Flush()
  116. if err != nil {
  117. if atomic.LoadUint32(&c.stopped) != 1 {
  118. log.Warning("MConnection flush failed: %v", err)
  119. }
  120. }
  121. }
  122. // Catch panics, usually caused by remote disconnects.
  123. func (c *MConnection) _recover() {
  124. if r := recover(); r != nil {
  125. c.stopForError(r)
  126. }
  127. }
  128. func (c *MConnection) stopForError(r interface{}) {
  129. c.Stop()
  130. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  131. if c.onError != nil {
  132. c.onError(r)
  133. }
  134. }
  135. }
  136. // Queues a message to be sent to channel.
  137. func (c *MConnection) Send(chId byte, bytes ByteSlice) bool {
  138. // Send message to channel.
  139. channel, ok := c.channelsIdx[chId]
  140. if !ok {
  141. log.Error("Cannot send bytes, unknown channel %X", chId)
  142. return false
  143. }
  144. channel.sendBytes(bytes)
  145. // Wake up sendHandler if necessary
  146. select {
  147. case c.canSend <- struct{}{}:
  148. default:
  149. }
  150. return true
  151. }
  152. // Queues a message to be sent to channel.
  153. // Nonblocking, returns true if successful.
  154. func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool {
  155. // Send message to channel.
  156. channel, ok := c.channelsIdx[chId]
  157. if !ok {
  158. log.Error("Cannot send bytes, unknown channel %X", chId)
  159. return false
  160. }
  161. ok = channel.trySendBytes(bytes)
  162. if ok {
  163. // Wake up sendHandler if necessary
  164. select {
  165. case c.canSend <- struct{}{}:
  166. default:
  167. }
  168. }
  169. return ok
  170. }
  171. // sendHandler polls for packets to send from channels.
  172. func (c *MConnection) sendHandler() {
  173. defer c._recover()
  174. FOR_LOOP:
  175. for {
  176. var err error
  177. select {
  178. case <-c.flushTimer.Ch:
  179. // NOTE: flushTimer.Set() must be called every time
  180. // something is written to .bufWriter.
  181. c.flush()
  182. case <-c.chStatsTimer.Ch:
  183. for _, channel := range c.channels {
  184. channel.updateStats()
  185. }
  186. case <-c.pingTimer.Ch:
  187. var n int64
  188. n, err = packetTypePing.WriteTo(c.bufWriter)
  189. c.sendMonitor.Update(int(n))
  190. c.flush()
  191. case <-c.pong:
  192. var n int64
  193. n, err = packetTypePong.WriteTo(c.bufWriter)
  194. c.sendMonitor.Update(int(n))
  195. c.flush()
  196. case <-c.quit:
  197. break FOR_LOOP
  198. case <-c.canSend:
  199. // Send some packets
  200. eof := c.sendSomePackets()
  201. if !eof {
  202. // Keep sendHandler awake.
  203. select {
  204. case c.canSend <- struct{}{}:
  205. default:
  206. }
  207. }
  208. }
  209. if atomic.LoadUint32(&c.stopped) == 1 {
  210. break FOR_LOOP
  211. }
  212. if err != nil {
  213. log.Info("%v failed @ sendHandler:\n%v", c, err)
  214. c.Stop()
  215. break FOR_LOOP
  216. }
  217. }
  218. // Cleanup
  219. }
  220. // Returns true if messages from channels were exhausted.
  221. // Blocks in accordance to .sendMonitor throttling.
  222. func (c *MConnection) sendSomePackets() bool {
  223. // Block until .sendMonitor says we can write.
  224. // Once we're ready we send more than we asked for,
  225. // but amortized it should even out.
  226. c.sendMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.sendRate), true)
  227. // Now send some packets.
  228. for i := 0; i < numBatchPackets; i++ {
  229. if c.sendPacket() {
  230. return true
  231. }
  232. }
  233. return false
  234. }
  235. // Returns true if messages from channels were exhausted.
  236. func (c *MConnection) sendPacket() bool {
  237. // Choose a channel to create a packet from.
  238. // The chosen channel will be the one whose recentlySent/priority is the least.
  239. var leastRatio float32 = math.MaxFloat32
  240. var leastChannel *Channel
  241. for _, channel := range c.channels {
  242. // If nothing to send, skip this channel
  243. if !channel.sendPending() {
  244. continue
  245. }
  246. // Get ratio, and keep track of lowest ratio.
  247. ratio := float32(channel.recentlySent) / float32(channel.priority)
  248. if ratio < leastRatio {
  249. leastRatio = ratio
  250. leastChannel = channel
  251. }
  252. }
  253. // Nothing to send?
  254. if leastChannel == nil {
  255. return true
  256. } else {
  257. log.Debug("Found a packet to send")
  258. }
  259. // Make & send a packet from this channel
  260. n, err := leastChannel.writePacketTo(c.bufWriter)
  261. if err != nil {
  262. log.Warning("Failed to write packet. Error: %v", err)
  263. c.stopForError(err)
  264. return true
  265. }
  266. c.sendMonitor.Update(int(n))
  267. c.flushTimer.Set()
  268. return false
  269. }
  270. // recvHandler reads packets and reconstructs the message using the channels' "recving" buffer.
  271. // After a whole message has been assembled, it's pushed to the Channel's recvQueue.
  272. // Blocks depending on how the connection is throttled.
  273. func (c *MConnection) recvHandler() {
  274. defer c._recover()
  275. FOR_LOOP:
  276. for {
  277. // Block until .recvMonitor says we can read.
  278. c.recvMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.recvRate), true)
  279. // Read packet type
  280. pktType, n, err := ReadUInt8Safe(c.bufReader)
  281. c.recvMonitor.Update(int(n))
  282. if err != nil {
  283. if atomic.LoadUint32(&c.stopped) != 1 {
  284. log.Info("%v failed @ recvHandler with err: %v", c, err)
  285. c.Stop()
  286. }
  287. break FOR_LOOP
  288. }
  289. // Peek into bufReader for debugging
  290. if log.IsEnabledFor(logging.DEBUG) {
  291. numBytes := c.bufReader.Buffered()
  292. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  293. if err != nil {
  294. log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes)
  295. }
  296. }
  297. // Read more depending on packet type.
  298. switch pktType {
  299. case packetTypePing:
  300. // TODO: prevent abuse, as they cause flush()'s.
  301. c.pong <- struct{}{}
  302. case packetTypePong:
  303. // do nothing
  304. case packetTypeMessage:
  305. pkt, n, err := readPacketSafe(c.bufReader)
  306. c.recvMonitor.Update(int(n))
  307. if err != nil {
  308. if atomic.LoadUint32(&c.stopped) != 1 {
  309. log.Info("%v failed @ recvHandler", c)
  310. c.Stop()
  311. }
  312. break FOR_LOOP
  313. }
  314. channel := c.channels[pkt.ChannelId]
  315. if channel == nil {
  316. Panicf("Unknown channel %v", pkt.ChannelId)
  317. }
  318. channel.recvPacket(pkt)
  319. default:
  320. Panicf("Unknown message type %v", pktType)
  321. }
  322. // TODO: shouldn't this go in the sendHandler?
  323. // Better to send a packet when *we* haven't sent anything for a while.
  324. c.pingTimer.Reset()
  325. }
  326. // Cleanup
  327. close(c.pong)
  328. for _ = range c.pong {
  329. // Drain
  330. }
  331. }
  332. //-----------------------------------------------------------------------------
  333. type ChannelDescriptor struct {
  334. Id byte
  335. SendQueueCapacity int // One per MConnection.
  336. RecvQueueCapacity int // Global for this channel.
  337. RecvBufferSize int
  338. DefaultPriority uint
  339. // TODO: kinda hacky.
  340. // This is created by the switch, one per channel.
  341. recvQueue chan InboundBytes
  342. }
  343. // TODO: lowercase.
  344. // NOTE: not goroutine-safe.
  345. type Channel struct {
  346. conn *MConnection
  347. desc *ChannelDescriptor
  348. id byte
  349. recvQueue chan InboundBytes
  350. sendQueue chan ByteSlice
  351. recving ByteSlice
  352. sending ByteSlice
  353. priority uint
  354. recentlySent int64 // exponential moving average
  355. }
  356. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  357. if desc.DefaultPriority <= 0 {
  358. panic("Channel default priority must be a postive integer")
  359. }
  360. return &Channel{
  361. conn: conn,
  362. desc: desc,
  363. id: desc.Id,
  364. recvQueue: desc.recvQueue,
  365. sendQueue: make(chan ByteSlice, desc.SendQueueCapacity),
  366. recving: make([]byte, 0, desc.RecvBufferSize),
  367. priority: desc.DefaultPriority,
  368. }
  369. }
  370. // Queues message to send to this channel.
  371. func (ch *Channel) sendBytes(bytes ByteSlice) {
  372. ch.sendQueue <- bytes
  373. }
  374. // Queues message to send to this channel.
  375. // Nonblocking, returns true if successful.
  376. func (ch *Channel) trySendBytes(bytes ByteSlice) bool {
  377. select {
  378. case ch.sendQueue <- bytes:
  379. return true
  380. default:
  381. return false
  382. }
  383. }
  384. // Returns true if any packets are pending to be sent.
  385. func (ch *Channel) sendPending() bool {
  386. if len(ch.sending) == 0 {
  387. if len(ch.sendQueue) == 0 {
  388. return false
  389. }
  390. ch.sending = <-ch.sendQueue
  391. }
  392. return true
  393. }
  394. // Creates a new packet to send.
  395. func (ch *Channel) nextPacket() packet {
  396. packet := packet{}
  397. packet.ChannelId = Byte(ch.id)
  398. packet.Bytes = ch.sending[:MinInt(maxPacketSize, len(ch.sending))]
  399. if len(ch.sending) <= maxPacketSize {
  400. packet.EOF = Byte(0x01)
  401. ch.sending = nil
  402. } else {
  403. packet.EOF = Byte(0x00)
  404. ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):]
  405. }
  406. return packet
  407. }
  408. // Writes next packet to w.
  409. func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) {
  410. packet := ch.nextPacket()
  411. n, err = WriteTo(packetTypeMessage, w, n, err)
  412. n, err = WriteTo(packet, w, n, err)
  413. if err != nil {
  414. ch.recentlySent += n
  415. }
  416. return
  417. }
  418. // Handles incoming packets.
  419. func (ch *Channel) recvPacket(pkt packet) {
  420. ch.recving = append(ch.recving, pkt.Bytes...)
  421. if pkt.EOF == Byte(0x01) {
  422. ch.recvQueue <- InboundBytes{ch.conn, ch.recving}
  423. ch.recving = make([]byte, 0, ch.desc.RecvBufferSize)
  424. }
  425. }
  426. // Call this periodically to update stats for throttling purposes.
  427. func (ch *Channel) updateStats() {
  428. // Exponential decay of stats.
  429. // TODO: optimize.
  430. ch.recentlySent = int64(float64(ch.recentlySent) * 0.5)
  431. }
  432. //-----------------------------------------------------------------------------
  433. const (
  434. maxPacketSize = 1024
  435. packetTypePing = UInt8(0x00)
  436. packetTypePong = UInt8(0x01)
  437. packetTypeMessage = UInt8(0x10)
  438. )
  439. // Messages in channels are chopped into smaller packets for multiplexing.
  440. type packet struct {
  441. ChannelId Byte
  442. EOF Byte // 1 means message ends here.
  443. Bytes ByteSlice
  444. }
  445. func (p packet) WriteTo(w io.Writer) (n int64, err error) {
  446. n, err = WriteTo(p.ChannelId, w, n, err)
  447. n, err = WriteTo(p.EOF, w, n, err)
  448. n, err = WriteTo(p.Bytes, w, n, err)
  449. return
  450. }
  451. func (p packet) String() string {
  452. return fmt.Sprintf("%v:%X", p.ChannelId, p.Bytes)
  453. }
  454. func readPacketSafe(r io.Reader) (pkt packet, n int64, err error) {
  455. chId, n_, err := ReadByteSafe(r)
  456. n += n_
  457. if err != nil {
  458. return
  459. }
  460. eof, n_, err := ReadByteSafe(r)
  461. n += n_
  462. if err != nil {
  463. return
  464. }
  465. // TODO: packet length sanity check.
  466. bytes, n_, err := ReadByteSliceSafe(r)
  467. n += n_
  468. if err != nil {
  469. return
  470. }
  471. return packet{chId, eof, bytes}, n, nil
  472. }
  473. //-----------------------------------------------------------------------------
  474. type InboundBytes struct {
  475. MConn *MConnection
  476. Bytes ByteSlice
  477. }
  478. //-----------------------------------------------------------------------------
  479. // Convenience struct for writing typed messages.
  480. // Reading requires a custom decoder that switches on the first type byte of a ByteSlice.
  481. type TypedMessage struct {
  482. Type Byte
  483. Msg Binary
  484. }
  485. func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) {
  486. n, err = WriteTo(tm.Type, w, n, err)
  487. n, err = WriteTo(tm.Msg, w, n, err)
  488. return
  489. }
  490. func (tm TypedMessage) String() string {
  491. return fmt.Sprintf("<%X:%v>", tm.Type, tm.Msg)
  492. }
  493. func (tm TypedMessage) Bytes() ByteSlice {
  494. return BinaryBytes(tm)
  495. }