You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

592 lines
14 KiB

11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. flow "code.google.com/p/mxk/go1/flowcontrol"
  12. "github.com/op/go-logging"
  13. . "github.com/tendermint/tendermint/binary"
  14. . "github.com/tendermint/tendermint/common"
  15. )
// Tunables for MConnection and its channels.
const (
	// Upper bound on packets written per sendSomePackets call.
	numBatchPackets = 10
	// Buffer size handed to bufio.NewReaderSize for the connection.
	minReadBufferSize = 1024
	// Buffer size handed to bufio.NewWriterSize for the connection.
	minWriteBufferSize = 1024
	// Flush throttle interval in milliseconds (see flushTimer).
	flushThrottleMS = 50
	// NOTE(review): not referenced anywhere in the visible part of this file.
	idleTimeoutMinutes = 5
	// Interval in seconds between Channel.updateStats rounds (see chStatsTimer).
	updateStatsSeconds = 2
	// Interval in minutes of the ping timer (see pingTimer).
	pingTimeoutMinutes = 2
	// 51200 = 50 * 1024. The earlier "5Kb/s" note did not match the value;
	// presumably this is 50KiB/s, assuming flow.Monitor.Limit takes bytes
	// per second. TODO confirm the unit.
	defaultSendRate = 51200
	// Same value and caveat as defaultSendRate, for the receive side.
	defaultRecvRate = 51200
	// Capacity of each Channel's sendQueue; also the threshold used by canSend.
	defaultSendQueueCapacity = 1
	// Initial capacity of each Channel's recving buffer.
	defaultRecvBufferCapacity = 4096
)
// receiveCbFunc is invoked by recvRoutine with the channel id and the bytes
// of each fully reassembled inbound message.
type receiveCbFunc func(chId byte, msgBytes []byte)

// errorCbFunc is invoked at most once per connection by stopForError with the
// value that caused the connection to stop.
type errorCbFunc func(interface{})
/*
A MConnection wraps a network connection and handles buffering and multiplexing.
Binary messages are sent with ".Send(channelId, msg)".
Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	conn        net.Conn      // underlying connection; closed by Stop
	bufReader   *bufio.Reader // buffered reader over conn, used by recvRoutine
	bufWriter   *bufio.Writer // buffered writer over conn, used by sendRoutine
	sendMonitor *flow.Monitor // updated with bytes written; its Limit call throttles sending
	recvMonitor *flow.Monitor // updated with bytes read; its Limit call throttles receiving
	sendRate    int64         // rate passed to sendMonitor.Limit; read atomically
	recvRate    int64         // rate passed to recvMonitor.Limit; read atomically
	flushTimer  *ThrottleTimer // flush writes as necessary but throttled.
	send        chan struct{} // capacity-1 wake-up signal for sendRoutine
	quit        chan struct{} // closed by Stop
	pingTimer   *RepeatTimer // send pings periodically
	pong        chan struct{} // signalled by recvRoutine when a ping arrives; closed when recvRoutine exits
	chStatsTimer *RepeatTimer // update channel stats periodically
	channels    []*Channel // all channels, in descriptor order
	channelsIdx map[byte]*Channel // channels keyed by channel id
	onReceive   receiveCbFunc // called with each complete inbound message
	onError     errorCbFunc // called once from stopForError; may be nil
	started     uint32 // set to 1 (via compare-and-swap) by the first Start
	stopped     uint32 // set to 1 (via compare-and-swap) by the first Stop
	errored     uint32 // set to 1 (via compare-and-swap) by the first stopForError
	LocalAddress  *NetAddress // built from conn.LocalAddr() at construction
	RemoteAddress *NetAddress // built from conn.RemoteAddr() at construction
}
  60. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  61. mconn := &MConnection{
  62. conn: conn,
  63. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  64. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  65. sendMonitor: flow.New(0, 0),
  66. recvMonitor: flow.New(0, 0),
  67. sendRate: defaultSendRate,
  68. recvRate: defaultRecvRate,
  69. flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond),
  70. send: make(chan struct{}, 1),
  71. quit: make(chan struct{}),
  72. pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
  73. pong: make(chan struct{}),
  74. chStatsTimer: NewRepeatTimer(updateStatsSeconds * time.Second),
  75. onReceive: onReceive,
  76. onError: onError,
  77. LocalAddress: NewNetAddress(conn.LocalAddr()),
  78. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  79. }
  80. // Create channels
  81. var channelsIdx = map[byte]*Channel{}
  82. var channels = []*Channel{}
  83. for _, desc := range chDescs {
  84. channel := newChannel(mconn, desc)
  85. channelsIdx[channel.id] = channel
  86. channels = append(channels, channel)
  87. }
  88. mconn.channels = channels
  89. mconn.channelsIdx = channelsIdx
  90. return mconn
  91. }
  92. // .Start() begins multiplexing packets to and from "channels".
  93. func (c *MConnection) Start() {
  94. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  95. log.Debug("Starting %v", c)
  96. go c.sendRoutine()
  97. go c.recvRoutine()
  98. }
  99. }
  100. func (c *MConnection) Stop() {
  101. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  102. log.Debug("Stopping %v", c)
  103. close(c.quit)
  104. c.conn.Close()
  105. c.flushTimer.Stop()
  106. c.chStatsTimer.Stop()
  107. c.pingTimer.Stop()
  108. // We can't close pong safely here because
  109. // recvRoutine may write to it after we've stopped.
  110. // Though it doesn't need to get closed at all,
  111. // we close it @ recvRoutine.
  112. // close(c.pong)
  113. }
  114. }
  115. func (c *MConnection) String() string {
  116. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  117. }
  118. func (c *MConnection) flush() {
  119. err := c.bufWriter.Flush()
  120. if err != nil {
  121. if atomic.LoadUint32(&c.stopped) != 1 {
  122. log.Warning("MConnection flush failed: %v", err)
  123. }
  124. }
  125. }
  126. // Catch panics, usually caused by remote disconnects.
  127. func (c *MConnection) _recover() {
  128. if r := recover(); r != nil {
  129. stack := debug.Stack()
  130. err := StackError{r, stack}
  131. c.stopForError(err)
  132. }
  133. }
  134. func (c *MConnection) stopForError(r interface{}) {
  135. c.Stop()
  136. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  137. if c.onError != nil {
  138. c.onError(r)
  139. }
  140. }
  141. }
  142. // Queues a message to be sent to channel.
  143. func (c *MConnection) Send(chId byte, msg Binary) bool {
  144. if atomic.LoadUint32(&c.stopped) == 1 {
  145. return false
  146. }
  147. log.Debug("[%X][%v] Send: %v", chId, c, msg)
  148. // Send message to channel.
  149. channel, ok := c.channelsIdx[chId]
  150. if !ok {
  151. log.Error("Cannot send bytes, unknown channel %X", chId)
  152. return false
  153. }
  154. channel.sendBytes(BinaryBytes(msg))
  155. // Wake up sendRoutine if necessary
  156. select {
  157. case c.send <- struct{}{}:
  158. default:
  159. }
  160. return true
  161. }
  162. // Queues a message to be sent to channel.
  163. // Nonblocking, returns true if successful.
  164. func (c *MConnection) TrySend(chId byte, msg Binary) bool {
  165. if atomic.LoadUint32(&c.stopped) == 1 {
  166. return false
  167. }
  168. log.Debug("[%X][%v] TrySend: %v", chId, c, msg)
  169. // Send message to channel.
  170. channel, ok := c.channelsIdx[chId]
  171. if !ok {
  172. log.Error("Cannot send bytes, unknown channel %X", chId)
  173. return false
  174. }
  175. ok = channel.trySendBytes(BinaryBytes(msg))
  176. if ok {
  177. // Wake up sendRoutine if necessary
  178. select {
  179. case c.send <- struct{}{}:
  180. default:
  181. }
  182. }
  183. return ok
  184. }
  185. func (c *MConnection) CanSend(chId byte) bool {
  186. if atomic.LoadUint32(&c.stopped) == 1 {
  187. return false
  188. }
  189. channel, ok := c.channelsIdx[chId]
  190. if !ok {
  191. log.Error("Unknown channel %X", chId)
  192. return false
  193. }
  194. return channel.canSend()
  195. }
  196. // sendRoutine polls for packets to send from channels.
  197. func (c *MConnection) sendRoutine() {
  198. defer c._recover()
  199. FOR_LOOP:
  200. for {
  201. var n int64
  202. var err error
  203. select {
  204. case <-c.flushTimer.Ch:
  205. // NOTE: flushTimer.Set() must be called every time
  206. // something is written to .bufWriter.
  207. c.flush()
  208. case <-c.chStatsTimer.Ch:
  209. for _, channel := range c.channels {
  210. channel.updateStats()
  211. }
  212. case <-c.pingTimer.Ch:
  213. WriteByte(c.bufWriter, packetTypePing, &n, &err)
  214. c.sendMonitor.Update(int(n))
  215. c.flush()
  216. case <-c.pong:
  217. WriteByte(c.bufWriter, packetTypePong, &n, &err)
  218. c.sendMonitor.Update(int(n))
  219. c.flush()
  220. case <-c.quit:
  221. break FOR_LOOP
  222. case <-c.send:
  223. // Send some packets
  224. eof := c.sendSomePackets()
  225. if !eof {
  226. // Keep sendRoutine awake.
  227. select {
  228. case c.send <- struct{}{}:
  229. default:
  230. }
  231. }
  232. }
  233. if atomic.LoadUint32(&c.stopped) == 1 {
  234. break FOR_LOOP
  235. }
  236. if err != nil {
  237. log.Info("%v failed @ sendRoutine:\n%v", c, err)
  238. c.Stop()
  239. break FOR_LOOP
  240. }
  241. }
  242. // Cleanup
  243. }
  244. // Returns true if messages from channels were exhausted.
  245. // Blocks in accordance to .sendMonitor throttling.
  246. func (c *MConnection) sendSomePackets() bool {
  247. // Block until .sendMonitor says we can write.
  248. // Once we're ready we send more than we asked for,
  249. // but amortized it should even out.
  250. c.sendMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.sendRate), true)
  251. // Now send some packets.
  252. for i := 0; i < numBatchPackets; i++ {
  253. if c.sendPacket() {
  254. return true
  255. }
  256. }
  257. return false
  258. }
  259. // Returns true if messages from channels were exhausted.
  260. func (c *MConnection) sendPacket() bool {
  261. // Choose a channel to create a packet from.
  262. // The chosen channel will be the one whose recentlySent/priority is the least.
  263. var leastRatio float32 = math.MaxFloat32
  264. var leastChannel *Channel
  265. for _, channel := range c.channels {
  266. // If nothing to send, skip this channel
  267. if !channel.isSendPending() {
  268. continue
  269. }
  270. // Get ratio, and keep track of lowest ratio.
  271. ratio := float32(channel.recentlySent) / float32(channel.priority)
  272. if ratio < leastRatio {
  273. leastRatio = ratio
  274. leastChannel = channel
  275. }
  276. }
  277. // Nothing to send?
  278. if leastChannel == nil {
  279. return true
  280. } else {
  281. // log.Debug("Found a packet to send")
  282. }
  283. // Make & send a packet from this channel
  284. n, err := leastChannel.writePacketTo(c.bufWriter)
  285. if err != nil {
  286. log.Warning("Failed to write packet. Error: %v", err)
  287. c.stopForError(err)
  288. return true
  289. }
  290. c.sendMonitor.Update(int(n))
  291. c.flushTimer.Set()
  292. return false
  293. }
  294. // recvRoutine reads packets and reconstructs the message using the channels' "recving" buffer.
  295. // After a whole message has been assembled, it's pushed to onReceive().
  296. // Blocks depending on how the connection is throttled.
  297. func (c *MConnection) recvRoutine() {
  298. defer c._recover()
  299. FOR_LOOP:
  300. for {
  301. // Block until .recvMonitor says we can read.
  302. c.recvMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.recvRate), true)
  303. // Read packet type
  304. var n int64
  305. var err error
  306. pktType := ReadByte(c.bufReader, &n, &err)
  307. c.recvMonitor.Update(int(n))
  308. if err != nil {
  309. if atomic.LoadUint32(&c.stopped) != 1 {
  310. log.Info("%v failed @ recvRoutine with err: %v", c, err)
  311. c.Stop()
  312. }
  313. break FOR_LOOP
  314. }
  315. // Peek into bufReader for debugging
  316. if log.IsEnabledFor(logging.DEBUG) {
  317. numBytes := c.bufReader.Buffered()
  318. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  319. if err != nil {
  320. log.Debug("recvRoutine packet type %X, peeked: %X", pktType, bytes)
  321. }
  322. }
  323. // Read more depending on packet type.
  324. switch pktType {
  325. case packetTypePing:
  326. // TODO: prevent abuse, as they cause flush()'s.
  327. c.pong <- struct{}{}
  328. case packetTypePong:
  329. // do nothing
  330. case packetTypeMessage:
  331. pkt, n, err := readPacketSafe(c.bufReader)
  332. c.recvMonitor.Update(int(n))
  333. if err != nil {
  334. if atomic.LoadUint32(&c.stopped) != 1 {
  335. log.Info("%v failed @ recvRoutine", c)
  336. c.Stop()
  337. }
  338. break FOR_LOOP
  339. }
  340. channel, ok := c.channelsIdx[pkt.ChannelId]
  341. if !ok || channel == nil {
  342. Panicf("Unknown channel %X", pkt.ChannelId)
  343. }
  344. msgBytes := channel.recvPacket(pkt)
  345. if msgBytes != nil {
  346. c.onReceive(pkt.ChannelId, msgBytes)
  347. }
  348. default:
  349. Panicf("Unknown message type %X", pktType)
  350. }
  351. // TODO: shouldn't this go in the sendRoutine?
  352. // Better to send a packet when *we* haven't sent anything for a while.
  353. c.pingTimer.Reset()
  354. }
  355. // Cleanup
  356. close(c.pong)
  357. for _ = range c.pong {
  358. // Drain
  359. }
  360. }
  361. //-----------------------------------------------------------------------------
// ChannelDescriptor describes one channel to create on an MConnection.
type ChannelDescriptor struct {
	Id       byte // channel id; used as the key in MConnection.channelsIdx
	Priority uint // send priority; must be non-zero (newChannel panics otherwise)
}
// Channel holds the per-channel send queue and the partially sent and
// partially received message buffers of an MConnection.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection       // owning connection
	desc          *ChannelDescriptor // descriptor this channel was created from
	id            byte               // copy of desc.Id
	sendQueue     chan []byte        // queued outbound messages (capacity defaultSendQueueCapacity)
	sendQueueSize uint32             // atomic counter: incremented on enqueue, decremented when a message's last packet is built
	recving       []byte             // bytes of the inbound message being reassembled
	sending       []byte             // unsent remainder of the outbound message in progress
	priority      uint               // copy of desc.Priority
	recentlySent  int64 // exponential moving average
}
  379. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  380. if desc.Priority <= 0 {
  381. panic("Channel default priority must be a postive integer")
  382. }
  383. return &Channel{
  384. conn: conn,
  385. desc: desc,
  386. id: desc.Id,
  387. sendQueue: make(chan []byte, defaultSendQueueCapacity),
  388. recving: make([]byte, 0, defaultRecvBufferCapacity),
  389. priority: desc.Priority,
  390. }
  391. }
  392. // Queues message to send to this channel.
  393. // Goroutine-safe
  394. func (ch *Channel) sendBytes(bytes []byte) {
  395. ch.sendQueue <- bytes
  396. atomic.AddUint32(&ch.sendQueueSize, 1)
  397. }
  398. // Queues message to send to this channel.
  399. // Nonblocking, returns true if successful.
  400. // Goroutine-safe
  401. func (ch *Channel) trySendBytes(bytes []byte) bool {
  402. select {
  403. case ch.sendQueue <- bytes:
  404. atomic.AddUint32(&ch.sendQueueSize, 1)
  405. return true
  406. default:
  407. return false
  408. }
  409. }
  410. // Goroutine-safe
  411. func (ch *Channel) loadSendQueueSize() (size int) {
  412. return int(atomic.LoadUint32(&ch.sendQueueSize))
  413. }
  414. // Goroutine-safe
  415. // Use only as a heuristic.
  416. func (ch *Channel) canSend() bool {
  417. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  418. }
  419. // Returns true if any packets are pending to be sent.
  420. // Call before calling nextPacket()
  421. // Goroutine-safe
  422. func (ch *Channel) isSendPending() bool {
  423. if len(ch.sending) == 0 {
  424. if len(ch.sendQueue) == 0 {
  425. return false
  426. }
  427. ch.sending = <-ch.sendQueue
  428. }
  429. return true
  430. }
  431. // Creates a new packet to send.
  432. // Not goroutine-safe
  433. func (ch *Channel) nextPacket() packet {
  434. packet := packet{}
  435. packet.ChannelId = byte(ch.id)
  436. packet.Bytes = ch.sending[:MinInt(maxPacketSize, len(ch.sending))]
  437. if len(ch.sending) <= maxPacketSize {
  438. packet.EOF = byte(0x01)
  439. ch.sending = nil
  440. atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize
  441. } else {
  442. packet.EOF = byte(0x00)
  443. ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):]
  444. }
  445. return packet
  446. }
  447. // Writes next packet to w.
  448. // Not goroutine-safe
  449. func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) {
  450. packet := ch.nextPacket()
  451. WriteByte(w, packetTypeMessage, &n, &err)
  452. WriteBinary(w, packet, &n, &err)
  453. if err != nil {
  454. ch.recentlySent += n
  455. }
  456. return
  457. }
  458. // Handles incoming packets. Returns a msg bytes if msg is complete.
  459. // Not goroutine-safe
  460. func (ch *Channel) recvPacket(pkt packet) []byte {
  461. ch.recving = append(ch.recving, pkt.Bytes...)
  462. if pkt.EOF == byte(0x01) {
  463. msgBytes := ch.recving
  464. ch.recving = make([]byte, 0, defaultRecvBufferCapacity)
  465. return msgBytes
  466. }
  467. return nil
  468. }
  469. // Call this periodically to update stats for throttling purposes.
  470. // Not goroutine-safe
  471. func (ch *Channel) updateStats() {
  472. // Exponential decay of stats.
  473. // TODO: optimize.
  474. ch.recentlySent = int64(float64(ch.recentlySent) * 0.5)
  475. }
  476. //-----------------------------------------------------------------------------
// Packet framing constants.
const (
	// Maximum payload bytes per packet (see nextPacket); also the amount
	// requested from the flow monitors' Limit calls.
	maxPacketSize = 1024
	// Type byte written by sendRoutine when the ping timer fires.
	packetTypePing = byte(0x00)
	// Type byte written by sendRoutine in reply to a received ping.
	packetTypePong = byte(0x01)
	// Type byte that precedes every message packet.
	packetTypeMessage = byte(0x10)
)
// Messages in channels are chopped into smaller packets for multiplexing.
type packet struct {
	ChannelId byte // id of the channel this packet belongs to
	EOF       byte // 1 means message ends here.
	Bytes     []byte // payload: at most maxPacketSize bytes when built by nextPacket
}
  489. func (p packet) WriteTo(w io.Writer) (n int64, err error) {
  490. WriteByte(w, p.ChannelId, &n, &err)
  491. WriteByte(w, p.EOF, &n, &err)
  492. WriteByteSlice(w, p.Bytes, &n, &err)
  493. return
  494. }
  495. func (p packet) String() string {
  496. return fmt.Sprintf("Packet{%X:%X}", p.ChannelId, p.Bytes)
  497. }
  498. func readPacketSafe(r io.Reader) (pkt packet, n int64, err error) {
  499. chId := ReadByte(r, &n, &err)
  500. eof := ReadByte(r, &n, &err)
  501. bytes := ReadByteSlice(r, &n, &err)
  502. pkt = packet{chId, eof, bytes}
  503. return
  504. }
  505. //-----------------------------------------------------------------------------
// TypedMessage is a convenience struct for writing typed messages: a type
// byte followed by the message itself.
// Reading requires a custom decoder that switches on the first type byte of a byteslice.
type TypedMessage struct {
	Type byte   // written first by WriteTo
	Msg  Binary // message body, written after the type byte
}
  512. func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) {
  513. WriteByte(w, tm.Type, &n, &err)
  514. WriteBinary(w, tm.Msg, &n, &err)
  515. return
  516. }
  517. func (tm TypedMessage) String() string {
  518. return fmt.Sprintf("TMsg{%X:%v}", tm.Type, tm.Msg)
  519. }
  520. func (tm TypedMessage) Bytes() []byte {
  521. return BinaryBytes(tm)
  522. }